# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Fleet Utils."""
import collections
import copy
import json
import logging
import math
import numpy as np
import os
import sys
import time
import paddle.fluid as fluid
from paddle.fluid.log_helper import get_logger
from paddle.fluid.incubate.fleet.parameter_server.pslib import fleet as fleet_pslib
from paddle.fluid.incubate.fleet.parameter_server.distribute_transpiler import fleet as fleet_transpiler
from paddle.distributed.fleet.utils.fs import LocalFS, HDFSClient
from . import utils
__all__ = ["FleetUtil"]
_logger = get_logger(
__name__, logging.INFO, fmt='%(asctime)s-%(levelname)s: %(message)s')
fleet = fleet_pslib
class FleetUtil(object):
"""
FleetUtil provides some common functions for users' convenience.
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.rank0_print("my log")
"""
def __init__(self, mode="pslib"):
global fleet
if mode == "pslib":
fleet = fleet_pslib
elif mode == "transpiler":
fleet = fleet_transpiler
else:
raise ValueError(
"Please choose one mode from [\"pslib\", \"transpiler\"]")
def rank0_print(self, s):
"""
        Worker of rank 0 prints the given log string.
Args:
s(str): string to print
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.rank0_print("my log")
"""
if fleet.worker_index() != 0:
return
print(s)
sys.stdout.flush()
def rank0_info(self, s):
"""
        Worker of rank 0 logs an info message.
Args:
s(str): string to log
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.rank0_info("my log info")
"""
if fleet.worker_index() != 0:
return
_logger.info(s)
def rank0_error(self, s):
"""
        Worker of rank 0 logs an error message.
Args:
s(str): string to log
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.rank0_error("my log error")
"""
if fleet.worker_index() != 0:
return
_logger.error(s)
def set_zero(self,
var_name,
scope=fluid.global_scope(),
place=fluid.CPUPlace(),
param_type="int64"):
"""
Set tensor of a Variable to zero.
Args:
var_name(str): name of Variable
scope(Scope): Scope object, default is fluid.global_scope()
place(Place): Place object, default is fluid.CPUPlace()
param_type(str): param data type, default is int64
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.set_zero(myvar.name, myscope)
"""
param = scope.var(var_name).get_tensor()
param_array = np.zeros(param._get_dims()).astype(param_type)
param.set(param_array, place)
def print_global_auc(self,
scope=fluid.global_scope(),
stat_pos="_generated_var_2",
stat_neg="_generated_var_3",
print_prefix=""):
"""
Print global auc of all distributed workers.
Args:
scope(Scope): Scope object, default is fluid.global_scope()
stat_pos(str): name of auc pos bucket Variable
stat_neg(str): name of auc neg bucket Variable
print_prefix(str): prefix of print auc
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.print_global_auc(myscope, stat_pos=stat_pos.name,
stat_neg=stat_neg.name)
# below is part of model
emb = my_slot_net(slots, label) # emb can be fc layer of size 1
              similarity_norm = fluid.layers.sigmoid(fluid.layers.clip(\
                  emb, min=-15.0, max=15.0), name="similarity_norm")
binary_predict = fluid.layers.concat(input=[\
fluid.layers.elementwise_sub(\
fluid.layers.ceil(similarity_norm), similarity_norm),\
similarity_norm], axis=1)
auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, \
stat_neg] = fluid.layers.auc(input=binary_predict,\
label=label, curve='ROC',\
num_thresholds=4096)
"""
auc_value = self.get_global_auc(scope, stat_pos, stat_neg)
self.rank0_print(print_prefix + " global auc = %s" % auc_value)
def get_global_auc(self,
scope=fluid.global_scope(),
stat_pos="_generated_var_2",
stat_neg="_generated_var_3"):
"""
Get global auc of all distributed workers.
Args:
scope(Scope): Scope object, default is fluid.global_scope()
stat_pos(str): name of auc pos bucket Variable
stat_neg(str): name of auc neg bucket Variable
        Returns:
            auc_value(float): global auc, or None if the auc bucket
                variables are not found
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
              auc_value = fleet_util.get_global_auc(myscope,
                                                    stat_pos=stat_pos,
                                                    stat_neg=stat_neg)
"""
if scope.find_var(stat_pos) is None or scope.find_var(stat_neg) is None:
self.rank0_print("not found auc bucket")
return None
fleet._role_maker._barrier_worker()
# auc pos bucket
pos = np.array(scope.find_var(stat_pos).get_tensor())
# auc pos bucket shape
old_pos_shape = np.array(pos.shape)
# reshape to one dim
pos = pos.reshape(-1)
global_pos = np.copy(pos) * 0
# mpi allreduce
fleet._role_maker._all_reduce(pos, global_pos)
# reshape to its original shape
global_pos = global_pos.reshape(old_pos_shape)
# auc neg bucket
neg = np.array(scope.find_var(stat_neg).get_tensor())
old_neg_shape = np.array(neg.shape)
neg = neg.reshape(-1)
global_neg = np.copy(neg) * 0
fleet._role_maker._all_reduce(neg, global_neg)
global_neg = global_neg.reshape(old_neg_shape)
# calculate auc
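        # each bucket holds the count of positive / negative instances whose
        # predicted score falls into that bucket; walking the buckets from
        # the highest score to the lowest traces the ROC curve, and the area
        # is accumulated with the trapezoidal rule below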
num_bucket = len(global_pos[0])
area = 0.0
pos = 0.0
neg = 0.0
new_pos = 0.0
new_neg = 0.0
total_ins_num = 0
        for i in range(num_bucket):
index = num_bucket - 1 - i
new_pos = pos + global_pos[0][index]
total_ins_num += global_pos[0][index]
new_neg = neg + global_neg[0][index]
total_ins_num += global_neg[0][index]
area += (new_neg - neg) * (pos + new_pos) / 2
pos = new_pos
neg = new_neg
auc_value = None
if pos * neg == 0 or total_ins_num == 0:
auc_value = 0.5
else:
auc_value = area / (pos * neg)
fleet._role_maker._barrier_worker()
return auc_value
def load_fleet_model_one_table(self, table_id, path):
"""
load pslib model to one table
Args:
            table_id(int): id of the table to load the model into
path(str): model path
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.load_fleet_model("hdfs:/my/model/path", table_id=1)
"""
fleet.load_one_table(table_id, path)
def load_fleet_model(self, path, mode=0):
"""
load pslib model
Args:
path(str): model path
            mode(int): 0 or 1, load checkpoint (0) or delta model (1),
                default is 0
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.load_fleet_model("hdfs:/my/model/path")
fleet_util.load_fleet_model("hdfs:/my/model/path", mode=0)
"""
fleet.init_server(path, mode=mode)
def save_fleet_model(self, path, mode=0):
"""
save pslib model
Args:
path(str): model path
            mode(int): 0 or 1, save checkpoint (0) or delta model (1),
                default is 0
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.save_fleet_model("hdfs:/my/model/path")
"""
fleet.save_persistables(None, path, mode=mode)
def _get_xbox_str(self,
output_path,
day,
model_path,
xbox_base_key,
data_path,
hadoop_fs_name,
monitor_data={},
mode="patch"):
xbox_dict = collections.OrderedDict()
if mode == "base":
xbox_dict["id"] = str(xbox_base_key)
elif mode == "patch":
xbox_dict["id"] = str(int(time.time()))
else:
print("warning: unknown mode %s, set it to patch" % mode)
mode = "patch"
xbox_dict["id"] = str(int(time.time()))
xbox_dict["key"] = str(xbox_base_key)
if model_path.startswith("hdfs:") or model_path.startswith("afs:"):
model_path = model_path[model_path.find(":") + 1:]
xbox_dict["input"] = hadoop_fs_name + model_path.rstrip("/") + "/000"
xbox_dict["record_count"] = "111111"
xbox_dict["partition_type"] = "2"
xbox_dict["job_name"] = "default_job_name"
xbox_dict["ins_tag"] = "feasign"
xbox_dict["ins_path"] = data_path
job_id_with_host = os.popen("echo -n ${JOB_ID}").read().strip()
instance_id = os.popen("echo -n ${INSTANCE_ID}").read().strip()
start_pos = instance_id.find(job_id_with_host)
end_pos = instance_id.find("--")
if start_pos != -1 and end_pos != -1:
job_id_with_host = instance_id[start_pos:end_pos]
xbox_dict["job_id"] = job_id_with_host
        # monitor_data is currently hard-coded to an empty string here
        xbox_dict["monitor_data"] = ""
xbox_dict["monitor_path"] = output_path.rstrip("/") + "/monitor/" \
+ day + ".txt"
xbox_dict["mpi_size"] = str(fleet.worker_num())
return json.dumps(xbox_dict)
def write_model_donefile(self,
output_path,
day,
pass_id,
xbox_base_key,
hadoop_fs_name,
hadoop_fs_ugi,
hadoop_home="$HADOOP_HOME",
donefile_name="donefile.txt"):
"""
        write donefile when saving a model
Args:
output_path(str): output path
day(str|int): training day
pass_id(str|int): training pass id
xbox_base_key(str|int): xbox base key
hadoop_fs_name(str): hdfs/afs fs name
hadoop_fs_ugi(str): hdfs/afs fs ugi
hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
donefile_name(str): donefile name, default is "donefile.txt"
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.write_model_donefile(output_path="hdfs:/my/output",
model_path="hdfs:/my/model",
day=20190723,
pass_id=66,
xbox_base_key=int(time.time()),
hadoop_fs_name="hdfs://xxx",
hadoop_fs_ugi="user,passwd")
"""
day = str(day)
pass_id = str(pass_id)
xbox_base_key = int(xbox_base_key)
if pass_id != "-1":
suffix_name = "/%s/%s/" % (day, pass_id)
model_path = output_path.rstrip("/") + suffix_name
else:
suffix_name = "/%s/0/" % day
model_path = output_path.rstrip("/") + suffix_name
if fleet.worker_index() == 0:
donefile_path = output_path + "/" + donefile_name
content = "%s\t%lu\t%s\t%s\t%d" % (day, xbox_base_key,\
model_path, pass_id, 0)
configs = {
"fs.default.name": hadoop_fs_name,
"hadoop.job.ugi": hadoop_fs_ugi
}
client = HDFSClient(hadoop_home, configs)
if client.is_file(donefile_path):
pre_content = client.cat(donefile_path)
pre_content_list = pre_content.split("\n")
day_list = [i.split("\t")[0] for i in pre_content_list]
pass_list = [i.split("\t")[3] for i in pre_content_list]
exist = False
for i in range(len(day_list)):
if int(day) == int(day_list[i]) and \
int(pass_id) == int(pass_list[i]):
exist = True
break
if not exist:
with open(donefile_name, "w") as f:
f.write(pre_content + "\n")
f.write(content + "\n")
client.delete(donefile_path)
client.upload(
output_path,
donefile_name,
multi_processes=1,
overwrite=False)
self.rank0_error("write %s/%s %s succeed" % \
(day, pass_id, donefile_name))
else:
self.rank0_error("not write %s because %s/%s already "
"exists" % (donefile_name, day, pass_id))
else:
with open(donefile_name, "w") as f:
f.write(content + "\n")
client.upload(
output_path,
donefile_name,
multi_processes=1,
overwrite=False)
self.rank0_error("write %s/%s %s succeed" % \
(day, pass_id, donefile_name))
fleet._role_maker._barrier_worker()
def write_xbox_donefile(self,
output_path,
day,
pass_id,
xbox_base_key,
data_path,
hadoop_fs_name,
hadoop_fs_ugi,
monitor_data={},
hadoop_home="$HADOOP_HOME",
donefile_name=None):
"""
write delta donefile or xbox base donefile
Args:
output_path(str): output path
day(str|int): training day of model
pass_id(str|int): training pass id of model
xbox_base_key(str|int): xbox base key
data_path(str|list): training data path
hadoop_fs_name(str): hdfs/afs fs name
hadoop_fs_ugi(str): hdfs/afs fs ugi
monitor_data(dict): metrics
hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
donefile_name(str): donefile name, default is None"
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
              fleet_util.write_xbox_donefile(
                  output_path="hdfs:/my/output/",
                  day=20190722,
                  pass_id=1,
                  xbox_base_key=int(time.time()),
                  data_path="hdfs:/my/data/",
                  hadoop_fs_name="hdfs://xxx",
                  hadoop_fs_ugi="user,passwd",
                  monitor_data={})
"""
day = str(day)
pass_id = str(pass_id)
xbox_base_key = int(xbox_base_key)
mode = None
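        # pass_id "-1" denotes an xbox base model, any other pass id denotes
        # a delta (patch) model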
if pass_id != "-1":
mode = "patch"
suffix_name = "/%s/delta-%s/" % (day, pass_id)
model_path = output_path.rstrip("/") + suffix_name
if donefile_name is None:
donefile_name = "xbox_patch_done.txt"
else:
mode = "base"
suffix_name = "/%s/base/" % day
model_path = output_path.rstrip("/") + suffix_name
if donefile_name is None:
donefile_name = "xbox_base_done.txt"
if isinstance(data_path, list):
data_path = ",".join(data_path)
if fleet.worker_index() == 0:
donefile_path = output_path + "/" + donefile_name
xbox_str = self._get_xbox_str(output_path, day, model_path, \
xbox_base_key, data_path, hadoop_fs_name, monitor_data={},
mode=mode)
configs = {
"fs.default.name": hadoop_fs_name,
"hadoop.job.ugi": hadoop_fs_ugi
}
client = HDFSClient(hadoop_home, configs)
if client.is_file(donefile_path):
pre_content = client.cat(donefile_path)
last_dict = json.loads(pre_content.split("\n")[-1])
last_day = last_dict["input"].split("/")[-3]
last_pass = last_dict["input"].split("/")[-2].split("-")[-1]
exist = False
if int(day) < int(last_day) or \
int(day) == int(last_day) and \
int(pass_id) <= int(last_pass):
exist = True
if not exist:
with open(donefile_name, "w") as f:
f.write(pre_content + "\n")
f.write(xbox_str + "\n")
client.delete(donefile_path)
client.upload(
output_path,
donefile_name,
multi_processes=1,
overwrite=False)
self.rank0_error("write %s/%s %s succeed" % \
(day, pass_id, donefile_name))
else:
self.rank0_error("not write %s because %s/%s already "
"exists" % (donefile_name, day, pass_id))
else:
with open(donefile_name, "w") as f:
f.write(xbox_str + "\n")
client.upload(
output_path,
donefile_name,
multi_processes=1,
overwrite=False)
self.rank0_error("write %s/%s %s succeed" % \
(day, pass_id, donefile_name))
fleet._role_maker._barrier_worker()
def write_cache_donefile(self,
output_path,
day,
pass_id,
key_num,
hadoop_fs_name,
hadoop_fs_ugi,
hadoop_home="$HADOOP_HOME",
donefile_name="sparse_cache.meta",
**kwargs):
"""
write cache donefile
Args:
output_path(str): output path
day(str|int): training day of model
pass_id(str|int): training pass id of model
            key_num(str|int): key num returned by save_cache_model
hadoop_fs_name(str): hdfs/afs fs name
hadoop_fs_ugi(str): hdfs/afs fs ugi
hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
donefile_name(str): donefile name, default is "sparse_cache.meta"
kwargs(dict): user defined properties
file_num(int): cache file num
table_id(int): cache table id
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.write_cache_donefile(
output_path="hdfs:/my/output/",
day=20190722,
pass_id=1,
key_num=123456,
hadoop_fs_name="hdfs://xxx",
hadoop_fs_ugi="user,passwd",
)
"""
day = str(day)
pass_id = str(pass_id)
key_num = int(key_num)
file_num = kwargs.get("file_num", 16)
table_id = kwargs.get("table_id", 0)
if pass_id != "-1":
suffix_name = "/%s/delta-%s/%03d_cache" % (day, pass_id, table_id)
model_path = output_path.rstrip("/") + suffix_name
else:
suffix_name = "/%s/base/%03d_cache" % (day, table_id)
model_path = output_path.rstrip("/") + suffix_name
if fleet.worker_index() == 0:
donefile_path = model_path + "/" + donefile_name
configs = {
"fs.default.name": hadoop_fs_name,
"hadoop.job.ugi": hadoop_fs_ugi
}
client = HDFSClient(hadoop_home, configs)
if client.is_file(donefile_path):
self.rank0_error( \
"not write because %s already exists" % donefile_path)
else:
meta_str = "file_prefix:part\npart_num:%s\nkey_num:%d\n" \
% (file_num, key_num)
with open(donefile_name, "w") as f:
f.write(meta_str)
client.upload(
model_path,
donefile_name,
multi_processes=1,
overwrite=False)
self.rank0_error("write %s succeed" % donefile_path)
fleet._role_maker._barrier_worker()
def load_model(self, output_path, day, pass_id):
"""
load pslib model
Args:
output_path(str): output path
day(str|int): training day
pass_id(str|int): training pass id
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.load_model("hdfs:/my/path", 20190722, 88)
"""
day = str(day)
pass_id = str(pass_id)
suffix_name = "/%s/%s/" % (day, pass_id)
load_path = output_path + suffix_name
self.rank0_error("going to load_model %s" % load_path)
self.load_fleet_model(load_path)
self.rank0_error("load_model done")
def save_model(self, output_path, day, pass_id):
"""
save pslib model
Args:
output_path(str): output path
day(str|int): training day
pass_id(str|int): training pass id
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.save_model("hdfs:/my/path", 20190722, 88)
"""
day = str(day)
pass_id = str(pass_id)
suffix_name = "/%s/%s/" % (day, pass_id)
model_path = output_path + suffix_name
self.rank0_print("going to save_model %s" % model_path)
self.save_fleet_model(model_path)
self.rank0_print("save_model done")
def save_batch_model(self, output_path, day):
"""
save batch model
Args:
output_path(str): output path
day(str|int): training day
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.save_batch_model("hdfs:/my/path", 20190722)
"""
day = str(day)
suffix_name = "/%s/0/" % day
model_path = output_path + suffix_name
self.rank0_print("going to save_model %s" % model_path)
fleet.save_persistables(None, model_path, mode=3)
self.rank0_print("save_batch_model done")
def save_delta_model(self, output_path, day, pass_id):
"""
save delta model
Args:
output_path(str): output path
day(str|int): training day
pass_id(str|int): training pass id
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.save_batch_model("hdfs:/my/path", 20190722, 88)
"""
day = str(day)
pass_id = str(pass_id)
suffix_name = "/%s/delta-%s/" % (day, pass_id)
model_path = output_path + suffix_name
self.rank0_print("going to save_delta_model %s" % model_path)
fleet.save_persistables(None, model_path, mode=1)
self.rank0_print("save_delta_model done")
def save_xbox_base_model(self, output_path, day):
"""
save xbox base model
Args:
output_path(str): output path
day(str|int): training day
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.save_xbox_base_model("hdfs:/my/path", 20190722, 88)
"""
day = str(day)
suffix_name = "/%s/base/" % day
model_path = output_path + suffix_name
self.rank0_print("going to save_xbox_base_model " + model_path)
fleet.save_persistables(None, model_path, mode=2)
self.rank0_print("save_xbox_base_model done")
def save_cache_model(self, output_path, day, pass_id, mode=1, **kwargs):
"""
save cache model
Args:
output_path(str): output path
day(str|int): training day
pass_id(str|int): training pass id
mode(str|int): save mode
kwargs(dict): user defined properties
table_id(int): table id to save cache
Returns:
key_num(int): cache key num
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.save_cache_model("hdfs:/my/path", 20190722, 88)
"""
day = str(day)
pass_id = str(pass_id)
mode = int(mode)
table_id = kwargs.get("table_id", 0)
suffix_name = "/%s/delta-%s" % (day, pass_id)
model_path = output_path.rstrip("/") + suffix_name
self.rank0_print("going to save_cache_model %s" % model_path)
key_num = fleet.save_cache_model(
None, model_path, mode=mode, table_id=table_id)
self.rank0_print("save_cache_model done")
return key_num
def save_cache_base_model(self, output_path, day, **kwargs):
"""
        save base cache model
        Args:
            output_path(str): output path
            day(str|int): training day
kwargs(dict): user defined properties
table_id(int): table id to save cache
Returns:
key_num(int): cache key num
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.save_cache_base_model("hdfs:/my/path", 20190722)
"""
day = str(day)
table_id = kwargs.get("table_id", 0)
suffix_name = "/%s/base" % day
model_path = output_path.rstrip("/") + suffix_name
self.rank0_print("going to save_cache_base_model %s" % model_path)
key_num = fleet.save_cache_model(
None, model_path, mode=2, table_id=table_id)
self.rank0_print("save_cache_base_model done")
return key_num
def pull_all_dense_params(self, scope, program):
"""
pull all dense params in trainer of rank 0
Args:
scope(Scope): fluid Scope
program(Program): fluid Program
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.pull_all_dense_params(my_scope, my_program)
"""
fleet._role_maker._barrier_worker()
if fleet._role_maker.is_first_worker():
prog_id = str(id(program))
tables = fleet._opt_info["program_id_to_worker"][prog_id].\
get_desc().dense_table
prog_conf = fleet._opt_info['program_configs'][prog_id]
prog_tables = {}
for key in prog_conf:
if "dense" not in key:
continue
for table_id in prog_conf[key]:
prog_tables[int(table_id)] = 0
for table in tables:
if int(table.table_id) not in prog_tables:
continue
var_name_list = []
for i in range(0, len(table.dense_variable_name)):
var_name = table.dense_variable_name[i]
if scope.find_var(var_name) is None:
raise ValueError("var " + var_name +
" not found in scope " +
"when pull dense")
var_name_list.append(var_name)
fleet._fleet_ptr.pull_dense(scope,
int(table.table_id), var_name_list)
fleet._role_maker._barrier_worker()
def save_paddle_inference_model(self,
executor,
scope,
program,
feeded_vars,
target_vars,
output_path,
day,
pass_id,
hadoop_fs_name,
hadoop_fs_ugi,
hadoop_home="$HADOOP_HOME",
save_combine=True):
"""
save paddle inference model, and upload to hdfs dnn_plugin path
Args:
executor(Executor): fluid Executor
scope(Scope): fluid Scope
program(Program): fluid Program
feeded_vars(list[Variable]): feed vars
target_vars(list[variable]): fetch vars
output_path(str): hdfs/afs output path
day(str|int): training day
pass_id(str|int): training pass
hadoop_fs_name(str): hadoop fs name
hadoop_fs_ugi(str): hadoop fs ugi
hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
save_combine(bool): whether to save in a file or separate files,
default is True
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.save_paddle_inference_model(exe,
join_scope,
join_program,
feeded_vars,
target_vars,
"hdfs:/my/output/path/",
day=20190727,
pass_id=6,
hadoop_fs_name="xxx",
hadoop_fs_ugi="xxx,xxx")
"""
day = str(day)
pass_id = str(pass_id)
feeded_var_names = [i.name for i in feeded_vars]
model_name = "inference_model"
# pull dense before save
self.pull_all_dense_params(scope, program)
if fleet.worker_index() == 0:
with fluid.scope_guard(scope):
if save_combine:
fluid.io.save_inference_model(
dirname=model_name,
feeded_var_names=feeded_var_names,
target_vars=target_vars,
executor=executor,
main_program=program.clone(),
params_filename="params")
else:
fluid.io.save_inference_model(
dirname=model_name,
feeded_var_names=feeded_var_names,
target_vars=target_vars,
executor=executor,
main_program=program.clone())
configs = {
"fs.default.name": hadoop_fs_name,
"hadoop.job.ugi": hadoop_fs_ugi
}
client = HDFSClient(hadoop_home, configs)
if pass_id == "-1":
dest = "%s/%s/base/dnn_plugin/" % (output_path, day)
else:
dest = "%s/%s/delta-%s/dnn_plugin/" % (output_path, day,
pass_id)
if not client.is_exist(dest):
client.makedirs(dest)
client.upload(dest, model_name)
fleet._role_maker._barrier_worker()
def save_paddle_params(self,
executor,
scope,
program,
model_name,
output_path,
day,
pass_id,
hadoop_fs_name,
hadoop_fs_ugi,
hadoop_home="$HADOOP_HOME",
var_names=None,
save_combine=True):
"""
save paddle model, and upload to hdfs dnn_plugin path
Args:
executor(Executor): fluid Executor
scope(Scope): fluid Scope
program(Program): fluid Program
model_name(str): save model local dir or filename
output_path(str): hdfs/afs output path
day(str|int): training day
pass_id(str|int): training pass
hadoop_fs_name(str): hadoop fs name
hadoop_fs_ugi(str): hadoop fs ugi
hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
            var_names(list): names of persistable vars to save
save_combine(bool): whether to save in a file or separate files,
default is True
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.save_paddle_params(exe,
join_scope,
join_program,
"paddle_dense.model.0",
"hdfs:/my/output/path/",
day=20190727,
pass_id=6,
hadoop_fs_name="xxx",
hadoop_fs_ugi="xxx,xxx",
var_names=join_all_var_names)
fleet_util.save_paddle_params(exe,
join_scope,
join_program,
"paddle_dense.model.usr.0",
"hdfs:/my/output/path/",
day=20190727,
pass_id=6,
hadoop_fs_name="xxx",
hadoop_fs_ugi="xxx,xxx",
var_names=join_user_var_names)
fleet_util.save_paddle_params(exe,
join_scope,
join_program,
"paddle_dense.model.item.0",
"hdfs:/my/output/path/",
day=20190727,
pass_id=6,
hadoop_fs_name="xxx",
hadoop_fs_ugi="xxx,xxx",
var_names=join_user_item_names)
"""
day = str(day)
pass_id = str(pass_id)
# pull dense before save
self.pull_all_dense_params(scope, program)
if fleet.worker_index() == 0:
vars = [program.global_block().var(i) for i in var_names]
with fluid.scope_guard(scope):
if save_combine:
fluid.io.save_vars(
executor, "./", program, vars=vars, filename=model_name)
else:
fluid.io.save_vars(executor, model_name, program, vars=vars)
configs = {
"fs.default.name": hadoop_fs_name,
"hadoop.job.ugi": hadoop_fs_ugi
}
client = HDFSClient(hadoop_home, configs)
if pass_id == "-1":
dest = "%s/%s/base/dnn_plugin/" % (output_path, day)
else:
dest = "%s/%s/delta-%s/dnn_plugin/" % (output_path, day,
pass_id)
if not client.is_exist(dest):
client.makedirs(dest)
if os.path.isdir(model_name):
client.upload_dir(dest, model_name)
else:
client.upload(dest, model_name)
fleet._role_maker._barrier_worker()
def get_last_save_xbox_base(self,
output_path,
hadoop_fs_name,
hadoop_fs_ugi,
hadoop_home="$HADOOP_HOME"):
"""
get last saved base xbox info from xbox_base_done.txt
Args:
output_path(str): output path
hadoop_fs_name(str): hdfs/afs fs_name
hadoop_fs_ugi(str): hdfs/afs fs_ugi
hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
Returns:
[last_save_day, last_path, xbox_base_key]
last_save_day(int): day of saved model
last_path(str): model path
xbox_base_key(int): xbox key
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
              last_save_day, last_path, xbox_base_key = \
                  fleet_util.get_last_save_xbox_base("hdfs:/my/path",
                                                     "hdfs://xxx",
                                                     "user,passwd")
"""
donefile_path = output_path + "/xbox_base_done.txt"
configs = {
"fs.default.name": hadoop_fs_name,
"hadoop.job.ugi": hadoop_fs_ugi
}
client = HDFSClient(hadoop_home, configs)
if not client.is_file(donefile_path):
return [-1, -1, int(time.time())]
pre_content = client.cat(donefile_path)
last_dict = json.loads(pre_content.split("\n")[-1])
last_day = int(last_dict["input"].split("/")[-3])
last_path = "/".join(last_dict["input"].split("/")[:-1])
xbox_base_key = int(last_dict["key"])
return [last_day, last_path, xbox_base_key]
def get_last_save_xbox(self,
output_path,
hadoop_fs_name,
hadoop_fs_ugi,
hadoop_home="$HADOOP_HOME"):
"""
get last saved xbox info from xbox_patch_done.txt
Args:
output_path(str): output path
hadoop_fs_name(str): hdfs/afs fs_name
hadoop_fs_ugi(str): hdfs/afs fs_ugi
hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
Returns:
[last_save_day, last_save_pass, last_path, xbox_base_key]
last_save_day(int): day of saved model
                last_save_pass(int): pass id of the saved model
last_path(str): model path
xbox_base_key(int): xbox key
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
              last_save_day, last_save_pass, last_path, xbox_base_key = \
                  fleet_util.get_last_save_xbox("hdfs:/my/path",
                                                "hdfs://xxx",
                                                "user,passwd")
"""
donefile_path = output_path + "/xbox_patch_done.txt"
configs = {
"fs.default.name": hadoop_fs_name,
"hadoop.job.ugi": hadoop_fs_ugi
}
client = HDFSClient(hadoop_home, configs)
if not client.is_file(donefile_path):
return [-1, -1, "", int(time.time())]
pre_content = client.cat(donefile_path)
last_dict = json.loads(pre_content.split("\n")[-1])
last_day = int(last_dict["input"].split("/")[-3])
last_pass = int(last_dict["input"].split("/")[-2].split("-")[-1])
last_path = "/".join(last_dict["input"].split("/")[:-1])
xbox_base_key = int(last_dict["key"])
return [last_day, last_pass, last_path, xbox_base_key]
def get_last_save_model(self,
output_path,
hadoop_fs_name,
hadoop_fs_ugi,
hadoop_home="$HADOOP_HOME"):
"""
get last saved model info from donefile.txt
Args:
output_path(str): output path
hadoop_fs_name(str): hdfs/afs fs_name
hadoop_fs_ugi(str): hdfs/afs fs_ugi
hadoop_home(str): hadoop home, default is "$HADOOP_HOME"
Returns:
[last_save_day, last_save_pass, last_path, xbox_base_key]
last_save_day(int): day of saved model
                last_save_pass(int): pass id of the saved model
last_path(str): model path
xbox_base_key(int): xbox key
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
              last_save_day, last_save_pass, last_path, xbox_base_key = \
                  fleet_util.get_last_save_model("hdfs:/my/path",
                                                 "hdfs://xxx",
                                                 "user,passwd")
"""
last_save_day = -1
last_save_pass = -1
last_path = ""
donefile_path = output_path + "/donefile.txt"
configs = {
"fs.default.name": hadoop_fs_name,
"hadoop.job.ugi": hadoop_fs_ugi
}
client = HDFSClient(hadoop_home, configs)
if not client.is_file(donefile_path):
return [-1, -1, "", int(time.time())]
content = client.cat(donefile_path)
content = content.split("\n")[-1].split("\t")
last_save_day = int(content[0])
last_save_pass = int(content[3])
last_path = content[2]
xbox_base_key = int(content[1])
return [last_save_day, last_save_pass, last_path, xbox_base_key]
def get_online_pass_interval(self, days, hours, split_interval,
split_per_pass, is_data_hourly_placed):
"""
get online pass interval
Args:
days(str): days to train
hours(str): hours to train
split_interval(int|str): split interval
            split_per_pass(int|str): splits per pass
is_data_hourly_placed(bool): is data hourly placed
Returns:
online_pass_interval(list)
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
online_pass_interval = fleet_util.get_online_pass_interval(
days="{20190720..20190729}",
hours="{0..23}",
split_interval=5,
split_per_pass=2,
is_data_hourly_placed=False)
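              # with the settings above there are 24*60/5 = 288 splits and
              # 288/2 = 144 passes per day, e.g.
              # online_pass_interval[0] == ["0000", "0005"]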
"""
days = os.popen("echo -n " + days).read().split(" ")
hours = os.popen("echo -n " + hours).read().split(" ")
split_interval = int(split_interval)
split_per_pass = int(split_per_pass)
        splits_per_day = 24 * 60 // split_interval
        pass_per_day = splits_per_day // split_per_pass
left_train_hour = int(hours[0])
right_train_hour = int(hours[-1])
start = 0
split_path = []
for i in range(splits_per_day):
            h = start // 60
m = start % 60
if h < left_train_hour or h > right_train_hour:
start += split_interval
continue
if is_data_hourly_placed:
split_path.append("%02d" % h)
else:
split_path.append("%02d%02d" % (h, m))
start += split_interval
start = 0
online_pass_interval = []
for i in range(pass_per_day):
online_pass_interval.append([])
for j in range(start, start + split_per_pass):
online_pass_interval[i].append(split_path[j])
start += split_per_pass
return online_pass_interval
def get_global_metrics(self,
scope=fluid.global_scope(),
stat_pos_name="_generated_var_2",
stat_neg_name="_generated_var_3",
sqrerr_name="sqrerr",
abserr_name="abserr",
prob_name="prob",
q_name="q",
pos_ins_num_name="pos",
total_ins_num_name="total"):
"""
get global metrics, including auc, bucket_error, mae, rmse,
actual_ctr, predicted_ctr, copc, mean_predict_qvalue, total_ins_num.
Args:
scope(Scope): Scope object, default is fluid.global_scope()
stat_pos_name(str): name of auc pos bucket Variable
stat_neg_name(str): name of auc neg bucket Variable
sqrerr_name(str): name of sqrerr Variable
abserr_name(str): name of abserr Variable
prob_name(str): name of prob Variable
q_name(str): name of q Variable
pos_ins_num_name(str): name of pos ins num Variable
total_ins_num_name(str): name of total ins num Variable
Returns:
[auc, bucket_error, mae, rmse, actual_ctr, predicted_ctr, copc,
mean_predict_qvalue, total_ins_num]
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
metric_list = fleet_util.get_global_metrics(myscope,
stat_pos.name,
stat_neg.name,
local_sqrerr.name,
local_abserr.name,
local_prob.name,
local_q.name,
local_pos_ins.name,
local_total_ins.name)
# below is part of example model
label = fluid.layers.data(name="click", shape=[-1, 1],\
dtype="int64", lod_level=0, append_batch_size=False)
emb = my_slot_net(slots, label) # emb can be fc layer of size 1
              similarity_norm = fluid.layers.sigmoid(fluid.layers.clip(\
                  emb, min=-15.0, max=15.0), name="similarity_norm")
binary_predict = fluid.layers.concat(input=[\
fluid.layers.elementwise_sub(\
fluid.layers.ceil(similarity_norm), similarity_norm),\
similarity_norm], axis=1)
auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, \
stat_neg] = fluid.layers.auc(input=binary_predict,\
label=label, curve='ROC',\
num_thresholds=4096)
local_sqrerr, local_abserr, local_prob, local_q, local_pos_ins,\
local_total_ins = fluid.contrib.layers.ctr_metric_bundle(\
similarity_norm, label)
"""
if scope.find_var(stat_pos_name) is None or \
scope.find_var(stat_neg_name) is None:
self.rank0_print("not found auc bucket")
return [None] * 9
elif scope.find_var(sqrerr_name) is None:
self.rank0_print("not found sqrerr_name=%s" % sqrerr_name)
return [None] * 9
elif scope.find_var(abserr_name) is None:
self.rank0_print("not found abserr_name=%s" % abserr_name)
return [None] * 9
elif scope.find_var(prob_name) is None:
self.rank0_print("not found prob_name=%s" % prob_name)
return [None] * 9
elif scope.find_var(q_name) is None:
self.rank0_print("not found q_name=%s" % q_name)
return [None] * 9
elif scope.find_var(pos_ins_num_name) is None:
self.rank0_print("not found pos_ins_num_name=%s" % pos_ins_num_name)
return [None] * 9
elif scope.find_var(total_ins_num_name) is None:
self.rank0_print("not found total_ins_num_name=%s" % \
total_ins_num_name)
return [None] * 9
# barrier worker to ensure all workers finished training
fleet._role_maker._barrier_worker()
# get auc
auc = self.get_global_auc(scope, stat_pos_name, stat_neg_name)
pos = np.array(scope.find_var(stat_pos_name).get_tensor())
# auc pos bucket shape
old_pos_shape = np.array(pos.shape)
# reshape to one dim
pos = pos.reshape(-1)
global_pos = np.copy(pos) * 0
# mpi allreduce
fleet._role_maker._all_reduce(pos, global_pos)
# reshape to its original shape
global_pos = global_pos.reshape(old_pos_shape)
# auc neg bucket
neg = np.array(scope.find_var(stat_neg_name).get_tensor())
old_neg_shape = np.array(neg.shape)
neg = neg.reshape(-1)
global_neg = np.copy(neg) * 0
fleet._role_maker._all_reduce(neg, global_neg)
global_neg = global_neg.reshape(old_neg_shape)
num_bucket = len(global_pos[0])
def get_metric(name):
metric = np.array(scope.find_var(name).get_tensor())
old_metric_shape = np.array(metric.shape)
metric = metric.reshape(-1)
global_metric = np.copy(metric) * 0
fleet._role_maker._all_reduce(metric, global_metric)
global_metric = global_metric.reshape(old_metric_shape)
return global_metric[0]
global_sqrerr = get_metric(sqrerr_name)
global_abserr = get_metric(abserr_name)
global_prob = get_metric(prob_name)
global_q_value = get_metric(q_name)
        # note: the instance count read from the auc buckets is not exact,
        # so fetch it from the ctr metric op instead
pos_ins_num = get_metric(pos_ins_num_name)
total_ins_num = get_metric(total_ins_num_name)
neg_ins_num = total_ins_num - pos_ins_num
mae = global_abserr / total_ins_num
rmse = math.sqrt(global_sqrerr / total_ins_num)
return_actual_ctr = pos_ins_num / total_ins_num
predicted_ctr = global_prob / total_ins_num
mean_predict_qvalue = global_q_value / total_ins_num
copc = 0.0
        if abs(predicted_ctr) > 1e-6:
copc = return_actual_ctr / predicted_ctr
# calculate bucket error
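        # adjacent prediction buckets are merged into spans no wider than
        # k_max_span; for each span whose estimated statistical error is
        # below k_relative_error_bound, the impression-weighted
        # |actual_ctr / predicted_ctr - 1| is accumulated, and bucket_error
        # is the weighted average over all accepted spans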
last_ctr = -1.0
impression_sum = 0.0
ctr_sum = 0.0
click_sum = 0.0
error_sum = 0.0
error_count = 0.0
click = 0.0
show = 0.0
ctr = 0.0
adjust_ctr = 0.0
relative_error = 0.0
actual_ctr = 0.0
relative_ctr_error = 0.0
k_max_span = 0.01
k_relative_error_bound = 0.05
        for i in range(num_bucket):
click = global_pos[0][i]
show = global_pos[0][i] + global_neg[0][i]
ctr = float(i) / num_bucket
if abs(ctr - last_ctr) > k_max_span:
last_ctr = ctr
impression_sum = 0.0
ctr_sum = 0.0
click_sum = 0.0
impression_sum += show
ctr_sum += ctr * show
click_sum += click
if impression_sum == 0:
continue
adjust_ctr = ctr_sum / impression_sum
if adjust_ctr == 0:
continue
relative_error = \
math.sqrt((1 - adjust_ctr) / (adjust_ctr * impression_sum))
if relative_error < k_relative_error_bound:
actual_ctr = click_sum / impression_sum
relative_ctr_error = abs(actual_ctr / adjust_ctr - 1)
error_sum += relative_ctr_error * impression_sum
error_count += impression_sum
last_ctr = -1
bucket_error = error_sum / error_count if error_count > 0 else 0.0
return [
auc, bucket_error, mae, rmse, return_actual_ctr, predicted_ctr,
copc, mean_predict_qvalue, int(total_ins_num)
]
def print_global_metrics(self,
scope=fluid.global_scope(),
stat_pos_name="_generated_var_2",
stat_neg_name="_generated_var_3",
sqrerr_name="sqrerr",
abserr_name="abserr",
prob_name="prob",
q_name="q",
pos_ins_num_name="pos",
total_ins_num_name="total",
print_prefix=""):
"""
print global metrics, including auc, bucket_error, mae, rmse,
actual_ctr, predicted_ctr, copc, mean_predict_qvalue, total_ins_num.
Args:
scope(Scope): Scope object, default is fluid.global_scope()
stat_pos_name(str): name of auc pos bucket Variable
stat_neg_name(str): name of auc neg bucket Variable
sqrerr_name(str): name of sqrerr Variable
abserr_name(str): name of abserr Variable
prob_name(str): name of prob Variable
q_name(str): name of q Variable
pos_ins_num_name(str): name of pos ins num Variable
total_ins_num_name(str): name of total ins num Variable
print_prefix(str): print prefix
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
fleet_util.print_global_metrics(myscope,
stat_pos.name,
stat_neg.name,
local_sqrerr.name,
local_abserr.name,
local_prob.name,
local_q.name,
local_pos_ins.name,
local_total_ins.name)
# below is part of model
label = fluid.layers.data(name="click", shape=[-1, 1],\
dtype="int64", lod_level=0, append_batch_size=False)
emb = my_slot_net(slots, label) # emb can be fc layer of size 1
              similarity_norm = fluid.layers.sigmoid(fluid.layers.clip(\
                  emb, min=-15.0, max=15.0), name="similarity_norm")
binary_predict = fluid.layers.concat(input=[\
fluid.layers.elementwise_sub(\
fluid.layers.ceil(similarity_norm), similarity_norm),\
similarity_norm], axis=1)
auc, batch_auc, [batch_stat_pos, batch_stat_neg, stat_pos, \
stat_neg] = fluid.layers.auc(input=binary_predict,\
label=label, curve='ROC',\
num_thresholds=4096)
local_sqrerr, local_abserr, local_prob, local_q, local_pos_ins, \
local_total_ins = fluid.contrib.layers.ctr_metric_bundle(\
similarity_norm, label)
"""
if scope.find_var(stat_pos_name) is None or \
scope.find_var(stat_neg_name) is None:
self.rank0_print("not found auc bucket")
return
elif scope.find_var(sqrerr_name) is None:
self.rank0_print("not found sqrerr_name=%s" % sqrerr_name)
return
elif scope.find_var(abserr_name) is None:
self.rank0_print("not found abserr_name=%s" % abserr_name)
return
elif scope.find_var(prob_name) is None:
self.rank0_print("not found prob_name=%s" % prob_name)
return
elif scope.find_var(q_name) is None:
self.rank0_print("not found q_name=%s" % q_name)
return
elif scope.find_var(pos_ins_num_name) is None:
self.rank0_print("not found pos_ins_num_name=%s" % pos_ins_num_name)
return
elif scope.find_var(total_ins_num_name) is None:
self.rank0_print("not found total_ins_num_name=%s" % \
total_ins_num_name)
return
auc, bucket_error, mae, rmse, actual_ctr, predicted_ctr, copc,\
mean_predict_qvalue, total_ins_num = self.get_global_metrics(\
scope, stat_pos_name, stat_neg_name, sqrerr_name, abserr_name,\
prob_name, q_name, pos_ins_num_name, total_ins_num_name)
self.rank0_print("%s global AUC=%.6f BUCKET_ERROR=%.6f MAE=%.6f "
"RMSE=%.6f Actural_CTR=%.6f Predicted_CTR=%.6f "
"COPC=%.6f MEAN Q_VALUE=%.6f Ins number=%s" %
(print_prefix, auc, bucket_error, mae, rmse,
actual_ctr, predicted_ctr, copc, mean_predict_qvalue,
total_ins_num))
def program_type_trans(self, prog_dir, prog_fn, is_text):
return utils.program_type_trans(prog_dir, prog_fn, is_text)
def draw_from_program_file(self, model_filename, is_text, output_dir,
output_filename):
"""draw program from file"""
program = utils.load_program(model_filename, is_text)
utils.graphviz(program.global_block(), output_dir, output_filename)
def draw_from_program(self, program, output_dir, output_name):
"""draw Program"""
utils.graphviz(program.global_block(), output_dir, output_name)
def check_two_programs(self, config):
train_prog = utils.load_program(config.train_prog_path,
config.is_text_train_program)
pruned_prog = utils.load_program(config.pruned_prog_path,
config.is_text_pruned_program)
if config.draw:
pruned_dir = os.path.dirname(config.pruned_prog_path)
self.draw_from_program(pruned_prog, pruned_dir,
config.draw_out_name)
res = utils.check_pruned_program_vars(train_prog, pruned_prog)
if res:
_logger.info("check_programs succeed.")
else:
_logger.info(
"check_programs failed. pruned program and train program not match!"
)
return res
def check_vars_and_dump(self, config):
_logger.info("start check_vars_and_dump.")
results = utils.check_saved_vars_try_dump(
config.dump_model_dir, config.dump_program_filename,
config.is_text_dump_program, config.feed_config,
config.fetch_config, config.batch_size, config.save_params_filename)
_logger.info("check_vars_and_dump succeed.")
return results
def parse_program_proto(self, prog_path, is_text, output_dir):
"""
Parse program.proto into a more readable format.
This function will generate three files:
output_dir/vars_all.log,
output_dir/vars_persistable.log,
output_dir/ops.log.
Args:
prog_path(str): proto file path to be parsed.
            is_text(bool): whether the proto file is in human-readable text
                format (True) or binary format (False).
output_dir(str): output dir.
Examples:
.. code-block:: python
from paddle.fluid.incubate.fleet.utils.fleet_util import FleetUtil
fleet_util = FleetUtil()
program_path = "./program.pbtxt"
is_text = True
output_dir = "/tmp/"
fleet_util.parse_program_proto(program_path, is_text, output_dir)
"""
program = utils.load_program(prog_path, is_text)
utils.parse_program(program, output_dir)
def split_program_by_device(self, program):
ops_list = []
type_list = []
pre = None
type_cpu = "cpu"
for op in program.global_block().ops:
if op.has_attr("op_device"):
if pre is None or pre != op.attr("op_device"):
ops_list.append([])
type_list.append(
op.attr("op_device")
if op.attr("op_device") != "" else type_cpu)
ops_list[-1].append(op)
pre = op.attr("op_device")
l = len(type_list)
i = 0
type_heter = None
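        # second pass: cpu segments sandwiched between two segments of the
        # same heter device are absorbed into that device, so every heter
        # region becomes one contiguous block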
while i < l:
while i < l and type_list[i] == type_cpu:
i += 1
if i == l:
break
type_heter = type_list[i]
i += 1
start = i
valid = True
while i < l and type_list[i] != type_heter:
if type_list[i] != type_cpu:
valid = False
break
i += 1
if i == l:
break
elif not valid:
continue
for j in range(start, i):
for op in ops_list[j]:
op._set_attr("op_device", type_heter)
type_list[j] = type_heter
pre = None
merged_ops_list = []
merged_type_list = []
for i in range(l):
if pre is None or pre != type_list[i]:
merged_ops_list.append([])
merged_type_list.append(type_list[i])
merged_ops_list[-1].extend(ops_list[i])
pre = type_list[i]
data_vars = set()
for k in program.global_block().vars:
var = program.global_block().var(k)
if not var.persistable:
data_vars.add(var.name)
l = len(merged_ops_list)
inputs_pre = set()
outputs_pre = set()
in_from_pre = [[] for i in range(l)]
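        # for each segment, record which of its inputs are produced by the
        # previous segment (plus raw data vars for segment 1); these are the
        # tensors that must be communicated across devices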
for i in range(l):
inputs = set()
outputs = set()
for op in merged_ops_list[i]:
for input in op.input_names:
for tmp in op.input(input):
if tmp not in outputs:
inputs.add(tmp)
for output in op.output_names:
for tmp in op.output(output):
outputs.add(tmp)
if i == 0:
in_from_pre[i] = []
elif i == 1:
in_from_pre[i] = (outputs_pre | data_vars) & inputs
else:
in_from_pre[i] = outputs_pre & inputs
inputs_pre = copy.deepcopy(inputs)
outputs_pre = copy.deepcopy(outputs)
l = len(in_from_pre)
start_list = []
end_list = []
send_list = [[] for i in range(l)]
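        # for each segment, compute its op index range [start, end], the
        # vars it sends to the next segment, and a program clone pruned to
        # its inputs/outputs when the segment runs on a heter device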
        op_count = 0
        program_list = []
        for i in range(l):
            start_list.append(op_count)
            end_list.append(op_count + len(merged_ops_list[i]) - 1)
            op_count += len(merged_ops_list[i])
if i < l - 1:
send_list[i].extend(list(in_from_pre[i + 1]))
prog = program.clone()
if merged_type_list[i] != type_cpu:
prog = prog._prune_with_input(
list(in_from_pre[i]), list(send_list[i]))
program_list.append(prog)
else:
program_list.append(prog)
recv_list = [list(i) for i in in_from_pre]
found = False
heter_index = None
for i in range(len(merged_type_list)):
t = merged_type_list[i]
if t != type_cpu:
if found:
print("only one region of program can be heter")
found = True
heter_index = i
if heter_index is None:
print("warning: non heter program")
return None
else:
return [start_list[heter_index], end_list[heter_index], send_list[heter_index], \
recv_list[heter_index], program_list[heter_index]]