Merge branch 'develop' of github.com:PaddlePaddle/Paddle into network

gangliao-patch-1
Superjom 8 years ago
commit 309b37e52e

@ -172,53 +172,3 @@ TEST_F(CommonTest, syncThreadPool) {
EXPECT_EQ((int)0, nums[i]); EXPECT_EQ((int)0, nums[i]);
} }
} }
// Drives the barrier statistics probes from a pool of worker jobs and
// prints/resets globalStat after each phase:
//   1. random per-job delays, probes registered without a tag;
//   2. random per-job delays, probes registered with a tag;
//   3. deterministic delays proportional to the job's tid.
// NOTE(review): REGISTER_SLOW_NODES_PROBE and SyncThreadPool::exec are defined
// elsewhere; presumably exec runs the lambda once per pool thread -- confirm.
TEST_F(CommonTest, barrierStat) {
  const int threadNum = 10;
  SyncThreadPool pool(threadNum);

// Each job seeds std::rand from the current time of day, sleeps for a
// pseudo-random duration below 100000 microseconds, then registers the probe.
#define TEST_BARRIER_RANDOM(statName, numConnThreads, ...)       \
  pool.exec([&](int tid, size_t numThreads) {                    \
    struct timeval time;                                         \
    gettimeofday(&time, nullptr);                                \
    uint64_t usec = timeToMicroSecond(time);                     \
    std::srand(usec);                                            \
    auto value = std::rand() % 100000;                           \
    usleep(value);                                               \
    REGISTER_SLOW_NODES_PROBE(                                   \
        globalStat, statName, numConnThreads, tid, __VA_ARGS__); \
  });

  // Phase 1: random delays, no tag.
  for (auto i = 0; i < 10; i++) {
    TEST_BARRIER_RANDOM("synThreadBarrier1", threadNum);
    TEST_BARRIER_RANDOM("synThreadBarrier2", threadNum);
  }

  globalStat.printAllStatus();
  globalStat.reset();

  // Phase 2: random delays, tagged probes.
  for (auto i = 0; i < 10; i++) {
    TEST_BARRIER_RANDOM("synThreadBarrier3", threadNum, "tag0");
    TEST_BARRIER_RANDOM("synThreadBarrier4", threadNum, "tag1");
  }

  globalStat.printAllStatus();
  globalStat.reset();

// use it to test accurate barrier gap: job `tid` sleeps tid * 10000
// microseconds before registering the probe, so the delays are deterministic.
#define TEST_BARRIER(statName, numConnThreads, ...)              \
  pool.exec([&](int tid, size_t numThreads) {                    \
    usleep(tid * 10000);                                         \
    REGISTER_SLOW_NODES_PROBE(                                   \
        globalStat, statName, numConnThreads, tid, __VA_ARGS__); \
  });

  // Phase 3: deterministic delays, tagged probes.
  for (auto i = 0; i < 10; i++) {
    TEST_BARRIER("synThreadBarrier3", threadNum, "tag0");
    TEST_BARRIER("synThreadBarrier4", threadNum, "tag1");
  }

  globalStat.printAllStatus();
  globalStat.reset();

// The helper macros are only meaningful inside this test (they capture the
// local `pool`); undefine them so they do not leak into the rest of the file.
#undef TEST_BARRIER
#undef TEST_BARRIER_RANDOM
}

File diff suppressed because it is too large Load Diff

@ -298,24 +298,6 @@ protected:
/// barrier performance tuning sync-sgd required /// barrier performance tuning sync-sgd required
std::atomic<int64_t> batchId_; std::atomic<int64_t> batchId_;
/// the beginning of addGradient without network overhead
ThreadLocal<struct timeval> addGradBegin_;
/**
* Used for tuning barrier performance.
* To better control the logs for sparse and dense parameters,
* we use different log entities for different parameterServer
* objects.
* It outputs many performance stats that expose the
* overhead of the network, the fluctuation of computation from
* forward/backward and the network, the computation from optimization
* at the pserver end, the barrier overhead, etc. To understand the tuning
* data, focus on the synchronization between addGradient and
* doOperation, which indirectly calls the op_SGD operation controlled
* by the remote updater controller.
*/
std::unique_ptr<StatSet> statSet_;
public: public:
struct Buffer { struct Buffer {
real* base; real* base;
@ -325,7 +307,6 @@ public:
protected: protected:
/// async gradient commit control /// async gradient commit control
bool asyncGrdientCommitCheckAndStat(const SendParameterRequest& request); bool asyncGrdientCommitCheckAndStat(const SendParameterRequest& request);
void printAsyncGradientCommitStatAndReset();
public: public:
/// disable default parameter for overloading /// disable default parameter for overloading
@ -710,36 +691,6 @@ public:
void op_load(const Operation& operation, OperationResult* result); void op_load(const Operation& operation, OperationResult* result);
void op_save(const Operation& operation, OperationResult* result); void op_save(const Operation& operation, OperationResult* result);
/**
* @brief output the log at the middle stage of training
*
* @note flush the log history and state at the end for sgd
*/
void tuningSgdMidOutput();
/**
* @brief output the log at the end stage of training
*
* @note flush the log history and state at the end for sgd. It will also
* flush some stateful stats for the next pass.
*/
void tuningSgdFinished();
/**
* @brief output the log at the middle stage of training
*
* @note flush the log history and state at the end for async-sgd.
* It will log some performance data if any lagged nodes are found.
*/
void tuningAsyncsgdMidOutput();
/**
* @brief output the log at the end stage of training
*
* @note flush the log history and state at the end for async-sgd.
*/
void tuningAsyncsgdFinished();
}; };
} // namespace paddle } // namespace paddle

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

@ -97,34 +97,6 @@ std::ostream& operator<<(std::ostream& outPut, const Stat& stat) {
return outPut; return outPut;
} }
/**
 * @brief Return the barrier stat registered under @p name, creating and
 *        registering it on first use.
 *
 * The lookup is first attempted under a ReadLockGuard. On a miss, lock_ is
 * taken through std::lock_guard and the lookup is repeated before anything is
 * created, because another thread may have registered the same name between
 * the two lock acquisitions.
 *
 * @param numConnThreads forwarded to the constructor of a newly created stat
 * @param name           key of the stat in barrierStatSet_
 * @param bType          BARRIER_END creates a BarrierEndStat, BARRIER_DELTA a
 *                       BarrierDeltaStat; only consulted when a stat is created
 * @return the stat stored under @p name
 *
 * CHECK-fails when a stat has to be created and @p bType is not one of the
 * two supported values.
 */
BarrierStatPtr StatSet::getStat(uint16_t numConnThreads,
                                const std::string& name,
                                BarrierStatType bType) {
  {
    ReadLockGuard guard(lock_);
    auto it = barrierStatSet_.find(name);
    if (it != barrierStatSet_.end()) {
      return it->second;
    }
  }

  std::lock_guard<RWLock> guard(lock_);
  // test again with lock_guard: the entry may have been inserted by another
  // thread after the read lock above was released
  auto it = barrierStatSet_.find(name);
  if (it != barrierStatSet_.end()) {
    return it->second;
  }

  BarrierStatPtr stat;
  if (bType == BARRIER_END) {
    stat = std::make_shared<BarrierEndStat>(numConnThreads, name);
  } else if (bType == BARRIER_DELTA) {
    stat = std::make_shared<BarrierDeltaStat>(numConnThreads, name);
  }
  // An unrecognized bType leaves `stat` null. Registering a null entry would
  // make later users of barrierStatSet_ (printing, reset) dereference a null
  // pointer, so fail here where the cause is still visible.
  CHECK(stat != nullptr) << "unsupported barrier stat type "
                         << static_cast<int>(bType) << " for " << name;

  auto ret = barrierStatSet_.insert(std::make_pair(name, stat));
  return ret.first->second;
}
void StatSet::printSegTimerStatus() { void StatSet::printSegTimerStatus() {
ReadLockGuard guard(lock_); ReadLockGuard guard(lock_);
LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ') LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ')
@ -135,46 +107,20 @@ void StatSet::printSegTimerStatus() {
} }
} }
/// Writes a banner followed by every entry of barrierStatSet_ to LOG(INFO),
/// one log statement per entry. Logs nothing when the set is empty.
/// The whole operation runs under a ReadLockGuard on lock_.
void StatSet::printBarrierTimerStatus() {
  ReadLockGuard guard(lock_);

  // Barrier stats are switched on and off at runtime, so this code is always
  // compiled in.
  if (!barrierStatSet_.empty()) {
    const char* const banner = "======= BarrierStatSet status ======";
    LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ')
              << banner << std::endl;

    for (auto& entry : barrierStatSet_) {
      auto& barrierStat = *(entry.second);
      LOG(INFO) << std::setiosflags(std::ios::left) << std::setfill(' ')
                << barrierStat;
    }
  }
}
void StatSet::printAllStatus() { void StatSet::printAllStatus() {
#ifndef PADDLE_DISABLE_TIMER #ifndef PADDLE_DISABLE_TIMER
printSegTimerStatus(); printSegTimerStatus();
#endif #endif
printBarrierTimerStatus();
LOG(INFO) << std::setiosflags(std::ios::left) LOG(INFO) << std::setiosflags(std::ios::left)
<< "--------------------------------------------------" << "--------------------------------------------------"
<< std::endl; << std::endl;
} }
// Logs the stat registered under `name` in statSet_ at INFO level.
// The lookup and the logging both happen under a ReadLockGuard on lock_.
// CHECK-fails (message includes `name` and this set's name_) when no stat
// with that name is present.
void StatSet::printStatus(const std::string& name) {
ReadLockGuard guard(lock_);
auto iter = statSet_.find(name);
CHECK(iter != statSet_.end()) << name << " is not registed in " << name_;
LOG(INFO) << *(iter->second);
}
void StatSet::reset(bool clearRawData) { void StatSet::reset(bool clearRawData) {
ReadLockGuard guard(lock_); ReadLockGuard guard(lock_);
for (auto& stat : statSet_) { for (auto& stat : statSet_) {
stat.second->reset(); stat.second->reset();
} }
// reset barrierStat
for (auto& stat : barrierStatSet_) {
stat.second->reset(clearRawData);
}
} }
void StatSet::setThreadInfo(const std::string& name, bool flag) { void StatSet::setThreadInfo(const std::string& name, bool flag) {
@ -184,13 +130,6 @@ void StatSet::setThreadInfo(const std::string& name, bool flag) {
iter->second->setThreadInfo(flag); iter->second->setThreadInfo(flag);
} }
// Removes the entry registered under `name` from statSet_.
// lock_ is taken through std::lock_guard (presumably exclusive -- confirm
// against RWLock) because the map itself is modified.
// CHECK-fails when no stat with that name is present.
void StatSet::deleteStat(const std::string& name) {
std::lock_guard<RWLock> guard(lock_);
auto iter = statSet_.find(name);
CHECK(iter != statSet_.end()) << name << " is not registed in " << name_;
// Only the set's reference is dropped; StatPtr is a shared_ptr, so other
// holders keep the Stat alive.
statSet_.erase(iter);
}
StatInfo::~StatInfo() { StatInfo::~StatInfo() {
if (stat_) { if (stat_) {
std::lock_guard<std::mutex> guard(stat_->lock_); std::lock_guard<std::mutex> guard(stat_->lock_);

@ -23,7 +23,6 @@ limitations under the License. */
#include <string> #include <string>
#include <unordered_map> #include <unordered_map>
#include "BarrierStat.h"
#include "Locks.h" #include "Locks.h"
#include "Logging.h" #include "Logging.h"
#include "ThreadLocal.h" #include "ThreadLocal.h"
@ -60,12 +59,6 @@ public:
class Stat; class Stat;
typedef std::shared_ptr<Stat> StatPtr; typedef std::shared_ptr<Stat> StatPtr;
// Shared-ownership handle to a barrier statistic (stored in
// StatSet::barrierStatSet_).
typedef std::shared_ptr<BarrierStatBase> BarrierStatPtr;
// Selects which concrete barrier statistic StatSet::getStat creates when the
// requested name is not registered yet.
enum BarrierStatType {
// getStat creates a BarrierEndStat
BARRIER_END = 0,
// getStat creates a BarrierDeltaStat
BARRIER_DELTA = 1,
};
class StatSet { class StatSet {
public: public:
@ -74,11 +67,8 @@ public:
// print to LOG(INFO) // print to LOG(INFO)
void printSegTimerStatus(); void printSegTimerStatus();
void printBarrierTimerStatus();
void printAllStatus(); void printAllStatus();
void printStatus(const std::string& name);
StatPtr getStat(const std::string& name) { StatPtr getStat(const std::string& name) {
{ {
ReadLockGuard guard(lock_); ReadLockGuard guard(lock_);
@ -93,12 +83,6 @@ public:
return ret.first->second; return ret.first->second;
} }
BarrierStatPtr getStat(uint16_t numConnThreads,
const std::string& name,
BarrierStatType bType);
void deleteStat(const std::string& name);
// true for showing stats for each thread // true for showing stats for each thread
// false for showing stats aggragated over threads // false for showing stats aggragated over threads
void setThreadInfo(const std::string& name, bool flag); void setThreadInfo(const std::string& name, bool flag);
@ -120,7 +104,6 @@ public:
private: private:
std::unordered_map<std::string, StatPtr> statSet_; std::unordered_map<std::string, StatPtr> statSet_;
std::unordered_map<std::string, BarrierStatPtr> barrierStatSet_;
const std::string name_; const std::string name_;
RWLock lock_; RWLock lock_;
}; };

Loading…
Cancel
Save