|
|
|
@ -31,14 +31,15 @@ typedef Queue<int> PidQueue;
|
|
|
|
|
typedef std::unique_ptr<TrainerThread> TrainerThreadPtr;
|
|
|
|
|
|
|
|
|
|
struct GradBuffer {
|
|
|
|
|
// GradBuffer is used for gathering gradient for GPU parameters
|
|
|
|
|
/// GradBuffer is used for gathering gradient for GPU parameters
|
|
|
|
|
int paramId;
|
|
|
|
|
|
|
|
|
|
// sem is used to notify that the local gradient merge of the current thread
|
|
|
|
|
// finished for the current thread.
|
|
|
|
|
/// sem is used to notify that the local gradient merge of the current thread
|
|
|
|
|
/// finished for the current thread.
|
|
|
|
|
Semaphore sem;
|
|
|
|
|
|
|
|
|
|
std::vector<VectorPtr> bufs; // bufs[mergeIndex]
|
|
|
|
|
// bufs[mergeIndex]
|
|
|
|
|
std::vector<VectorPtr> bufs;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
@ -189,14 +190,14 @@ public:
|
|
|
|
|
return useGpu_;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// @return whether to pass the gradients in outArgs_ to each threads.
|
|
|
|
|
/// @return whether to pass the gradients in outArgs_ to each threads.
|
|
|
|
|
bool isPassGrad() { return isPassGrad_; }
|
|
|
|
|
|
|
|
|
|
// @brief set whether to pass the gradient in outArgs_ to each threads.
|
|
|
|
|
/// @brief set whether to pass the gradient in outArgs_ to each threads.
|
|
|
|
|
void setPassGrad(bool isPass) { isPassGrad_ = isPass; }
|
|
|
|
|
|
|
|
|
|
// Set the gradients of the outputs.
|
|
|
|
|
// The gradietns will be copied to each thread in the computing threads.
|
|
|
|
|
/// Set the gradients of the outputs.
|
|
|
|
|
/// The gradietns will be copied to each thread in the computing threads.
|
|
|
|
|
virtual void setOutputGrad(const std::vector<Argument>& args);
|
|
|
|
|
|
|
|
|
|
protected:
|
|
|
|
@ -205,8 +206,8 @@ protected:
|
|
|
|
|
std::vector<TrainerThreadPtr>& getAllThreads() {
|
|
|
|
|
return threads_;
|
|
|
|
|
}
|
|
|
|
|
// Calculate the real device id based on the logical device id and the
|
|
|
|
|
// thread id.
|
|
|
|
|
/// Calculate the real device id based on the logical device id and the
|
|
|
|
|
/// thread id.
|
|
|
|
|
int logicalDeviceId2RealDeviceId(int logicalId, int threadId = 0) const {
|
|
|
|
|
if (logicalId == -1) {
|
|
|
|
|
logicalId = 0;
|
|
|
|
@ -215,8 +216,8 @@ protected:
|
|
|
|
|
numDevices_);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Calculate the logical device id based on the real device id and the
|
|
|
|
|
// thread id.
|
|
|
|
|
/// Calculate the logical device id based on the real device id and the
|
|
|
|
|
/// thread id.
|
|
|
|
|
int realDeviceId2LogicalDeviceId(int realId, int threadId = 0) const {
|
|
|
|
|
if (realId == -1) {
|
|
|
|
|
return 0;
|
|
|
|
@ -232,15 +233,15 @@ protected:
|
|
|
|
|
return hasNonstaticCpuParamters_;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Called TrainerThread to wait before merging CPU parameter gradients.
|
|
|
|
|
/// Called TrainerThread to wait before merging CPU parameter gradients.
|
|
|
|
|
void waitBeforeMerge() { trainerBarrier_.wait(); }
|
|
|
|
|
|
|
|
|
|
// called by MultiGradientMachine and TrainerThread to wait after merging
|
|
|
|
|
// CPU parameter graidents.
|
|
|
|
|
/// called by MultiGradientMachine and TrainerThread to wait after merging
|
|
|
|
|
/// CPU parameter graidents.
|
|
|
|
|
void waitAfterMerge() { allBarrier_.wait(); }
|
|
|
|
|
|
|
|
|
|
// called by MultiGradientMachine and TrainerThread to wait for copyInArgs()
|
|
|
|
|
// finishing
|
|
|
|
|
/// called by MultiGradientMachine and TrainerThread to wait for copyInArgs()
|
|
|
|
|
/// finishing
|
|
|
|
|
void waitForCopyInArgs() { allBarrier_.wait(); }
|
|
|
|
|
|
|
|
|
|
TrainerThreadPtr& getThread(int threadId) {
|
|
|
|
@ -255,8 +256,8 @@ protected:
|
|
|
|
|
return passType_;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Called by TrainerThread to notify MultiGradientMachine that the gradient
|
|
|
|
|
// for paramId is ready
|
|
|
|
|
/// Called by TrainerThread to notify MultiGradientMachine that the gradient
|
|
|
|
|
/// for paramId is ready
|
|
|
|
|
void notifyGradientTransfer(int paramId);
|
|
|
|
|
|
|
|
|
|
const std::vector<Argument>& getInArgs() {
|
|
|
|
@ -297,7 +298,7 @@ protected:
|
|
|
|
|
virtual void backwardImp(
|
|
|
|
|
const UpdateCallback& callback = NULL);
|
|
|
|
|
|
|
|
|
|
// update all parameters
|
|
|
|
|
/// update all parameters
|
|
|
|
|
void updateThreadParameters();
|
|
|
|
|
|
|
|
|
|
void startTask(TaskType taskType);
|
|
|
|
@ -311,7 +312,7 @@ protected:
|
|
|
|
|
|
|
|
|
|
bool hasNonstaticCpuParamters_;
|
|
|
|
|
|
|
|
|
|
// store main parameter only
|
|
|
|
|
/// store main parameter only
|
|
|
|
|
std::unique_ptr<GradientMachine> gradientMachine_;
|
|
|
|
|
|
|
|
|
|
std::vector<TrainerThreadPtr> threads_;
|
|
|
|
@ -326,7 +327,7 @@ protected:
|
|
|
|
|
std::vector<Argument> outArgs_;
|
|
|
|
|
hl_stream_t outArgStream_;
|
|
|
|
|
|
|
|
|
|
// ParameterType which needs to be merged from each GPU
|
|
|
|
|
/// ParameterType which needs to be merged from each GPU
|
|
|
|
|
std::vector<ParameterType> mergeTypes_;
|
|
|
|
|
int numDevices_; /* number of gpu devices */
|
|
|
|
|
int numLogicalDevices_; // number of GPU used by one NN
|
|
|
|
@ -334,16 +335,16 @@ protected:
|
|
|
|
|
|
|
|
|
|
UpdateCallback backwardCallback_;
|
|
|
|
|
|
|
|
|
|
// barrrier for threads_
|
|
|
|
|
/// barrrier for threads_
|
|
|
|
|
ThreadBarrier trainerBarrier_;
|
|
|
|
|
|
|
|
|
|
// barrier for both MultiGradientMachine and threds_
|
|
|
|
|
/// barrier for both MultiGradientMachine and threds_
|
|
|
|
|
ThreadBarrier allBarrier_;
|
|
|
|
|
|
|
|
|
|
// indicate whether inArgs is copied before forward()
|
|
|
|
|
/// indicate whether inArgs is copied before forward()
|
|
|
|
|
bool inArgsCopied_;
|
|
|
|
|
|
|
|
|
|
// Whether to copy the gradient back from an external input.
|
|
|
|
|
/// Whether to copy the gradient back from an external input.
|
|
|
|
|
bool isPassGrad_;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
@ -413,7 +414,7 @@ public:
|
|
|
|
|
|
|
|
|
|
void prefetch();
|
|
|
|
|
|
|
|
|
|
// copy the output gradient from the main GradientMachine.
|
|
|
|
|
/// copy the output gradient from the main GradientMachine.
|
|
|
|
|
void copyOutputGrad();
|
|
|
|
|
|
|
|
|
|
protected:
|
|
|
|
@ -441,51 +442,60 @@ protected:
|
|
|
|
|
void backward();
|
|
|
|
|
void backwardCallback(Parameter* para);
|
|
|
|
|
|
|
|
|
|
// call the actuall callback supplied by the caller of
|
|
|
|
|
// GradientMachine::backward
|
|
|
|
|
/// call the actuall callback supplied by the caller of
|
|
|
|
|
/// GradientMachine::backward
|
|
|
|
|
void doCallback(int pid);
|
|
|
|
|
|
|
|
|
|
protected:
|
|
|
|
|
MultiGradientMachine* multiMachine_;
|
|
|
|
|
ModelConfig config_;
|
|
|
|
|
bool stopping_; // whether the thread should stop
|
|
|
|
|
int partnerId_; // the threads form which to collect gradient
|
|
|
|
|
int threadId_; // from 0 to #threads-1
|
|
|
|
|
/// whether the thread should stop
|
|
|
|
|
bool stopping_;
|
|
|
|
|
/// the threads form which to collect gradient
|
|
|
|
|
int partnerId_;
|
|
|
|
|
/// from 0 to threads-1
|
|
|
|
|
int threadId_;
|
|
|
|
|
int deviceId_;
|
|
|
|
|
std::unique_ptr<GradientMachine> gradientMachine_;
|
|
|
|
|
std::vector<ParameterPtr> parameters_;
|
|
|
|
|
|
|
|
|
|
// ParameterType which needs to be merged from each GPU
|
|
|
|
|
/// ParameterType which needs to be merged from each GPU
|
|
|
|
|
std::vector<ParameterType> mergeTypes_;
|
|
|
|
|
|
|
|
|
|
std::unique_ptr<std::thread> computeThread_; // compute thread
|
|
|
|
|
/// compute thread
|
|
|
|
|
std::unique_ptr<std::thread> computeThread_;
|
|
|
|
|
std::vector<Argument> inArgs_;
|
|
|
|
|
std::vector<Argument> outArgs_;
|
|
|
|
|
Semaphore taskReadySem_;
|
|
|
|
|
Semaphore outArgsReadySem_;
|
|
|
|
|
|
|
|
|
|
std::unique_ptr<std::thread> copyThread_; // copy thread
|
|
|
|
|
PidQueue gradBufQueue_; // queue of gradient needs to be copied to partner
|
|
|
|
|
/// copy thread
|
|
|
|
|
std::unique_ptr<std::thread> copyThread_;
|
|
|
|
|
/// queue of gradient needs to be copied to partner
|
|
|
|
|
PidQueue gradBufQueue_;
|
|
|
|
|
hl_stream_t gradStream_;
|
|
|
|
|
|
|
|
|
|
std::unique_ptr<std::thread> gradCollectThread_; // grad merge thread
|
|
|
|
|
// queue of gradient needs to be merged with gradient coopied by
|
|
|
|
|
// copyGradToBufferThread
|
|
|
|
|
/// grad merge thread
|
|
|
|
|
std::unique_ptr<std::thread> gradCollectThread_;
|
|
|
|
|
/// queue of gradient needs to be merged with gradient coopied by
|
|
|
|
|
/// copyGradToBufferThread
|
|
|
|
|
PidQueue gradQueue_;
|
|
|
|
|
UpdateCallback backwardCallback_;
|
|
|
|
|
|
|
|
|
|
std::unique_ptr<std::thread> valueDispatchThread_; // value dispatch thread
|
|
|
|
|
// queue of the parameter whose the vale are ready for copy
|
|
|
|
|
/// value dispatch thread
|
|
|
|
|
std::unique_ptr<std::thread> valueDispatchThread_;
|
|
|
|
|
/// queue of the parameter whose the vale are ready for copy
|
|
|
|
|
PidQueue valueReadyQueue_;
|
|
|
|
|
|
|
|
|
|
// used to notify all the parameter values are ready
|
|
|
|
|
/// used to notify all the parameter values are ready
|
|
|
|
|
LockedCondition valueReadyCond_;
|
|
|
|
|
|
|
|
|
|
hl_stream_t valueStream_;
|
|
|
|
|
std::atomic<int> updateCounter_; // how many parameters are updated
|
|
|
|
|
/// how many parameters are updated
|
|
|
|
|
std::atomic<int> updateCounter_;
|
|
|
|
|
bool parameterUpdated_;
|
|
|
|
|
|
|
|
|
|
// indicate whether inArgs is copied before forward()
|
|
|
|
|
/// indicate whether inArgs is copied before forward()
|
|
|
|
|
bool inArgsCopied_;
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|