fix bug in allreduce fusion and add resnet unit test

pull/915/head
c00425699 authored 5 years ago, committed by chang zherui
parent 10312620d2
commit b3fd38c23a

@@ -359,7 +359,7 @@ Status AllreduceFusion::SetFusionByBackwardCompAndAllreduceTime() {
     return FAILED;
   }
   double para_size = (tail_time_ - allreduce_inherent_time_) / allreduce_bandwidth_;
-  double to_cost = allreduce_graph_.max() + FUSION_COST_EPS;
+  double to_cost = allreduce_graph_.max();
   int32_t fusion = 1;
   while (to_cost != 0) {
     MS_LOG(INFO) << "to_cost: " << to_cost << " para_size: " << para_size;

@@ -38,7 +38,6 @@ constexpr double DEFAULT_COST_MODEL_ALLREDUCE_FUSION_COMPUTATION_TIME_PARAMETER
 constexpr char FUSION[] = "fusion";
 constexpr char PARAMETER[] = "parameter";
 const uint32_t MAX_RECURSIVE_CALL_TIMES = 100;
-const double FUSION_COST_EPS = 1e-7;
 class AllreduceFusion {
  public:
   AllreduceFusion()
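Removing FUSION_COST_EPS works together with the AllreduceGraph changes below: max_ is now maintained incrementally (see AllreduceNode::AddPrev), and GetParaByParaSize explicitly refuses to skip the node holding that maximum, so the cost window can start at exactly allreduce_graph_.max() instead of nudging past it with an epsilon. The loop itself slices the cost axis from the maximum down to zero, giving each window of roughly para_size one fusion index. A toy, self-contained sketch of that windowing pattern (illustrative values and names only, not the MindSpore implementation):

#include <cstddef>
#include <cstdint>
#include <cstdio>
#include <vector>

int main() {
  // Hypothetical accumulated costs (depend_feat_size) of six parameters.
  std::vector<double> costs = {9.0, 7.5, 6.0, 4.0, 2.5, 1.0};
  const double para_size = 3.0;         // window width, from the tail-time formula
  double to_cost = 9.0;                 // the graph maximum; no epsilon offset
  std::vector<int32_t> fusion_index(costs.size(), 0);
  int32_t fusion = 1;
  while (to_cost > 0) {
    double from = to_cost - para_size;  // lower boundary of this window
    for (std::size_t i = 0; i < costs.size(); ++i) {
      // Everything falling in (from, to_cost] joins the same fusion group.
      if (fusion_index[i] == 0 && costs[i] <= to_cost && costs[i] > from) {
        fusion_index[i] = fusion;
      }
    }
    to_cost = from > 0 ? from : 0;      // walk the window toward cost 0
    ++fusion;
  }
  for (std::size_t i = 0; i < costs.size(); ++i) {
    std::printf("para %zu -> fusion group %d\n", i, fusion_index[i]);
  }
  return 0;
}

With these toy values, the parameters at cost 9.0 and 7.5 land in group 1, those at 6.0 and 4.0 in group 2, and those at 2.5 and 1.0 in group 3.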

@@ -24,7 +24,19 @@
 namespace mindspore {
 namespace parallel {
 Status AllreduceGraph::AddNode(const CNodePtr& node, const AnfNodePtr& para) {
-  auto arnode = std::make_shared<AllreduceNode>(AllreduceNode());
+  AllreduceNodePtr arnode;
+  auto cnode_emplace_return = cnode_set_.emplace(node);
+  if (!cnode_emplace_return.second) {
+    MS_LOG(INFO) << "node: " << node->DebugString() << " has already been added!";
+    auto cnode_arnode_pair = cnode_arnode_map_.find(node);
+    if (cnode_arnode_pair == cnode_arnode_map_.end()) {
+      MS_LOG(EXCEPTION) << "node is not in cnode_arnode_map_!";
+    }
+    arnode = cnode_arnode_pair->second;
+  } else {
+    arnode = std::make_shared<AllreduceNode>(AllreduceNode());
+  }
   if (arnode->Init(node) != SUCCESS) {
     MS_LOG(ERROR) << "AllreduceNode Init failed";
     return FAILED;
@@ -39,10 +51,6 @@ Status AllreduceGraph::AddNode(const CNodePtr& node, const AnfNodePtr& para) {
   if (!arnode_emplace_return.second) {
     MS_LOG(INFO) << "node: " << node->DebugString() << "'s arnode has already been added!";
   }
-  auto cnode_emplace_return = cnode_set_.emplace(node);
-  if (!cnode_emplace_return.second) {
-    MS_LOG(INFO) << "node: " << node->DebugString() << " has already been added!";
-  }
   cnode_emplace_return = para_cnodeset_map_[para].emplace(node);
   if (!cnode_emplace_return.second) {
     MS_LOG(INFO) << "node: " << node->DebugString() << " already in para: " << para->fullname_with_scope()
@@ -75,7 +83,7 @@ Status AllreduceGraph::AddEdge(const CNodePtr& from, const CNodePtr& to, double
     MS_LOG(ERROR) << "from_arnode AddNext failed";
     return FAILED;
   }
-  if (to_arnode->AddPrev(from_arnode, dist) != SUCCESS) {
+  if (to_arnode->AddPrev(from_arnode, dist, &max_) != SUCCESS) {
     MS_LOG(ERROR) << "to_arnode AddPrev failed";
     return FAILED;
   }
@@ -110,7 +118,7 @@ std::pair<std::vector<AnfNodePtr>, double> AllreduceGraph::GetParaByParaSize(dou
   double cur_para_size = 0;
   double from = to;
   for (auto& arnode : arnode_vec_) {
-    if (arnode.depend_feat_size() >= to) {
+    if (arnode.depend_feat_size() != max_ && arnode.depend_feat_size() >= to) {
       continue;
     }
     if (para_size > 0 && cur_para_size >= para_size && arnode.depend_feat_size() < from) {
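The new != max_ guard is what makes the epsilon removal safe. Before this commit, to started at max() + FUSION_COST_EPS, so the node holding the graph-wide maximum satisfied depend_feat_size() < to and was processed in the first window; with to now starting at exactly max(), that node would match depend_feat_size() >= to and be skipped forever. Restated as a stand-alone predicate (hypothetical free function, not part of the MindSpore API):

// Skip a node only when it lies at or beyond the current window boundary
// AND it is not the node carrying the graph-wide maximum depend_feat_size.
bool ShouldSkip(double depend_feat_size, double to, double graph_max) {
  return depend_feat_size != graph_max && depend_feat_size >= to;
}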

@@ -15,6 +15,7 @@
  */
 #include "parallel/allreduce_fusion/allreduce_node.h"
+#include <queue>
 #include "parallel/tensor_layout/tensor_layout.h"
 #include "utils/log_adapter.h"
@@ -29,7 +30,7 @@ Status AllreduceNode::AddNext(const AllreduceNodePtr& next_node) {
   return SUCCESS;
 }

-Status AllreduceNode::AddPrev(const AllreduceNodePtr& prev_node, double dist) {
+Status AllreduceNode::AddPrev(const AllreduceNodePtr& prev_node, double dist, double* max) {
   if (prev_node == nullptr) {
     MS_LOG(ERROR) << "prev_node is nullptr!";
     return FAILED;
@@ -39,7 +40,26 @@ Status AllreduceNode::AddPrev(const AllreduceNodePtr& prev_node, double dist) {
     return FAILED;
   }
   prev_.emplace_back(prev_node);
-  depend_feat_size_ += prev_node->depend_feat_size() + dist;
+  double add_dist = prev_node->depend_feat_size() + dist;
+  depend_feat_size_ += add_dist;
+  if (depend_feat_size_ > *max) {
+    *max = depend_feat_size_;
+  }
+  std::queue<AllreduceNodePtr> next_queue;
+  for (auto& next : next_) {
+    next_queue.push(next);
+  }
+  while (!next_queue.empty()) {
+    auto ele = next_queue.front();
+    ele->AddDependFeatSize(add_dist);
+    if (ele->depend_feat_size() > *max) {
+      *max = ele->depend_feat_size();
+    }
+    for (auto& next : ele->next()) {
+      next_queue.push(next);
+    }
+    next_queue.pop();
+  }
   return SUCCESS;
 }
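This hunk is the heart of the fix. Previously AddPrev updated only the receiving node's depend_feat_size_, so successors that had already accumulated their sizes never saw increments arriving through late-added edges, and the graph maximum drifted out of date. Now the increment add_dist is pushed breadth-first through every transitive successor while the running maximum is refreshed in place. A self-contained sketch of the same traversal with simplified types:

#include <cstdio>
#include <memory>
#include <queue>
#include <vector>

struct ARNode {
  double depend_feat_size = 0.0;
  std::vector<std::shared_ptr<ARNode>> next;  // successor edges
};

void PropagateAddDist(const std::shared_ptr<ARNode>& node, double add_dist, double* max) {
  node->depend_feat_size += add_dist;
  if (node->depend_feat_size > *max) *max = node->depend_feat_size;
  std::queue<std::shared_ptr<ARNode>> q;
  for (auto& n : node->next) q.push(n);
  while (!q.empty()) {
    auto cur = q.front();
    q.pop();
    cur->depend_feat_size += add_dist;    // same increment downstream
    if (cur->depend_feat_size > *max) *max = cur->depend_feat_size;
    for (auto& n : cur->next) q.push(n);  // continue breadth-first
  }
}

int main() {
  auto a = std::make_shared<ARNode>();
  auto b = std::make_shared<ARNode>();
  a->next.push_back(b);            // a -> b
  double max = 0.0;
  PropagateAddDist(a, 2.5, &max);  // raising a also raises b transitively
  std::printf("b: %.1f max: %.1f\n", b->depend_feat_size, max);
  return 0;
}

Note that a successor reachable along several paths receives the increment once per path; that is consistent with depend_feat_size_ being a sum over all predecessor edges, but it does mean the traversal revisits shared subgraphs rather than memoizing them.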

@@ -39,9 +39,14 @@ class AllreduceNode {
   const std::unordered_set<AnfNodePtr>& paras() const { return paras_; }
   double curr_para_size() const { return curr_para_size_; }
   virtual ~AllreduceNode() = default;
-  Status AddPrev(const AllreduceNodePtr& prev_node, double dist);
+  // Add a previous node.
+  // prev_node is the previous node to be added.
+  // max is the current maximum depend_feat_size of the AllreduceGraph.
+  Status AddPrev(const AllreduceNodePtr& prev_node, double dist, double* max);
   Status AddNext(const AllreduceNodePtr& next_node);
   double depend_feat_size() const { return depend_feat_size_; }
+  void AddDependFeatSize(double add_dist) { depend_feat_size_ += add_dist; }
+  const std::vector<AllreduceNodePtr>& next() const { return next_; }
   void ToString() const;
   bool operator<(const AllreduceNode& node) const { return depend_feat_size_ < node.depend_feat_size(); }
   bool operator>(const AllreduceNode& node) const { return depend_feat_size_ > node.depend_feat_size(); }

@@ -275,7 +275,7 @@ def test_allreduce_fusion5():
     expect_dict = {'backbone2.fc8.weight': 3,
                    'backbone2.fc7.weight': 3,
                    'backbone2.fc6.weight': 3,
-                   'backbone2.fc5.weight': 2,
+                   'backbone2.fc5.weight': 3,
                    'backbone2.fc4.weight': 2,
                    'backbone2.fc3.weight': 2,
                    'backbone2.fc2.weight': 1,
@@ -283,7 +283,7 @@ def test_allreduce_fusion5():
                    'backbone1.fc8.weight': 3,
                    'backbone1.fc7.weight': 3,
                    'backbone1.fc6.weight': 3,
-                   'backbone1.fc5.weight': 2,
+                   'backbone1.fc5.weight': 3,
                    'backbone1.fc4.weight': 2,
                    'backbone1.fc3.weight': 2,
                    'backbone1.fc2.weight': 1,
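The expectation change (fc5 moving from fusion group 2 to group 3 in both backbones) is consistent with the propagation fix above: once increments from late-added edges reach downstream nodes, fc5's accumulated depend_feat_size crosses into the same cost window as fc6 through fc8.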

File diff suppressed because it is too large