/* Copyright (c) 2016 PaddlePaddle Authors. All Rights Reserved.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */

#pragma once

#include <functional>
#include <thread>
#include "ParameterUpdater.h"
#include "paddle/pserver/ParameterClient2.h"
#include "paddle/utils/Queue.h"
#include "paddle/utils/Util.h"

namespace paddle {

// TODO(yanfei):
// The biggest feature of RDMA is its lossless packet control rather than
// high bandwidth, zero copy, or GPU-direct RDMA in theory.
// But zero-copy and GPU-direct RDMA can help reduce the latency introduced
// by the OS.
// So, for some specific clusters, such as high-density GPU clusters,
// GPU-direct and zero copy could help improve cluster communication
// performance.
//

/**
 * Normal remote parameter updater for dense parameters.
 *
 * It first packs all parameters for all pservers using the ParameterClient
 * module, then waits for the merged parameter data from all pservers.
 * The synchronization pattern specified by sync-sgd or async-sgd is
 * achieved by all pservers with the help of the controller within this
 * remote parameter updater.
 * This module bridges the gradient machines and the parameter servers:
 * it transfers parameters from the acceleration device to the CPU side
 * for network transmission. It keeps additional CPU-side copy buffers for
 * parameters when an acceleration device such as a GPU is used; otherwise
 * it directly uses the original parameter data to update the pservers.
 *
 * This remote parameter updater does not use a pipeline mechanism to hide
 * the GPU-to-CPU copy latency, and overlapping backward computation with
 * communication is not supported.
 */
class RemoteParameterUpdater : public ParameterUpdater {
public:
  RemoteParameterUpdater(
      const OptimizationConfig& config,
      int expectedPassCount,
      std::unique_ptr<ParameterUpdater>&& localUpdater = nullptr);
  ~RemoteParameterUpdater() {
    if (controllerThread_) {
      controllerThread_->join();
    }
  }

  /**
   * initialize the internal parameter client and the updater itself.
   */
  virtual void init(const std::vector<ParameterPtr>& parameters);
  /**
   * @brief start batch
   *
   * @note  batch training is stateful, which helps with performance tuning
   *        and SGD optimization if necessary.
   */
  virtual PassType startBatch(int64_t batchSize) {
    if (localUpdater_) {
      localUpdater_->startBatch(batchSize);
    }
    batchSize_ = batchSize;
    batchStatus_ = BATCH_START;
    return PASS_TRAIN;
  }

  /**
   * Send parameters to the pservers and, if necessary, get the returned
   * parameters back from all pservers. It implicitly cooperates with the
   * controller thread for sync-sgd.
   */
  virtual void finishBatch(real cost);
  virtual void startPass();
  virtual bool finishPass();

#ifndef PADDLE_DISABLE_TIMER
  virtual void setForwardbackwardTime(uint64_t delta) {
    parameterClient_->setForwardbackwardTime(delta);
  }
#endif

  virtual void apply();
  virtual void restore();

protected:
  /**
   * control all pservers with all trainers for sync-sgd
   */
  virtual void controller();

  /**
   * work that needs to be done after finishBatch
   */
  virtual void updateImpl(Parameter* para);

  void startController();

  /**
   * @brief copy parameters from the cpu host to the device, such as a gpu.
   *
   * @note  returns once all data have been transferred.
   */
  void copyParametersToDevice(ParameterType parameterType);

  /**
   * @brief copy parameters from the device to the cpu host
   *
   * @note  returns once all data have been transferred.
   */
  void copyParametersFromDevice(ParameterType parameterType);

protected:
  /// optimization config used to guide initialization and finishBatch
  OptimizationConfig config_;
  /// internal parameter client object for exchanging data with the pservers
  std::unique_ptr<ParameterClient2> parameterClient_;
  /// internal shadow buffers at the cpu host end; the original parameters_
  /// are used directly if no acceleration device is involved
  std::vector<ParameterPtr> cpuParameters_;
  /// local updater for aggregating the local delta of multiple batches
  std::unique_ptr<ParameterUpdater> localUpdater_;
  /// the size of the mini-batch
  int64_t batchSize_;
  /// number of batches passed
  int64_t numBatches_;
  /// for stateful control
  BatchStatus batchStatus_;
  /// controller thread for sync-sgd
  std::unique_ptr<std::thread> controllerThread_;
  /// number of passes already finished
  int64_t passCount_;
  /// expected number of passes to finish
  int64_t expectedPassCount_;
  /// use normal synchronous communication if true
  bool separateSendAndRecv_;
  /// true if it is the first pass
  bool isFirstPass_;
  bool useApplyInPserver_;

  static const std::string kAverage;
  static const std::string kElasticAverage;
};
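
// A minimal usage sketch for RemoteParameterUpdater (illustration only, not
// part of the original header). The trainer context, `optConfig`, and
// `parameters` are assumed to be set up elsewhere, e.g. from the trainer
// configuration and the gradient machine; the real call sites may differ.
//
//   OptimizationConfig optConfig;              // assumed to be populated
//   std::vector<ParameterPtr> parameters;      // assumed from the model
//   RemoteParameterUpdater updater(optConfig, /* expectedPassCount */ 1);
//   updater.init(parameters);
//
//   updater.startPass();
//   PassType passType = updater.startBatch(/* batchSize */ 128);
//   // ... forward/backward over one mini-batch ...
//   updater.finishBatch(/* cost */ 0.0);
//   updater.finishPass();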

// TODO(yanfei):
// do parameter-level synchronization optimization at the pserver end with
// ConcurrentRemoteParameterUpdater to get more parallelism and, ultimately,
// really hide pserver latency during the backward computation.
//
/**
 * This updater adds an additional optimization for overlapping the
 * synchronization with pservers and the backward computation.
 *
 * Parameters can be sent to the pservers as soon as the related backward
 * stage is finished. This concurrent updater copies data from the
 * acceleration device to host memory asynchronously. The internal parameter
 * client then reads the data in host memory and sends it to all pservers in
 * the next stage. This pipelines the device-to-host copy with the
 * host-to-network transfer to hide network latency during the backward
 * stage.
 * It contains separate send and recv threads for this pipeline.
 */
class ConcurrentRemoteParameterUpdater : public RemoteParameterUpdater {
public:
  ConcurrentRemoteParameterUpdater(
      OptimizationConfig config,
      int expectedPassCount,
      std::unique_ptr<ParameterUpdater>&& localUpdater);
  ~ConcurrentRemoteParameterUpdater();

  /**
   * @brief send parameters to all pservers
   *
   * @note  it just signals the end of the mini-batch to the internal
   *        parameter client to finish the asynchronous send action. It also
   *        synchronizes all asynchronous host-to-device copies.
   */
  virtual void finishBatch(real cost);

protected:
  virtual void updateImpl(Parameter* para);
  /// internal function called in the send thread
  void send(Parameter* para);  // para == NULL indicates the end of a minibatch
  /// internal function called in the recv thread
  void recv(Parameter* para);
  /**
   * @brief send thread for relaying data from the gradients to the
   *        parameter client
   *
   * @note  it just pipes data to the internal parameter client for pipelining
   */
  void send();
  /**
   * @brief recv thread for relaying data from the internal parameter client
   *        to host memory
   *
   * @note  it contains the asynchronous data copy from host to device
   */
  void recv();
  /// copy the specified parameter from host to device
  void copySingleParaToDevice(Parameter* para, ParameterType parameterType);
  /// copy the specified parameter from device to host
  void copySingleParaFromDevice(Parameter* para, ParameterType parameterType);
  bool needToUpdateRemotely() {
    return (numBatches_ + 1) % config_.num_batches_per_send_parameter() == 0;
  }
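
  // Illustration (not from the original comments): with
  // num_batches_per_send_parameter set to 4 in the optimization config,
  // (numBatches_ + 1) % 4 == 0 holds for numBatches_ = 3, 7, 11, ..., so the
  // remote update happens once every 4 local batches; with the value 1 it
  // happens on every batch.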

private:
  /// send thread used for overlapping
  std::unique_ptr<std::thread> sendThread_;
  /// recv thread used for overlapping
  std::unique_ptr<std::thread> recvThread_;
  /// buffer queue for overlapping
  Queue<int> sendQueue_;
  /// buffer queue for overlapping
  Queue<int> recvQueue_;
  /// flag indicating that the updater is stopping
  bool stopping_;
  /// condition variable for synchronization between the thread calling
  /// finishBatch and the internal recv thread
  LockedCondition finishBatchCond_;
  bool oneBatchFinished_;
};

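// A construction sketch for ConcurrentRemoteParameterUpdater (illustration
// only; the trainer normally selects the updater based on its config, and
// `optConfig` is assumed to be populated elsewhere). Unlike the base class,
// the local updater argument has no default here.
//
//   OptimizationConfig optConfig;                    // assumed populated
//   std::unique_ptr<ParameterUpdater> localUpdater;  // may stay nullptr
//   ConcurrentRemoteParameterUpdater updater(
//       optConfig, /* expectedPassCount */ 1, std::move(localUpdater));
//   // The batch/pass lifecycle is the same as RemoteParameterUpdater; the
//   // device-to-host copies and the pserver sends are overlapped internally
//   // by the send/recv threads.
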
// TODO(yanfei):
// merge the sparse updater with the dense updater, which could help to
// reduce the synchronization between the sparse and dense updaters. It could
// also reduce the number of threads for managing all connections.
/**
 * This class is specialized for updating sparse parameters.
 *
 * It allows only part of the parameters to be exchanged with the pservers.
 * With sparse input, part of the gradients of the first hidden layer remain
 * zero and do not need to be exchanged with the pservers. This is the key
 * optimization of this updater.
 *
 * For sparse parameters, the latest values are stored on the pservers
 * instead of keeping a full copy on the trainer side, so the parameter
 * values that may change in the next batch must be prefetched before the
 * next forward-backward pass. Also, since the parameters are stored on the
 * pservers instead of the trainer, specific parameters can be fetched on
 * demand, which supports huge parameters that exceed the RAM size of a
 * single node.
 *
 * Internally, this updater directs the internal parameter client to
 * encapsulate sparse-specific messages for all pservers.
 */
class SparseRemoteParameterUpdater : public ParameterUpdater {
public:
  SparseRemoteParameterUpdater(const OptimizationConfig& config,
                               int expectedPassCount,
                               bool testing);
  ~SparseRemoteParameterUpdater() {
    if (controllerThread_) {
      controllerThread_->join();
    }
  }

  /// initialization
  virtual void init(const std::vector<ParameterPtr>& parameters);

  /// stateful batch control
  virtual PassType startBatch(int64_t batchSize);
  /// send all sparse-related parameters to all pservers
  virtual void finishBatch(real cost);
  virtual void startPass();
  virtual bool finishPass();

  virtual void apply();
  virtual void restore();

  /// load parameters from the pservers
  virtual void loadParametersRemote(const std::string& dirName);
  /// save parameters to the pservers
  virtual void saveParametersRemote(const std::string& dirName);
  /**
   * @brief get the latest sparse parameter values from all pservers
   *
   * @note  call it before the next mini-batch
   */
  virtual void getParametersRemote(bool fullSize, bool apply);
  virtual void randParametersRemote();
#ifndef PADDLE_DISABLE_TIMER
  virtual void setForwardbackwardTime(uint64_t delta) {
    parameterClient_->setForwardbackwardTime(delta);
  }
#endif

protected:
  /// update implementation; intentionally left empty here
  virtual void updateImpl(Parameter* para) {}

  /// internal controller routine for the controller thread
  virtual void controller();

  /// start the controller thread
  void startController();

protected:
  /// optimization config
  OptimizationConfig config_;
  /// internal parameter client
  std::unique_ptr<ParameterClient2> parameterClient_;
  int64_t batchSize_;
  std::unique_ptr<std::thread> controllerThread_;
  int64_t passCount_;
  int64_t expectedPassCount_;
  bool testing_;
  bool useApplyInPserver_;
};
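
// A minimal usage sketch for SparseRemoteParameterUpdater (illustration only;
// `optConfig` and `parameters` are assumed to come from the trainer
// configuration and the model, and the exact ordering in the trainer may
// differ). The key difference from the dense updater is the prefetch of the
// latest sparse parameter values before each mini-batch.
//
//   SparseRemoteParameterUpdater updater(optConfig,
//                                        /* expectedPassCount */ 1,
//                                        /* testing */ false);
//   updater.init(parameters);
//
//   updater.startPass();
//   // fetch the latest sparse parameter values before the next mini-batch
//   updater.getParametersRemote(/* fullSize */ false, /* apply */ false);
//   updater.startBatch(/* batchSize */ 128);
//   // ... forward/backward ...
//   updater.finishBatch(/* cost */ 0.0);
//   updater.finishPass();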

/**
 * Class for supporting both the normal (dense) updater and the sparse
 * updater.
 *
 * Not all parts of a model are sparse, so a dense updater exists for normal
 * layers while the sparse updater handles sparse layers.
 *
 * It directly calls the internal dense and sparse updaters individually.
 */
class SparseRemoteParameterUpdaterComposite : public ParameterUpdaterComposite {
public:
  enum {
    UPDATER_SPARSE_REMOTE = 0,  // executed in the sync thread pool (tid:0)
    UPDATER_NORMAL = 1,         // executed in the owner thread (tid:1)
    NUMBER_UPDATERS = 2,
  };
  /**
   * @brief create one dense updater and one sparse updater
   *
   * @note  use syncThreadPool to synchronize these two updaters
   */
  SparseRemoteParameterUpdaterComposite(
      const OptimizationConfig& config,
      int expectedPassCount,
      bool testing,
      std::unique_ptr<ParameterUpdater>&& normalUpdater) {
    updaters_.resize(NUMBER_UPDATERS);
    updaters_[UPDATER_SPARSE_REMOTE].reset(
        new SparseRemoteParameterUpdater(config, expectedPassCount, testing));
    updaters_[UPDATER_NORMAL] = std::move(normalUpdater);

    syncThreadPool_.reset(new SyncThreadPool(NUMBER_UPDATERS - 1));
  }

  /// initialization of the dense and sparse updaters
  virtual void init(const std::vector<ParameterPtr>& parameters);
};
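
// A construction sketch for the composite updater (illustration only;
// `optConfig` and `parameters` are assumed from the trainer context): the
// dense updater for normal layers is created elsewhere and moved in, while
// the sparse remote updater is created internally by the constructor above.
//
//   std::unique_ptr<ParameterUpdater> denseUpdater(
//       new RemoteParameterUpdater(optConfig, /* expectedPassCount */ 1));
//   SparseRemoteParameterUpdaterComposite composite(
//       optConfig, /* expectedPassCount */ 1, /* testing */ false,
//       std::move(denseUpdater));
//   composite.init(parameters);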

class ParameterUpdaterCreators {
public:
  /**
   * @brief add a creator to create a custom ParameterUpdater during training.
   *        The creator is a function of type (algorithm, optConfig, isLocal,
   *        numPasses) -> ParameterUpdater*. The trainer will use the created
   *        ParameterUpdater if the creator returns a non-null pointer;
   *        returning nullptr falls back to the trainer's default updaters.
   *
   * @param creator method which can create a ParameterUpdater.
   */
  static void addCreator(
      const std::function<ParameterUpdater*(
          const std::string&,         // algo
          const OptimizationConfig&,  // optConfig
          bool,                       // isLocal
          size_t                      // numPasses
          )>& creator) {  // NOLINT  explicitly move the closing ')' to this
                          // line for readability
    constructors_.push_back(creator);
  }

  /**
   * @brief Try to create an updater for the given algo, optConfig, isLocal,
   *        and numPasses. Returns nullptr if none can be created.
   * @param algo algorithm string.
   * @param optConfig optimization config.
   * @param isLocal whether the trainer runs in local mode.
   * @param numPasses total number of passes the trainer will train.
   * @return nullptr on failure, otherwise the newly created updater.
   */
  static ParameterUpdater* tryCreateUpdater(const std::string& algo,
                                            const OptimizationConfig& optConfig,
                                            bool isLocal,
                                            size_t numPasses) {
    for (auto& c : constructors_) {
      if (auto updater = c(algo, optConfig, isLocal, numPasses)) {
        return updater;
      }
    }
    return nullptr;
  }

private:
  static std::vector<std::function<ParameterUpdater*(
      const std::string&, const OptimizationConfig&, bool, size_t)>>
      constructors_;
};
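
// A sketch of registering a custom updater creator (illustration only; the
// "my_algo" name and MyUpdater type are hypothetical). A creator that returns
// nullptr simply defers to the trainer's default updaters.
//
//   ParameterUpdaterCreators::addCreator(
//       [](const std::string& algo,
//          const OptimizationConfig& optConfig,
//          bool isLocal,
//          size_t numPasses) -> ParameterUpdater* {
//         if (algo == "my_algo") {
//           return new MyUpdater(optConfig, numPasses, isLocal);  // hypothetical
//         }
//         return nullptr;  // fall back to the default updaters
//       });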

}  // namespace paddle