diff --git a/mindspore/ccsrc/runtime/framework/graph_scheduler.cc b/mindspore/ccsrc/runtime/framework/graph_scheduler.cc
index 93260f9940..72a74f95a7 100644
--- a/mindspore/ccsrc/runtime/framework/graph_scheduler.cc
+++ b/mindspore/ccsrc/runtime/framework/graph_scheduler.cc
@@ -22,6 +22,8 @@
 #include "backend/optimizer/common/helper.h"
 #include "utils/config_manager.h"
 #include "utils/log_adapter.h"
+#include "utils/convert_utils.h"
+#include "common/trans.h"
 
 namespace mindspore {
 namespace runtime {
@@ -101,6 +103,177 @@ void UpdateRefCount(const AnfNodePtr &node, size_t output_idx) {
   device_tensor->IncreaseRefCount();
   device_tensor->ResetRefCountUsed();
 }
+
+// The branch processing of PrepareDataForValueNode when the value type is tensor.
+void PrepareDataForValueNodeTensor(const ValueNodePtr &node, const ValuePtr &node_value,
+                                   const DeviceContext *device_context) {
+  MS_EXCEPTION_IF_NULL(node);
+  MS_EXCEPTION_IF_NULL(node_value);
+  MS_EXCEPTION_IF_NULL(device_context);
+
+  std::vector<TensorPtr> tensors;
+  TensorValueToTensor(node_value, &tensors);
+
+  for (size_t i = 0; i < tensors.size(); i++) {
+    const auto &tensor = tensors[i];
+    if (tensor == nullptr) {
+      MS_LOG(WARNING) << "Tensor is null";
+      return;
+    }
+
+    const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(node, i);
+    MS_EXCEPTION_IF_NULL(device_tensor);
+    // If the ptr of device tensor is not nullptr, it indicates that the device data has been prepared.
+    if (device_tensor->GetPtr() != nullptr) {
+      return;
+    }
+    MS_LOG(INFO) << "Prepare device data for value node: " << node->fullname_with_scope() << ", output index: " << i;
+    tensor->set_device_address(device_tensor);
+
+    // Allocate device memory.
+    if (!device_context->AllocateMemory(device_tensor.get(), device_tensor->GetSize())) {
+      MS_LOG(EXCEPTION) << "Device memory isn't enough and alloc failed, node name: " << node->fullname_with_scope()
+                        << ", alloc size: " << device_tensor->GetSize();
+    }
+
+    // Copy data from host tensor to device.
+    if (!device_tensor->SyncHostToDevice(trans::GetRuntimePaddingShape(node, 0), LongToSize(tensor->data().nbytes()),
+                                         tensor->data_type(), tensor->data_c())) {
+      MS_LOG(EXCEPTION) << "SyncHostToDevice failed, node name: " << node->fullname_with_scope();
+    }
+  }
+}
+
+// Prepare the device data for the persistent device tensor of a value node.
+void PrepareDataForValueNode(const ValueNodePtr &node, const DeviceContext *device_context) {
+  MS_EXCEPTION_IF_NULL(node);
+  MS_EXCEPTION_IF_NULL(device_context);
+  auto &node_value = node->value();
+  MS_EXCEPTION_IF_NULL(node_value);
+
+  if (node_value->isa<tensor::Tensor>() || node_value->isa<ValueTuple>()) {
+    // The branch processing when the value type is tensor.
+    PrepareDataForValueNodeTensor(node, node_value, device_context);
+  } else if (node_value->isa<StringImm>()) {
+    const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(node, 0);
+    MS_EXCEPTION_IF_NULL(device_tensor);
+    // If the ptr of device tensor is not nullptr, it indicates that the device data has been prepared.
+    if (device_tensor->GetPtr() != nullptr) {
+      return;
+    }
+    MS_LOG(INFO) << "Prepare device data for value node: " << node->fullname_with_scope();
+
+    // Allocate device memory.
+    if (!device_context->AllocateMemory(device_tensor.get(), device_tensor->GetSize())) {
+      MS_LOG(EXCEPTION) << "Device memory isn't enough and alloc failed, node name: " << node->fullname_with_scope()
+                        << ", alloc size: " << device_tensor->GetSize();
+    }
+
+    // Copy data from value to device.
+    auto value = GetValue<std::string>(node_value);
+    size_t tensor_size = value.size();
+    ShapeVector shape = {1, SizeToLong(tensor_size)};
+    if (!device_tensor->SyncHostToDevice(shape, tensor_size, kNumberTypeUInt8, value.data())) {
+      MS_LOG(EXCEPTION) << "SyncHostToDevice failed, node name: " << node->fullname_with_scope();
+    }
+  }
+}
+
+// Prepare the device data for the persistent device tensor of a weight node from the host tensor.
+void PrepareDataForWeightNode(const AnfNodePtr &node, const TensorPtr &tensor, const DeviceContext *device_context) {
+  MS_EXCEPTION_IF_NULL(node);
+  MS_EXCEPTION_IF_NULL(tensor);
+  const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(node, 0);
+  MS_EXCEPTION_IF_NULL(device_tensor);
+  // If the ptr of device tensor is not nullptr, it indicates that the device data has been prepared.
+  if (device_tensor->GetPtr() != nullptr) {
+    return;
+  }
+  MS_LOG(INFO) << "Prepare device data for weight node: " << node->fullname_with_scope();
+  tensor->set_device_address(device_tensor);
+
+  // Allocate device memory.
+  if (!device_context->AllocateMemory(device_tensor.get(), device_tensor->GetSize())) {
+    MS_LOG(EXCEPTION) << "Device memory isn't enough and alloc failed, node name: " << node->fullname_with_scope()
+                      << ", alloc size: " << device_tensor->GetSize();
+  }
+
+  // Copy data from host tensor to device.
+  if (!device_tensor->SyncHostToDevice(trans::GetRuntimePaddingShape(node, 0), LongToSize(tensor->data().nbytes()),
+                                       tensor->data_type(), tensor->data_c())) {
+    MS_LOG(EXCEPTION) << "SyncHostToDevice failed, node name: " << node->fullname_with_scope();
+  }
+}
+
+BaseRef CreateOutputTensor(const session::KernelWithIndex &node_output_pair, const KernelGraphPtr &graph,
+                           const std::vector<TensorPtr> &input_tensors) {
+  auto &node = node_output_pair.first;
+  auto output_index = node_output_pair.second;
+  MS_EXCEPTION_IF_NULL(node);
+
+  if (node->isa<ValueNode>()) {
+    // If the node is a value node, return the value.
+    auto value_node = node->cast<ValueNodePtr>();
+    MS_EXCEPTION_IF_NULL(value_node);
+    return value_node->value();
+  } else if (node->isa<Parameter>()) {
+    // If the node is a parameter node, return the tensor from input_tensors.
+    MS_EXCEPTION_IF_NULL(graph);
+    const auto &input_nodes = graph->inputs();
+    auto iter = find(input_nodes.begin(), input_nodes.end(), node);
+    if (iter == input_nodes.end()) {
+      MS_LOG(EXCEPTION) << "Parameter node: " << node->fullname_with_scope() << " does not exist.";
+    }
+    auto position = IntToSize(std::distance(input_nodes.begin(), iter));
+    return input_tensors[position];
+  } else {
+    // Create the tensor.
+    TypeId type_id = AnfAlgo::GetOutputDeviceDataType(node, output_index);
+    if (type_id == kTypeUnknown) {
+      type_id = AnfAlgo::GetOutputInferDataType(node, output_index);
+    }
+    std::vector<int64_t> temp_shape;
+    auto shape = AnfAlgo::GetOutputInferShape(node, output_index);
+    (void)std::copy(shape.begin(), shape.end(), std::back_inserter(temp_shape));
+    auto tensor = std::make_shared<tensor::Tensor>(type_id, temp_shape);
+    MS_EXCEPTION_IF_NULL(tensor);
+    tensor->set_padding_type(AnfAlgo::GetOutputReshapeType(node, output_index));
+
+    // Set the device address to the tensor.
+    const auto &device_tensor = AnfAlgo::GetMutableOutputAddr(node, output_index);
+    MS_EXCEPTION_IF_NULL(device_tensor);
+    tensor->set_device_address(device_tensor);
+    return tensor;
+  }
+}
+
+BaseRef CreateOutputTensors(const AnfNodePtr &output_node, const KernelGraphPtr &graph,
+                            const std::vector<TensorPtr> &input_tensors) {
+  MS_EXCEPTION_IF_NULL(output_node);
+  auto item_with_index = AnfAlgo::VisitKernelWithReturnType(output_node, 0);
+  MS_EXCEPTION_IF_NULL(item_with_index.first);
+
+  // Special handling for make tuple.
+  if (AnfAlgo::CheckPrimitiveType(item_with_index.first, prim::kPrimMakeTuple)) {
+    auto cnode = item_with_index.first->cast<CNodePtr>();
+    MS_EXCEPTION_IF_NULL(cnode);
+    VectorRef ret;
+    for (size_t i = 1; i < cnode->inputs().size(); ++i) {
+      if (!AnfAlgo::CheckPrimitiveType(cnode->input(i), prim::kPrimControlDepend)) {
+        auto out = CreateOutputTensors(cnode->input(i), graph, input_tensors);
+        ret.push_back(out);
+      }
+    }
+    return ret;
+  }
+
+  // If the node returns nothing, return an empty VectorRef.
+  if (AnfAlgo::GetOutputTensorNum(item_with_index.first) == 0) {
+    return VectorRef();
+  }
+
+  return CreateOutputTensor(item_with_index, graph, input_tensors);
+}
 }  // namespace
 
 void GraphScheduler::Initialize() {
@@ -156,6 +329,47 @@ void GraphScheduler::Schedule(const ActorSet *actor_set) {
   }
 }
 
+void GraphScheduler::PrepareRun(const KernelGraphPtr &graph, const DeviceContext *device_context,
+                                const std::vector<TensorPtr> *input_tensors, VectorRef *const &outputs) {
+  MS_EXCEPTION_IF_NULL(graph);
+  MS_EXCEPTION_IF_NULL(device_context);
+  MS_EXCEPTION_IF_NULL(input_tensors);
+  MS_EXCEPTION_IF_NULL(outputs);
+
+  // 1. Prepare the data of the device tensor store (value nodes of the graph).
+  for (const auto &value_node : graph->graph_value_nodes()) {
+    PrepareDataForValueNode(value_node, device_context);
+  }
+
+  // 1. Prepare the data of the device tensor store (weights of the graph), and fill the host tensors for
+  // non-weighted parameters.
+  std::vector<TensorPtr> host_tensors;
+  const auto &input_nodes = graph->input_nodes();
+  for (size_t i = 0; i < input_nodes.size(); ++i) {
+    const auto &input_node = input_nodes[i];
+    const auto &input_tensor = (*input_tensors)[i];
+    MS_EXCEPTION_IF_NULL(input_node);
+    if (IsPersistentDeviceTensor(input_node)) {
+      // Prepare the device data for weights.
+      PrepareDataForWeightNode(input_node, input_tensor, device_context);
+    } else {
+      // Fill the host tensors for non-weighted parameters.
+      host_tensors.emplace_back(input_tensor);
+    }
+  }
+
+  // 2. Prepare the data of the host tensor queue (non-weighted parameters of the graph).
+  const auto &host_tensor_queue = FetchHostQueue(graph);
+  MS_EXCEPTION_IF_NULL(host_tensor_queue);
+  host_tensor_queue->PushData(host_tensors);
+
+  // 3. Prepare the output tensors of the graph.
+  for (const auto &output_node : graph->outputs()) {
+    MS_EXCEPTION_IF_NULL(output_node);
+    MS_LOG(INFO) << "Create node output: " << output_node->fullname_with_scope();
+    outputs->emplace_back(CreateOutputTensors(output_node, graph, *input_tensors));
+  }
+}
+
 bool GraphScheduler::Run(const ActorSet *actor_set, GraphExecutionStrategy strategy) {
   MS_EXCEPTION_IF_NULL(actor_set);
   // Construct OpContext.
@@ -477,5 +691,16 @@ void GraphScheduler::PersistDeviceTensor(const KernelGraphPtr &graph) {
   }
 }
 
+HostTensorQueue *GraphScheduler::FetchHostQueue(const KernelGraphPtr &graph) const {
+  MS_EXCEPTION_IF_NULL(graph);
+  const auto &iter = graph_to_host_queue_.find(graph);
+  if (iter != graph_to_host_queue_.end()) {
+    return iter->second.get();
+  } else {
+    MS_LOG(ERROR) << "Can't find the host tensor queue map of graph: " << graph->ToString();
+    return nullptr;
+  }
+}
+
 }  // namespace runtime
 }  // namespace mindspore
diff --git a/mindspore/ccsrc/runtime/framework/graph_scheduler.h b/mindspore/ccsrc/runtime/framework/graph_scheduler.h
index d3927d3392..e7ed24e188 100644
--- a/mindspore/ccsrc/runtime/framework/graph_scheduler.h
+++ b/mindspore/ccsrc/runtime/framework/graph_scheduler.h
@@ -22,6 +22,7 @@
 #include
 #include
 #include
+#include
 #include "runtime/framework/actor/data_source_actor.h"
 #include "runtime/framework/actor/loop_count_actor.h"
 #include "runtime/framework/actor/kernel_actor.h"
@@ -66,13 +67,20 @@ class GraphScheduler {
   // Transform graph to actor DAG, contains build and link.
   ActorSet *Transform(const KernelGraphPtr &graph, const DeviceContext *device_context,
-                      const std::vector *input_tensors = nullptr,
+                      const std::vector *input_tensors = nullptr,
                       GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline);
 
   // Schedule actors in the actor runtime. Single machine scheduling is supported currently, and distributed
   // scheduling will be supported in the future.
   void Schedule(const ActorSet *actor_set);
 
+  // The preparation before running:
+  // 1. Prepare the data of the device tensor store (such as weights and value nodes of the graph).
+  // 2. Prepare the data of the host tensor queue (such as non-weighted parameters of the graph).
+  // 3. Prepare the output tensors of the graph.
+  void PrepareRun(const KernelGraphPtr &graph, const DeviceContext *device_context,
+                  const std::vector<TensorPtr> *input_tensors, VectorRef *const &outputs);
+
   // The processing entry of actors running.
   bool Run(const ActorSet *actor_set, GraphExecutionStrategy strategy = GraphExecutionStrategy::kPipeline);
 
@@ -113,6 +121,9 @@ class GraphScheduler {
   // Persist device tensors of graph's some nodes(such as weights and value nodes).
   void PersistDeviceTensor(const KernelGraphPtr &graph);
 
+  // Fetch the host tensor queue by kernel graph.
+  HostTensorQueue *FetchHostQueue(const KernelGraphPtr &graph) const;
+
   std::unordered_map<KernelGraphPtr, ActorSetPtr> graph_to_actors_;
   std::unordered_map<KernelGraphPtr, HostTensorQueuePtr> graph_to_host_queue_;
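
Reviewer note: below is a minimal caller-side sketch (not part of this patch) of how the entry points touched here are intended to compose. The function name RunGraphStep and the GraphScheduler::GetInstance() singleton accessor are assumptions for illustration; Transform, Schedule, PrepareRun, and Run are taken from the declarations above. The sketch shows the split this patch introduces: Transform/Schedule run once per graph, while PrepareRun stages the device tensor store, host tensor queue, and output tensors before each Run.

// Hypothetical usage sketch, assuming the GraphScheduler singleton accessor exists.
#include <vector>
#include "runtime/framework/graph_scheduler.h"

namespace mindspore {
namespace runtime {
VectorRef RunGraphStep(const KernelGraphPtr &graph, const DeviceContext *device_context,
                       const std::vector<TensorPtr> &input_tensors) {
  auto &scheduler = GraphScheduler::GetInstance();  // Assumed singleton accessor.

  // Once per graph: build and link the actor DAG, then schedule the actors.
  const auto *actor_set = scheduler.Transform(graph, device_context, &input_tensors);
  scheduler.Schedule(actor_set);

  // Per step: stage value nodes and weights into the device tensor store, push
  // non-weighted parameters into the host tensor queue, and create output tensors.
  VectorRef outputs;
  scheduler.PrepareRun(graph, device_context, &input_tensors, &outputs);

  // Kick off the actor DAG for this step (defaults to the pipeline strategy).
  if (!scheduler.Run(actor_set)) {
    MS_LOG(EXCEPTION) << "Run actor set failed.";
  }
  return outputs;
}
}  // namespace runtime
}  // namespace mindspore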