# Paddle/python/paddle/fluid/tests/demo/pipeline_train.py
# Copyright (c) 2019 PaddlePaddle Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import numpy as np
import copy
import pickle
import os
from functools import partial
import logging
import time
import paddle
import paddle.fluid as fluid
import paddle.fluid.layers as layers
import argparse
import random
import sys
import math
logging.basicConfig(format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger("fluid")
logger.setLevel(logging.INFO)
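
# Pipeline training configuration used below: `ncards` is the number of
# training threads passed to train_from_dataset, `nreaders` the concurrency
# of the last pipeline section, `nscopes` the scope queue size passed as
# queue_size, and `sync_steps` the synchronization interval handed to
# PipelineOptimizer.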
batch_size = 100
ncards = 4
nreaders = 4
nscopes = 30
learning_rate = 0.1
is_profile = False
sync_steps = 1
def parse_args():
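    """Parse command-line arguments for the GNN pipeline training demo."""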
    parser = argparse.ArgumentParser("gnn")
    parser.add_argument(
        '--train_path',
        type=str,
        default='./data/diginetica/train.txt',
        help='path of the training data file')
    parser.add_argument(
        '--config_path',
        type=str,
        default='./data/diginetica/config.txt',
        help='path of the config file')
    parser.add_argument(
        '--model_path',
        type=str,
        default='./saved_model',
        help="path of model parameters")
    parser.add_argument(
        '--epoch_num',
        type=int,
        default=30,
        help='number of epochs to train for')
    parser.add_argument(
        '--batch_size', type=int, default=100, help='input batch size')
    parser.add_argument(
        '--hidden_size', type=int, default=100, help='hidden state size')
    parser.add_argument('--l2', type=float, default=1e-5, help='l2 penalty')
    parser.add_argument('--lr', type=float, default=0.001, help='learning rate')
    parser.add_argument(
        '--emb_lr_rate',
        type=float,
        default=0.5,
        help='learning rate multiplier for the embedding table')
    parser.add_argument(
        '--step', type=int, default=1, help='gnn propagation steps')
    parser.add_argument(
        '--lr_dc', type=float, default=0.1, help='learning rate decay rate')
    parser.add_argument(
        '--lr_dc_step',
        type=int,
        default=3,
        help='the number of steps after which the learning rate decays')
    parser.add_argument(
        '--use_cuda', type=int, default=0, help='whether to use gpu')
    parser.add_argument(
        '--use_parallel',
        type=int,
        default=1,
        help='whether to use parallel executor')
    return parser.parse_args()


def network(batch_size, items_num, hidden_size, step, rate):
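    """Build the session-graph GNN network.

    The graph runs `step` rounds of GRU-based message passing over each
    session's item graph (adj_in/adj_out), followed by an attention readout
    and a softmax over the full item vocabulary.

    Returns the loss, the top-20 accuracy, the list of feed variables, and
    the variables ([items_emb, all_emb]) used as the cut list for
    PipelineOptimizer.
    """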
    stdv = 1.0 / math.sqrt(hidden_size)

    items = layers.data(
        name="items",
        shape=[batch_size, -1, 1],
        dtype="int64",
        append_batch_size=False)  # [batch_size, uniq_max, 1]
    seq_index = layers.data(
        name="seq_index",
        shape=[batch_size, -1],
        dtype="int64",
        append_batch_size=False)  # [batch_size, -1(seq_max)]
    last_index = layers.data(
        name="last_index",
        shape=[batch_size],
        dtype="int64",
        append_batch_size=False)  # [batch_size]
    adj_in = layers.data(
        name="adj_in",
        shape=[batch_size, -1, -1],
        dtype="float32",
        append_batch_size=False)
    adj_out = layers.data(
        name="adj_out",
        shape=[batch_size, -1, -1],
        dtype="float32",
        append_batch_size=False)
    mask = layers.data(
        name="mask",
        shape=[batch_size, -1, 1],
        dtype="float32",
        append_batch_size=False)
    label = layers.data(
        name="label",
        shape=[batch_size, 1],
        dtype="int64",
        append_batch_size=False)

    items_emb = layers.embedding(
        input=items,
        is_sparse=True,
        param_attr=fluid.ParamAttr(
            name="emb",
            learning_rate=rate,
            initializer=fluid.initializer.Uniform(
                low=-stdv, high=stdv)),
        size=[items_num, hidden_size])  # [batch_size, uniq_max, h]
    data_feed = [items, seq_index, last_index, adj_in, adj_out, mask, label]
    pre_state = items_emb
    for i in range(step):
        pre_state = layers.reshape(
            x=pre_state, shape=[batch_size, -1, hidden_size])
        state_in = layers.fc(
            input=pre_state,
            name="state_in",
            size=hidden_size,
            act=None,
            num_flatten_dims=2,
            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
                low=-stdv, high=stdv)),
            bias_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
                low=-stdv, high=stdv)))  # [batch_size, uniq_max, h]
        state_out = layers.fc(
            input=pre_state,
            name="state_out",
            size=hidden_size,
            act=None,
            num_flatten_dims=2,
            param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
                low=-stdv, high=stdv)),
            bias_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
                low=-stdv, high=stdv)))  # [batch_size, uniq_max, h]
        state_adj_in = layers.matmul(adj_in, state_in)  # [batch_size, uniq_max, h]
        state_adj_out = layers.matmul(adj_out, state_out)  # [batch_size, uniq_max, h]
        gru_input = layers.concat([state_adj_in, state_adj_out], axis=2)
        gru_input = layers.reshape(x=gru_input, shape=[-1, hidden_size * 2])
        gru_fc = layers.fc(
            input=gru_input, name="gru_fc", size=3 * hidden_size, bias_attr=False)
        pre_state, _, _ = fluid.layers.gru_unit(
            input=gru_fc,
            hidden=layers.reshape(
                x=pre_state, shape=[-1, hidden_size]),
            size=3 * hidden_size)
    final_state = pre_state

    seq_index = layers.reshape(seq_index, shape=[-1])
    seq = layers.gather(final_state, seq_index)  # [batch_size*-1(seq_max), h]
    last = layers.gather(final_state, last_index)  # [batch_size, h]
    seq = layers.reshape(
        seq, shape=[batch_size, -1, hidden_size])  # [batch_size, -1(seq_max), h]
    last = layers.reshape(
        last, shape=[batch_size, hidden_size])  # [batch_size, h]

    seq_fc = layers.fc(
        input=seq,
        name="seq_fc",
        size=hidden_size,
        bias_attr=False,
        act=None,
        num_flatten_dims=2,
        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
            low=-stdv, high=stdv)))  # [batch_size, -1(seq_max), h]
    last_fc = layers.fc(
        input=last,
        name="last_fc",
        size=hidden_size,
        bias_attr=False,
        act=None,
        num_flatten_dims=1,
        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
            low=-stdv, high=stdv)))  # [batch_size, h]

    seq_fc_t = layers.transpose(
        seq_fc, perm=[1, 0, 2])  # [-1(seq_max), batch_size, h]
    add = layers.elementwise_add(seq_fc_t, last_fc)  # [-1(seq_max), batch_size, h]
    b = layers.create_parameter(
        shape=[hidden_size],
        dtype='float32',
        default_initializer=fluid.initializer.Constant(value=0.0))  # [h]
    add = layers.elementwise_add(add, b)  # [-1(seq_max), batch_size, h]
    add_sigmoid = layers.sigmoid(add)  # [-1(seq_max), batch_size, h]
    add_sigmoid = layers.transpose(
        add_sigmoid, perm=[1, 0, 2])  # [batch_size, -1(seq_max), h]

    weight = layers.fc(
        input=add_sigmoid,
        name="weight_fc",
        size=1,
        act=None,
        num_flatten_dims=2,
        bias_attr=False,
        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
            low=-stdv, high=stdv)))  # [batch_size, -1, 1]
    weight *= mask
    weight_mask = layers.elementwise_mul(seq, weight, axis=0)
    global_attention = layers.reduce_sum(weight_mask, dim=1)

    final_attention = layers.concat(
        [global_attention, last], axis=1)  # [batch_size, 2*h]
    final_attention_fc = layers.fc(
        input=final_attention,
        name="final_attention_fc",
        size=hidden_size,
        bias_attr=False,
        act=None,
        param_attr=fluid.ParamAttr(initializer=fluid.initializer.Uniform(
            low=-stdv, high=stdv)))  # [batch_size, h]

    all_vocab = layers.create_global_var(
        shape=[items_num - 1, 1],
        value=0,
        dtype="int64",
        persistable=True,
        name="all_vocab")
    all_emb = layers.embedding(
        input=all_vocab,
        is_sparse=True,
        param_attr=fluid.ParamAttr(
            name="emb",
            learning_rate=rate,
            initializer=fluid.initializer.Uniform(
                low=-stdv, high=stdv)),
        size=[items_num, hidden_size])  # [all_vocab, h]

    logits = layers.matmul(
        x=final_attention_fc, y=all_emb,
        transpose_y=True)  # [batch_size, all_vocab]
    softmax = layers.softmax_with_cross_entropy(
        logits=logits, label=label)  # [batch_size, 1]
    loss = layers.reduce_mean(softmax)  # [1]
    # fluid.layers.Print(loss)
    acc = layers.accuracy(input=logits, label=label, k=20)
    return loss, acc, data_feed, [items_emb, all_emb]


def train():
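    """Train the network with PipelineOptimizer.

    The program is cut into three pipeline sections (CPU -> GPU -> CPU) at
    the variables returned by network(); data is fed from pre-generated
    binary files through a FileInstantDataset, and an inference model is
    saved after every epoch.
    """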
    args = parse_args()
    lr = args.lr
    rate = args.emb_lr_rate
    train_data_dir = "./gnn_data_new_8"
    filelist = [
        os.path.join(train_data_dir, f) for f in os.listdir(train_data_dir)
        if os.path.isfile(os.path.join(train_data_dir, f))
    ][:]
    items_num = read_config(args.config_path)
    loss, acc, data_vars, cut_list = network(batch_size, items_num,
                                             args.hidden_size, args.step, rate)
    print("card: %d, thread: %d, lr: %f, lr_rate: %f, scope: %d, sync_step: %d"
          % (ncards, nreaders, lr, rate, nscopes, sync_steps))

    place = fluid.CPUPlace()
    exe = fluid.Executor(place)
    step_per_epoch = 750000 // batch_size
    """
    opt = fluid.optimizer.SGD(
        learning_rate=fluid.layers.exponential_decay(
            learning_rate=args.lr,
            decay_steps=step_per_epoch * 10,
            decay_rate=args.lr_dc),
        regularization=fluid.regularizer.L2DecayRegularizer(
            regularization_coeff=args.l2))
    """
    opt = fluid.optimizer.SGD(lr)
    opt = fluid.optimizer.PipelineOptimizer(
        opt,
        cut_list=[cut_list, [loss, acc]],
        place_list=[fluid.CPUPlace(), fluid.CUDAPlace(0), fluid.CPUPlace()],
        concurrency_list=[1, 1, nreaders],
        queue_size=nscopes,
        sync_steps=sync_steps)
    opt.minimize(loss)
    exe.run(fluid.default_startup_program())

    all_vocab = fluid.global_scope().var("all_vocab").get_tensor()
    all_vocab.set(
        np.arange(1, items_num).astype("int64").reshape((-1, 1)), place)

    logger.info("begin train")
    dataset = fluid.DatasetFactory().create_dataset("FileInstantDataset")
    dataset.set_use_var(data_vars)
    dataset.set_batch_size(batch_size)
    dataset.set_filelist(filelist)

    total_time = []
    start_time = time.time()
    loss_sum = 0.0
    acc_sum = 0.0
    global_step = 0
    for i in range(25):  # NOTE: epoch count is hard-coded; args.epoch_num is unused
        logger.info("begin epoch %d" % (i))
        epoch_sum = []
        random.shuffle(filelist)
        dataset.set_filelist(filelist)
        exe.train_from_dataset(
            fluid.default_main_program(),
            dataset,
            thread=ncards,
            debug=is_profile,
            fetch_list=[loss, acc],
            fetch_info=["loss", "acc"],
            print_period=1)
        model_path = args.model_path
        model_path += "_" + str(lr) + "_" + str(rate)
        save_dir = model_path + "/epoch_" + str(i)
        fetch_vars = [loss, acc]
        feed_list = [
            "items", "seq_index", "last_index", "adj_in", "adj_out", "mask",
            "label"
        ]
        fluid.io.save_inference_model(save_dir, feed_list, fetch_vars, exe)


class Data():
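    """Reader for the pickled diginetica session data.

    Each input element is a (session, target_item) pair; make_data() pads a
    batch, builds the per-session item graphs (adj_in/adj_out) and yields the
    feed slots in the order expected by network().
    """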
    def __init__(self, path, shuffle=False):
        data = pickle.load(open(path, 'rb'))
        self.shuffle = shuffle
        self.length = len(data[0])
        self.input = list(zip(data[0], data[1]))

    def make_data(self, cur_batch, batch_size):
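        """Turn one batch of (session, label) pairs into padded numpy slots.

        For every session this builds the unique-item node list, the in/out
        degree-normalized adjacency matrices, the index of every click into
        the flattened node list, the index of the last click, a mask, and the
        label (shifted to be 0-based).
        """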
        cur_batch = [list(e) for e in cur_batch]
        max_seq_len = 0
        for e in cur_batch:
            max_seq_len = max(max_seq_len, len(e[0]))
        last_id = []
        for e in cur_batch:
            last_id.append(len(e[0]) - 1)
            e[0] += [0] * (max_seq_len - len(e[0]))
        max_uniq_len = 0
        for e in cur_batch:
            max_uniq_len = max(max_uniq_len, len(np.unique(e[0])))

        items, adj_in, adj_out, seq_index, last_index = [], [], [], [], []
        mask, label = [], []
        id = 0
        for e in cur_batch:
            node = np.unique(e[0])
            items.append(node.tolist() + (max_uniq_len - len(node)) * [0])
            adj = np.zeros((max_uniq_len, max_uniq_len))
            for i in np.arange(len(e[0]) - 1):
                if e[0][i + 1] == 0:
                    break
                u = np.where(node == e[0][i])[0][0]
                v = np.where(node == e[0][i + 1])[0][0]
                adj[u][v] = 1
            u_deg_in = np.sum(adj, 0)
            u_deg_in[np.where(u_deg_in == 0)] = 1
            adj_in.append(np.divide(adj, u_deg_in).transpose())
            u_deg_out = np.sum(adj, 1)
            u_deg_out[np.where(u_deg_out == 0)] = 1
            adj_out.append(np.divide(adj.transpose(), u_deg_out).transpose())
            seq_index.append(
                [np.where(node == i)[0][0] + id * max_uniq_len for i in e[0]])
            last_index.append(
                np.where(node == e[0][last_id[id]])[0][0] + id * max_uniq_len)
            label.append(e[1] - 1)
            mask.append([[1] * (last_id[id] + 1) + [0] *
                         (max_seq_len - last_id[id] - 1)])
            id += 1

        items = np.array(items).astype("uint64").reshape((batch_size, -1, 1))
        seq_index = np.array(seq_index).astype("uint64").reshape(
            (batch_size, -1))
        last_index = np.array(last_index).astype("uint64").reshape(
            (batch_size, 1))
        adj_in = np.array(adj_in).astype("float32").reshape(
            (batch_size, max_uniq_len, max_uniq_len))
        adj_out = np.array(adj_out).astype("float32").reshape(
            (batch_size, max_uniq_len, max_uniq_len))
        mask = np.array(mask).astype("float32").reshape((batch_size, -1, 1))
        label = np.array(label).astype("uint64").reshape((batch_size, 1))
        return list(
            zip(items, seq_index, last_index, adj_in, adj_out, mask, label))

    def reader(self, batch_size, batch_group_size, train=True):
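        """Yield batches of size batch_size.

        Sessions are optionally shuffled, grouped into chunks of
        batch_group_size, sorted by length inside each chunk during training,
        and then cut into batches; a trailing remainder smaller than
        batch_size is discarded.
        """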
        if self.shuffle:
            random.shuffle(self.input)
        group_remain = self.length % batch_group_size
        for bg_id in range(0, self.length - group_remain, batch_group_size):
            cur_bg = copy.deepcopy(self.input[bg_id:bg_id + batch_group_size])
            if train:
                cur_bg = sorted(cur_bg, key=lambda x: len(x[0]), reverse=True)
            for i in range(0, batch_group_size, batch_size):
                cur_batch = cur_bg[i:i + batch_size]
                yield self.make_data(cur_batch, batch_size)

        # deal with the remaining data, discarding at most batch_size instances
        if group_remain < batch_size:
            return
        remain_data = copy.deepcopy(self.input[-group_remain:])
        if train:
            remain_data = sorted(
                remain_data, key=lambda x: len(x[0]), reverse=True)
        for i in range(0, batch_group_size, batch_size):
            if i + batch_size <= len(remain_data):
                cur_batch = remain_data[i:i + batch_size]
                yield self.make_data(cur_batch, batch_size)


def read_config(path):
    with open(path, "r") as fin:
        item_num = int(fin.readline())
    return item_num


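# For each feed slot (0: items, 1: seq_index, 2: last_index, 3: adj_in,
# 4: adj_out, 5: mask, 6: label), the axes whose variable lengths have to be
# written into every binary record; fixed-shape slots map to an empty list.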
induce_map = {0: [0], 1: [0], 2: [], 3: [0, 1], 4: [0, 1], 5: [0], 6: []}
def binary_print(slot, fout, index):
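    """Write one slot to the binary file consumed by FileInstantDataset.

    The record is: a uint16 count (the number of values plus the number of
    variable-length dims recorded for this slot), the uint64 sizes of those
    dims, then the raw slot values.
    """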
    shape_array = slot.shape
    num = 1
    for e in shape_array:
        num *= e
    num += len(induce_map[index])
    num = np.uint16(num)
    num.tofile(fout)
    for e in induce_map[index]:
        tmp_shape = np.uint64(shape_array[e])
        tmp_shape.tofile(fout)
    slot.tofile(fout)


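# NOTE: make_binary_data() below also references a text_print() helper for its
# plain-text output path; that helper is not part of this excerpt, so a minimal
# stand-in (one whitespace-separated line per slot) is sketched here.
def text_print(slot, fout, index):
    """Minimal assumed text writer: flatten the slot and write one line."""
    fout.write(" ".join(str(v) for v in np.asarray(slot).reshape(-1)) + "\n")

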
def make_binary_data():
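    """Convert the pickled training data into binary files for the pipeline.

    Batches from Data.reader() are written into ./gnn_data_new_8 (the
    directory train() reads from), starting a new file every
    batch_size * 900 instances.
    """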
    data_reader = Data('./data/diginetica/train.txt', True)
    index = 0
    id = -1
    filename = None
    fout = None
    binary = True
    for data in data_reader.reader(batch_size, 20 * batch_size, True):
        if index % (batch_size * 900) == 0:
            id += 1
            if not binary:
                filename = "./gnn_data_text/" + str(id)
            else:
                filename = "./gnn_data_new_8/" + str(id)
            print("filename: " + filename)
            if fout:
                fout.close()
            fout = open(filename, "wb" if binary else "w")
        for ins in data:
            for i, slot in enumerate(ins):
                if binary:
                    binary_print(slot, fout, i)
                else:
                    text_print(slot, fout, i)
        index += batch_size
    # close the last output file
    if fout:
        fout.close()


if __name__ == "__main__":
    make_binary_data()
    train()