!14588 add prepare run of graph scheduler

From: @limingqi107
Reviewed-by: @cristoval,@wilfchen
Signed-off-by: @wilfchen
pull/14588/MERGE
mindspore-ci-bot 4 years ago committed by Gitee
commit 64689d56a1

@ -22,6 +22,8 @@
#include "backend/optimizer/common/helper.h" #include "backend/optimizer/common/helper.h"
#include "utils/config_manager.h" #include "utils/config_manager.h"
#include "utils/log_adapter.h" #include "utils/log_adapter.h"
#include "utils/convert_utils.h"
#include "common/trans.h"
namespace mindspore { namespace mindspore {
namespace runtime { namespace runtime {
@ -101,6 +103,177 @@ void UpdateRefCount(const AnfNodePtr &node, size_t output_idx) {
device_tensor->IncreaseRefCount(); device_tensor->IncreaseRefCount();
device_tensor->ResetRefCountUsed(); device_tensor->ResetRefCountUsed();
} }
// The branch processing of PrepareDataForValueNode that value type is tensor.
void PrepareDataForValueNodeTensor(const ValueNodePtr &node, const ValuePtr &node_value,
const DeviceContext *device_context) {
MS_EXCEPTION_IF_NULL(node);
MS_EXCEPTION_IF_NULL(node_value);
MS_EXCEPTION_IF_NULL(device_context);
std::vector<TensorPtr> tensors;
TensorValueToTensor(node_value, &tensors);
for (size_t i = 0; i < tensors.size(); i++) {
const auto &tensor = tensors[i];
if (tensor == nullptr) {
MS_LOG(WARNING) << "Tensor is null";
return;
}
const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(node, i);
MS_EXCEPTION_IF_NULL(device_tensor);
// If the ptr of device tensor is not nullptr, it indicates that the device data has been prepared.
if (device_tensor->GetPtr() != nullptr) {
return;
}
MS_LOG(INFO) << "Prepare device data for value node: " << node->fullname_with_scope() << ", output index: " << i;
tensor->set_device_address(device_tensor);
// Allocate device memory.
if (!device_context->AllocateMemory(device_tensor.get(), device_tensor->GetSize())) {
MS_LOG(EXCEPTION) << "Device memory isn't enough and alloc failed, node name: " << node->fullname_with_scope()
<< ", alloc size: " << device_tensor->GetSize();
}
// Copy data from host tensor to device.
if (!device_tensor->SyncHostToDevice(trans::GetRuntimePaddingShape(node, 0), LongToSize(tensor->data().nbytes()),
tensor->data_type(), tensor->data_c())) {
MS_LOG(EXCEPTION) << "SyncHostToDevice failed, node name: " << node->fullname_with_scope();
}
}
}
// Prepare the device data for persistent device tensor of value node.
void PrepareDataForValueNode(const ValueNodePtr &node, const DeviceContext *device_context) {
MS_EXCEPTION_IF_NULL(node);
MS_EXCEPTION_IF_NULL(device_context);
auto &node_value = node->value();
MS_EXCEPTION_IF_NULL(node_value);
if (node_value->isa<tensor::Tensor>() || node_value->isa<ValueTuple>()) {
// The branch processing that value type is tensor.
PrepareDataForValueNodeTensor(node, node_value, device_context);
} else if (node_value->isa<StringImm>()) {
const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(node, 0);
MS_EXCEPTION_IF_NULL(device_tensor);
// If the ptr of device tensor is not nullptr, it indicates that the device data has been prepared.
if (device_tensor->GetPtr() != nullptr) {
return;
}
MS_LOG(INFO) << "Prepare device data for value node: " << node->fullname_with_scope();
// Allocate device memory.
if (!device_context->AllocateMemory(device_tensor.get(), device_tensor->GetSize())) {
MS_LOG(EXCEPTION) << "Device memory isn't enough and alloc failed, node name: " << node->fullname_with_scope()
<< ", alloc size: " << device_tensor->GetSize();
}
// Copy data from value to device.
auto value = GetValue<std::string>(node_value);
size_t tensor_size = value.size();
ShapeVector shape = {1, SizeToLong(tensor_size)};
if (!device_tensor->SyncHostToDevice(shape, tensor_size, kNumberTypeUInt8, value.data())) {
MS_LOG(EXCEPTION) << "SyncHostToDevice failed, node name: " << node->fullname_with_scope();
}
}
}
// Prepare the device data for persistent device tensor of weight node from host tensor.
void PrepareDataForWeightNode(const AnfNodePtr &node, const TensorPtr &tensor, const DeviceContext *device_context) {
MS_EXCEPTION_IF_NULL(node);
MS_EXCEPTION_IF_NULL(tensor);
const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(node, 0);
MS_EXCEPTION_IF_NULL(device_tensor);
// If the ptr of device tensor is not nullptr, it indicates that the device data has been prepared.
if (device_tensor->GetPtr() != nullptr) {
return;
}
MS_LOG(INFO) << "Prepare device data for weight node: " << node->fullname_with_scope();
tensor->set_device_address(device_tensor);
// Allocate device memory.
if (!device_context->AllocateMemory(device_tensor.get(), device_tensor->GetSize())) {
MS_LOG(EXCEPTION) << "Device memory isn't enough and alloc failed, node name: " << node->fullname_with_scope()
<< ", alloc size: " << device_tensor->GetSize();
}
// Copy data from host tensor to device.
if (!device_tensor->SyncHostToDevice(trans::GetRuntimePaddingShape(node, 0), LongToSize(tensor->data().nbytes()),
tensor->data_type(), tensor->data_c())) {
MS_LOG(EXCEPTION) << "SyncHostToDevice failed, node name: " << node->fullname_with_scope();
}
}
BaseRef CreateOutputTensor(const session::KernelWithIndex &node_output_pair, const KernelGraphPtr &graph,
const std::vector<tensor::TensorPtr> &input_tensors) {
auto &node = node_output_pair.first;
auto output_index = node_output_pair.second;
MS_EXCEPTION_IF_NULL(node);
if (node->isa<ValueNode>()) {
// If node is a value node, return the value.
auto value_node = node->cast<ValueNodePtr>();
MS_EXCEPTION_IF_NULL(value_node);
return value_node->value();
} else if (node->isa<Parameter>()) {
// If node is a parameter node, return tensor from input_tensors.
MS_EXCEPTION_IF_NULL(graph);
const auto &input_nodes = graph->inputs();
auto iter = find(input_nodes.begin(), input_nodes.end(), node);
if (iter == input_nodes.end()) {
MS_LOG(EXCEPTION) << "Parameter node: " << node->fullname_with_scope() << " is not exist.";
}
auto position = IntToSize(std::distance(input_nodes.begin(), iter));
return input_tensors[position];
} else {
// Create tensor.
TypeId type_id = AnfAlgo::GetOutputDeviceDataType(node, output_index);
if (type_id == kTypeUnknown) {
type_id = AnfAlgo::GetOutputInferDataType(node, output_index);
}
std::vector<int64_t> temp_shape;
auto shape = AnfAlgo::GetOutputInferShape(node, output_index);
(void)std::copy(shape.begin(), shape.end(), std::back_inserter(temp_shape));
auto tensor = std::make_shared<tensor::Tensor>(type_id, temp_shape);
MS_EXCEPTION_IF_NULL(tensor);
tensor->set_padding_type(AnfAlgo::GetOutputReshapeType(node, output_index));
// Set device address to tensor.
const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(node, output_index);
MS_EXCEPTION_IF_NULL(device_tensor);
tensor->set_device_address(device_tensor);
return tensor;
}
}
BaseRef CreateOutputTensors(const AnfNodePtr &output_node, const KernelGraphPtr &graph,
const std::vector<tensor::TensorPtr> &input_tensors) {
MS_EXCEPTION_IF_NULL(output_node);
auto item_with_index = AnfAlgo::VisitKernelWithReturnType(output_node, 0);
MS_EXCEPTION_IF_NULL(item_with_index.first);
// Special handle for make tuple.
if (AnfAlgo::CheckPrimitiveType(item_with_index.first, prim::kPrimMakeTuple)) {
auto cnode = item_with_index.first->cast<CNodePtr>();
MS_EXCEPTION_IF_NULL(cnode);
VectorRef ret;
for (size_t i = 1; i < cnode->inputs().size(); ++i) {
if (!AnfAlgo::CheckPrimitiveType(cnode->input(i), prim::kPrimControlDepend)) {
auto out = CreateOutputTensors(cnode->input(i), graph, input_tensors);
ret.push_back(out);
}
}
return ret;
}
// If the node return nothing, return an empty vectorRef.
if (AnfAlgo::GetOutputTensorNum(item_with_index.first) == 0) {
return VectorRef();
}
return CreateOutputTensor(item_with_index, graph, input_tensors);
}
} // namespace } // namespace
void GraphScheduler::Initialize() { void GraphScheduler::Initialize() {
@ -156,6 +329,47 @@ void GraphScheduler::Schedule(const ActorSet *actor_set) {
} }
} }
void GraphScheduler::PrepareRun(const KernelGraphPtr &graph, const DeviceContext *device_context,
const std::vector<TensorPtr> *input_tensors, VectorRef *const &outputs) {
MS_EXCEPTION_IF_NULL(graph);
MS_EXCEPTION_IF_NULL(device_context);
MS_EXCEPTION_IF_NULL(input_tensors);
MS_EXCEPTION_IF_NULL(outputs);
// 1.Prepare the data of device tensor store(value nodes of graph).
for (const auto &value_node : graph->graph_value_nodes()) {
PrepareDataForValueNode(value_node, device_context);
}
// 1.Prepare the data of device tensor store(weights of graph), and fill the host tensors for non weighted parameters.
std::vector<TensorPtr> host_tensors;
const auto &input_nodes = graph->input_nodes();
for (size_t i = 0; i < input_nodes.size(); ++i) {
const auto &input_node = input_nodes[i];
const auto &input_tensor = (*input_tensors)[i];
MS_EXCEPTION_IF_NULL(input_node);
if (IsPersistentDeviceTensor(input_node)) {
// Prepare the device data for weights.
PrepareDataForWeightNode(input_node, input_tensor, device_context);
} else {
// Fill the host tensors for non weighted parameters.
host_tensors.emplace_back(input_tensor);
}
}
// 2.Prepare the data of host tensor queue(non weighted parameters of graph).
const auto &host_tensor_queue = FetchHostQueue(graph);
MS_EXCEPTION_IF_NULL(host_tensor_queue);
host_tensor_queue->PushData(host_tensors);
// 3.Prepare the output tensor of graph.
for (const auto &output_node : graph->outputs()) {
MS_EXCEPTION_IF_NULL(output_node);
MS_LOG(INFO) << "Create node output: " << output_node->fullname_with_scope();
outputs->emplace_back(CreateOutputTensors(output_node, graph, *input_tensors));
}
}
bool GraphScheduler::Run(const ActorSet *actor_set, GraphExecutionStrategy strategy) { bool GraphScheduler::Run(const ActorSet *actor_set, GraphExecutionStrategy strategy) {
MS_EXCEPTION_IF_NULL(actor_set); MS_EXCEPTION_IF_NULL(actor_set);
// Construct OpContext. // Construct OpContext.
@ -477,5 +691,16 @@ void GraphScheduler::PersistDeviceTensor(const KernelGraphPtr &graph) {
} }
} }
HostTensorQueue *GraphScheduler::FetchHostQueue(const KernelGraphPtr &graph) const {
MS_EXCEPTION_IF_NULL(graph);
const auto &iter = graph_to_host_queue_.find(graph);
if (iter != graph_to_host_queue_.end()) {
return iter->second.get();
} else {
MS_LOG(ERROR) << "Can't find the host tensor queue map of graph: " << graph->ToString();
return nullptr;
}
}
} // namespace runtime } // namespace runtime
} // namespace mindspore } // namespace mindspore

@ -22,6 +22,7 @@
#include <memory> #include <memory>
#include <utility> #include <utility>
#include <unordered_map> #include <unordered_map>
#include <algorithm>
#include "runtime/framework/actor/data_source_actor.h" #include "runtime/framework/actor/data_source_actor.h"
#include "runtime/framework/actor/loop_count_actor.h" #include "runtime/framework/actor/loop_count_actor.h"
#include "runtime/framework/actor/kernel_actor.h" #include "runtime/framework/actor/kernel_actor.h"
@ -66,13 +67,20 @@ class GraphScheduler {
// Transform graph to actor DAG, contains build and link. // Transform graph to actor DAG, contains build and link.
ActorSet *Transform(const KernelGraphPtr &graph, const DeviceContext *device_context, ActorSet *Transform(const KernelGraphPtr &graph, const DeviceContext *device_context,
const std::vector<tensor::TensorPtr> *input_tensors = nullptr, const std::vector<TensorPtr> *input_tensors = nullptr,
GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline); GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline);
// Schedule actors in the actor runtime. Single machine scheduling is supported currently, and distributed scheduling // Schedule actors in the actor runtime. Single machine scheduling is supported currently, and distributed scheduling
// will be supported in the future. // will be supported in the future.
void Schedule(const ActorSet *actor_set); void Schedule(const ActorSet *actor_set);
// The prepare processing before run:
// 1. Prepare the data of device tensor store(such as weights and value nodes of graph).
// 2. Prepare the data of host tensor queue(such as non weighted parameters of graph).
// 3. Prepare the output tensor of graph.
void PrepareRun(const KernelGraphPtr &graph, const DeviceContext *device_context,
const std::vector<TensorPtr> *input_tensors, VectorRef *const &outputs);
// The processing entry of actors running. // The processing entry of actors running.
bool Run(const ActorSet *actor_set, GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline); bool Run(const ActorSet *actor_set, GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline);
@ -113,6 +121,9 @@ class GraphScheduler {
// Persist device tensors of graph's some nodes(such as weights and value nodes). // Persist device tensors of graph's some nodes(such as weights and value nodes).
void PersistDeviceTensor(const KernelGraphPtr &graph); void PersistDeviceTensor(const KernelGraphPtr &graph);
// Fetch the hsot tensor queue by kernel graph.
HostTensorQueue *FetchHostQueue(const KernelGraphPtr &graph) const;
std::unordered_map<KernelGraphPtr, ActorSetPtr> graph_to_actors_; std::unordered_map<KernelGraphPtr, ActorSetPtr> graph_to_actors_;
std::unordered_map<KernelGraphPtr, HostTensorQueuePtr> graph_to_host_queue_; std::unordered_map<KernelGraphPtr, HostTensorQueuePtr> graph_to_host_queue_;

Loading…
Cancel
Save