support lite mindrt

pull/12494/head
zhangxuetong 4 years ago
parent 3952a57d85
commit 62c0577d80

@ -22,10 +22,13 @@
#include <sstream>
#include <string>
#include <string.h>
#include "actor/buserrcode.h"
#ifdef USE_GLOG
#include "utils/log_adapter.h"
#else
#include "common/log_adapter.h"
#endif
namespace mindspore {
#define BUS_LOG(severity) // LOG(severity)
@ -37,7 +40,6 @@ namespace mindspore {
#define ICTSBASE_LOG_COMMON_CODE
#define HLOG_LEVEL_INFO
#define PID_LITEBUS_LOG
//#define BUS_OOM_EXIT
#define HLOG_LEVEL_DEBUG 1
#define ICTSBASE_LOG0(logig, level, pid, format)
#define ICTSBASE_LOG1(logig, level, pid, format, para)

@ -14,6 +14,7 @@
* limitations under the License.
*/
#include <list>
#include <vector>
#include <memory>
#include <string>
@ -21,36 +22,45 @@
#include "actor/actor.h"
#include "async/uuid_base.h"
#include "async/future.h"
#include "async/async.h"
#include "mindrt/include/async/collect.h"
namespace mindspore {
// OpActor data route.
struct OpArrow {
OpArrow(int from_output_index, AID *to_op_id, int to_input_index)
OpArrow(int from_output_index, AID to_op_id, int to_input_index)
: from_output_index_(from_output_index), to_op_id_(to_op_id), to_input_index_(to_input_index) {}
int from_output_index_;
AID *to_op_id_;
AID to_op_id_;
int to_input_index_;
};
// OpActor data.
template <typename T>
struct OpData {
OpData(T *data, int to_input_index) : data_(data), to_input_index_(to_input_index) {}
OpData(const AID &op_id, T *data, int index) : op_id_(op_id), data_(data), index_(index) {}
AID op_id_;
T *data_;
int to_input_index_;
int index_;
};
using OpArrowPtr = std::shared_ptr<OpArrow>;
template <typename T>
using OpDataPtr = std::shared_ptr<OpData<T>>;
// The context of opActor running.
template <typename T>
struct OpContext {
uuids::uuid *sequential_num_;
std::vector<Promise<T *>> *results_;
std::vector<OpDataPtr<T>> *outputData_;
std::vector<Promise<int>> *results_;
void SetFailed(int32_t code) {
for (auto promise : *results_) {
promise.SetFailed(code);
}
}
void SetResult(size_t index, int value) { results_->at(index).SetValue(value); }
};
using OpArrowPtr = std::shared_ptr<OpArrow>;
template <typename T>
using OpDataPtr = std::shared_ptr<OpData<T>>;
template <typename T>
class OpActor : public ActorBase {
public:
@ -62,4 +72,38 @@ class OpActor : public ActorBase {
std::unordered_map<uuids::uuid *, std::vector<OpDataPtr<T>>> input_op_datas_;
std::vector<OpArrowPtr> output_op_arrow_;
};
template <typename T>
Future<std::list<int>> MindrtAsyncRun(const std::vector<OpDataPtr<T>> &inputData, OpContext<T> *context) {
std::list<Future<int>> futures;
for (auto promise : *(context->results_)) {
futures.push_back(promise.GetFuture());
}
Future<std::list<int>> collect = mindspore::Collect<int>(futures);
for (auto data : inputData) {
Async(data->op_id_, &mindspore::OpActor<T>::OpRun, data, context);
}
return collect;
}
template <typename T>
int MindrtRun(const std::vector<OpDataPtr<T>> &inputData, std::vector<OpDataPtr<T>> *outputData) {
OpContext<T> context;
std::vector<Promise<int>> promises(outputData->size());
uuids::uuid uid;
context.sequential_num_ = &uid;
context.results_ = &promises;
context.outputData_ = outputData;
auto collect = MindrtAsyncRun<T>(inputData, &context);
collect.Wait();
if (!collect.IsOK()) {
return -1;
}
return 0;
}
} // namespace mindspore

@ -1,49 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_ASYNCAFTER_H
#define MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_ASYNCAFTER_H
#include "async/async.h"
#include "timer/timertools.h"
constexpr mindspore::Duration MILLISECONDS = 1;
constexpr mindspore::Duration SECONDS = 1000;
namespace mindspore {
template <typename T>
Timer AsyncAfter(const Duration &duration, const AID &aid, void (T::*method)()) {
return TimerTools::AddTimer(duration, aid, [=]() { Async(aid, method); });
}
template <typename T, typename Arg0, typename Arg1>
Timer AsyncAfter(const Duration &duration, const AID &aid, void (T::*method)(Arg0), Arg1 &&arg) {
return TimerTools::AddTimer(duration, aid, [=]() { Async(aid, method, arg); });
}
template <typename T, typename... Args0, typename... Args1>
Timer AsyncAfter(const Duration &duration, const AID &aid, void (T::*method)(Args0...), Args1 &&... args) {
std::function<void(Args0...)> f([=](Args0... args0) { Async(aid, method, args0...); });
auto handler = std::bind(f, args...);
return TimerTools::AddTimer(duration, aid, [=]() { Async(aid, std::move(handler)); });
}
}; // namespace mindspore
#endif

@ -20,9 +20,11 @@
#include <future>
#include <iostream>
#include <list>
#include <memory>
#include <tuple>
#include "async/common.h"
#include "async/future.h"
#include "async/defer.h"
#include "async/spinlock.h"
#include "actor/actor.h"

@ -16,7 +16,8 @@
#ifndef MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H
#define MINDSPORE_CORE_MINDRT_INCLUDE_ASYNC_FUTURE_H
#include <memory>
#include <utility>
#include <future>
#include <iostream>
#include <list>
@ -30,7 +31,7 @@
#include "litebus.hpp"
#include "future_base.h"
#include "async/future_base.h"
namespace mindspore {
@ -114,22 +115,6 @@ class Future : public FutureBase {
return data->t;
}
Option<T> Get(uint64_t timeMs) const {
if (data->gotten) {
return Option<T>(data->t);
}
if (WaitFor(timeMs).IsError()) {
return Option<T>();
}
if (data->status.IsError()) {
return Option<T>();
}
return Option<T>(Get());
}
bool Valid() const noexcept { return data->future.valid(); }
bool IsInit() const { return data->status.IsInit(); }
@ -157,32 +142,6 @@ class Future : public FutureBase {
data->future.wait();
}
WaitForStatus WaitFor(uint64_t timeMs) const {
if (!data->status.IsInit()) {
return Status::KOK;
}
AID aid = mindspore::Spawn(std::make_shared<internal::WaitActor>(
internal::WAIT_ACTOR_NAME + std::to_string(mindspore::localid_generator::GenLocalActorId())));
mindspore::Timer timer = TimerTools::AddTimer(timeMs, aid, std::bind(&internal::Waitf, aid));
OnComplete(std::bind(&internal::Wait, aid, timer));
// block
mindspore::Await(aid);
data->lock.Lock();
bool ret = data->status.IsInit();
data->lock.Unlock();
if (!ret) {
return Status::KOK;
}
return Status::KERROR;
}
template <typename F>
const Future<T> &OnComplete(internal::DeferredHelper<F> &&deferred) const {
return OnComplete(std::move(deferred).operator std::function<void(const Future<T> &)>());
@ -341,19 +300,6 @@ class Future : public FutureBase {
return OnAbandoned(std::forward<F>(f), FutureBase());
}
Future<T> After(const Duration &timeMs, const std::function<Future<T>(const Future<T> &)> &f) const {
std::shared_ptr<Promise<T>> promise(new (std::nothrow) Promise<T>());
BUS_OOM_EXIT(promise);
Future<T> future = promise->GetFuture();
mindspore::Timer timer =
TimerTools::AddTimer(timeMs, "__After__", std::bind(&internal::Afterf<T>, f, promise, *this));
OnComplete(std::bind(&internal::After<T>, promise, timer, std::placeholders::_1));
return future;
}
private:
template <typename F, typename R = typename internal::Unwrap<typename std::result_of<F(const T &)>::type>::type>
Future<R> Then(internal::DeferredHelper<F> &&f, FutureBase) const {

@ -19,6 +19,9 @@
#include <future>
#include <iostream>
#include <string>
#include <utility>
#include <memory>
#include <list>
#include "actor/actor.h"
@ -26,8 +29,6 @@
#include "async/spinlock.h"
#include "async/status.h"
#include "timer/timertools.h"
namespace mindspore {
template <typename T>
@ -104,15 +105,6 @@ struct FutureData {
namespace internal {
const std::string WAIT_ACTOR_NAME = "WACTOR_";
class WaitActor : public ActorBase {
public:
explicit WaitActor(const std::string &name) : mindspore::ActorBase(name) {}
~WaitActor() override {}
};
template <typename T>
class DeferredHelper;
@ -186,16 +178,8 @@ static void Afterf(const std::function<Future<T>(const Future<T> &)> &f, const s
promise->Associate(f(future));
}
template <typename T>
static void After(const std::shared_ptr<Promise<T>> &promise, const mindspore::Timer &timer, const Future<T> &future) {
(void)mindspore::TimerTools::Cancel(timer);
promise->Associate(future);
}
void Waitf(const AID &aid);
void Wait(const AID &aid, const mindspore::Timer &timer);
} // namespace internal
} // namespace mindspore

@ -26,7 +26,7 @@ namespace mindspore {
template <typename T>
struct InnerSome {
InnerSome(const T &t) : _t(std::move(t)) {}
explicit InnerSome(const T &t) : _t(std::move(t)) {}
T _t;
};
@ -35,7 +35,7 @@ InnerSome<typename std::decay<T>::type> Some(T &&t) {
return InnerSome<typename std::decay<T>::type>(std::forward<T>(t));
}
struct None {};
struct MindrtNone {};
template <typename T>
class Option {
@ -48,7 +48,7 @@ class Option {
Option(const InnerSome<T> &some) : data(some._t), state(SOME) {}
Option(const None &none) : data(), state(NONE) {}
Option(const MindrtNone &none) : data(), state(NONE) {}
Option(const Option<T> &that) : data(), state(that.state) {
if (that.IsSome()) {

@ -1,32 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __LITEBUS_DURATION_HPP__
#define __LITEBUS_DURATION_HPP__
#include <deque>
#include <functional>
#include <list>
#include <map>
#include <string>
#include <thread>
#include "async/spinlock.h"
namespace mindspore {
using Duration = uint64_t;
}
#endif

@ -1,48 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __LITEBUS_TIMER_HPP__
#define __LITEBUS_TIMER_HPP__
#include "actor/aid.h"
#include "timer/timewatch.h"
namespace mindspore {
class Timer {
public:
Timer();
~Timer();
bool operator==(const Timer &that) const;
// run this timer's thunk.
void operator()() const;
TimeWatch GetTimeWatch() const;
AID GetTimerAID() const;
uint64_t GetTimerID() const;
private:
friend class TimerTools;
Timer(uint64_t timerId, const TimeWatch &timeWatch, const AID &timeAid, const std::function<void()> &handler);
uint64_t id;
TimeWatch t;
AID aid;
std::function<void()> thunk;
};
} // namespace mindspore
#endif

@ -1,39 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __LITEBUS_TIMETOOLS_HPP__
#define __LITEBUS_TIMETOOLS_HPP__
#include <atomic>
#include <list>
#include <map>
#include <set>
#include "timer/duration.h"
#include "timer/timer.h"
namespace mindspore {
class TimerTools {
public:
static bool Initialize();
static void Finalize();
static Timer AddTimer(const Duration &duration, const AID &aid, const std::function<void()> &thunk);
static bool Cancel(const Timer &timer);
static std::atomic_bool g_initStatus;
};
} // namespace mindspore
#endif

@ -1,65 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __LITEBUS_TIMEWATCH_HPP__
#define __LITEBUS_TIMEWATCH_HPP__
#include "timer/duration.h"
namespace mindspore {
constexpr Duration MICRTONANO = 1000;
constexpr Duration MILLITOMICR = 1000;
constexpr Duration SECTOMILLI = 1000;
class TimeWatch {
public:
TimeWatch();
TimeWatch(const Duration &duration);
TimeWatch(const TimeWatch &that);
~TimeWatch();
// Constructs a Time instance that is the 'duration' from now.
static TimeWatch In(const Duration &duration);
static Duration Now();
TimeWatch &operator=(const TimeWatch &that);
TimeWatch &operator=(const Duration &duration);
bool operator==(const TimeWatch &that) const;
bool operator<(const TimeWatch &that) const;
bool operator<=(const TimeWatch &that) const;
// Returns the value of the timewatch as a Duration object.
Duration Time() const;
// Returns the amount of time remaining.
Duration Remaining() const;
// return true if the time expired.
bool Expired() const;
private:
Duration duration;
};
} // namespace mindspore
#endif

@ -18,7 +18,6 @@
#include "actor/actormgr.h"
#include "actor/actorpolicyinterface.h"
#include "actor/iomgr.h"
#include "utils/log_adapter.h"
namespace mindspore {

@ -17,7 +17,6 @@
#include "actor/actormgr.h"
#include "actor/actorpolicy.h"
#include "actor/iomgr.h"
#include "utils/log_adapter.h"
namespace mindspore {

@ -17,7 +17,6 @@
#include "actor/actor.h"
#include "actor/actormgr.h"
#include "actor/actorpolicy.h"
#include "utils/log_adapter.h"
namespace mindspore {

@ -16,7 +16,6 @@
#include "actor/actorthread.h"
#include <atomic>
#include "utils/log_adapter.h"
namespace mindspore {
constexpr int MAXTHREADNAMELEN = 12;
@ -97,7 +96,6 @@ void ActorThread::Run() {
terminate = true;
MS_LOG(DEBUG) << "Actor this Threads have finished exiting.";
}
} while (!terminate);
}

@ -15,7 +15,6 @@
*/
#include "actor/aid.h"
#include "utils/log_adapter.h"
namespace mindspore {
@ -121,6 +120,9 @@ std::string AID::GetIp() const {
uint16_t AID::GetPort() const {
size_t index = url.rfind(':');
if (index == std::string::npos) {
return 0;
}
return (uint16_t)std::stoul(url.substr(index + 1));
}

@ -1,85 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <actor/sysmgr_actor.h>
#include "actor/actormgr.h"
#include "actor/iomgr.h"
#include "utils/log_adapter.h"
namespace mindspore {
Duration SysMgrActor::linkRecycleDuration = 10000;
void MetricsMessage::PrintMetrics() {
// print sendMetrics by default, in the future we can add more metrics format
std::ostringstream out;
while (!intTypeMetrics.empty()) {
out << intTypeMetrics.front() << "-";
intTypeMetrics.pop();
}
out << "|";
while (!stringTypeMetrics.empty()) {
std::string stringMetric = stringTypeMetrics.front();
if (stringMetric.empty()) {
out << "null"
<< "-";
} else {
out << stringMetric << "-";
}
stringTypeMetrics.pop();
}
MS_LOG(INFO) << "[format:fd-err-sum-size|to-okmsg-failmsg], value:" << out.str().c_str();
}
void SysMgrActor::SendMetricsDurationCallback() {
std::string protocol = "tcp";
std::shared_ptr<mindspore::IOMgr> ioMgrRef = ActorMgr::GetIOMgrRef(protocol);
if (ioMgrRef == nullptr) {
MS_LOG(INFO) << "tcp protocol is not exist.";
} else {
ioMgrRef->CollectMetrics();
}
(void)AsyncAfter(printSendMetricsDuration, GetAID(), &SysMgrActor::SendMetricsDurationCallback);
}
void SysMgrActor::HandleSendMetricsCallback(const AID &from, std::unique_ptr<MetricsMessage> message) {
if (message == nullptr) {
MS_LOG(WARNING) << "Can't transform to MetricsMessage.";
return;
}
message->PrintMetrics();
return;
}
void SysMgrActor::LinkRecycleDurationCallback() {
std::string protocol = "tcp";
std::shared_ptr<mindspore::IOMgr> ioMgrRef = ActorMgr::GetIOMgrRef(protocol);
if (ioMgrRef == nullptr) {
MS_LOG(INFO) << "tcp protocol is not exist.";
} else {
ioMgrRef->LinkRecycleCheck(linkRecyclePeriod);
}
(void)AsyncAfter(linkRecycleDuration, GetAID(), &SysMgrActor::LinkRecycleDurationCallback);
}
} // namespace mindspore

@ -1,93 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef MINDSPORE_CORE_MINDRT_SRC_ACTOR_SYSMGR_ACTOR_H
#define MINDSPORE_CORE_MINDRT_SRC_ACTOR_SYSMGR_ACTOR_H
#include <queue>
#include "async/async.h"
#include "async/asyncafter.h"
#include "actor/actorapp.h"
#include "utils/log_adapter.h"
namespace mindspore {
const std::string SYSMGR_ACTOR_NAME = "SysMgrActor";
const std::string METRICS_SEND_MSGNAME = "SendMetrics";
const int LINK_RECYCLE_PERIOD_MIN = 20;
const int LINK_RECYCLE_PERIOD_MAX = 360;
using IntTypeMetrics = std::queue<int>;
using StringTypeMetrics = std::queue<std::string>;
class MetricsMessage : public MessageBase {
public:
explicit MetricsMessage(const std::string &tfrom, const std::string &tTo, const std::string &tName,
const IntTypeMetrics &tInts = IntTypeMetrics(),
const StringTypeMetrics &tStrings = StringTypeMetrics())
: MessageBase(tfrom, tTo, tName), intTypeMetrics(tInts), stringTypeMetrics(tStrings) {}
~MetricsMessage() override {}
void PrintMetrics();
private:
IntTypeMetrics intTypeMetrics;
StringTypeMetrics stringTypeMetrics;
};
class SysMgrActor : public mindspore::AppActor {
public:
explicit SysMgrActor(const std::string &name, const Duration &duration)
: mindspore::AppActor(name), printSendMetricsDuration(duration) {
linkRecyclePeriod = 0;
}
~SysMgrActor() override {}
protected:
virtual void Init() override {
MS_LOG(INFO) << "Initiaize SysMgrActor";
// register receive handle
Receive("SendMetrics", &SysMgrActor::HandleSendMetricsCallback);
// start sys manager timers
(void)AsyncAfter(printSendMetricsDuration, GetAID(), &SysMgrActor::SendMetricsDurationCallback);
char *linkRecycleEnv = getenv("LITEBUS_LINK_RECYCLE_PERIOD");
if (linkRecycleEnv != nullptr) {
int period = 0;
period = std::stoi(linkRecycleEnv);
if (period >= LINK_RECYCLE_PERIOD_MIN && period <= LINK_RECYCLE_PERIOD_MAX) {
MS_LOG(INFO) << "link recycle set:" << period;
linkRecyclePeriod = period;
(void)AsyncAfter(linkRecycleDuration, GetAID(), &SysMgrActor::LinkRecycleDurationCallback);
}
}
}
private:
void SendMetricsDurationCallback();
void HandleSendMetricsCallback(const AID &from, std::unique_ptr<MetricsMessage> message);
void LinkRecycleDurationCallback();
Duration printSendMetricsDuration;
static Duration linkRecycleDuration;
int linkRecyclePeriod;
};
} // namespace mindspore
#endif

@ -15,7 +15,6 @@
*/
#include "async/future.h"
#include "utils/log_adapter.h"
namespace mindspore {
namespace internal {
@ -25,10 +24,5 @@ void Waitf(const AID &aid) {
MS_LOG(WARNING) << "WaitFor is timeout.";
}
void Wait(const AID &aid, const mindspore::Timer &timer) {
mindspore::TimerTools::Cancel(timer);
mindspore::Terminate(aid);
}
} // namespace internal
} // namespace mindspore

@ -13,11 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <atomic>
#include <random>
#include "async/uuid_base.h"
#include <atomic>
#include "utils/log_adapter.h"
namespace mindspore {
namespace uuids {
@ -41,7 +39,7 @@ std::string uuid::ToBytes(const uuid &u) {
Option<uuid> uuid::FromBytes(const std::string &s) {
if (s.size() != UUID_SIZE) {
return None();
return MindrtNone();
}
uuid u;
memcpy(&u.uuidData, s.data(), s.size());
@ -57,7 +55,7 @@ Option<unsigned char> uuid::GetValue(char c) {
size_t pos = std::find(digitsBegin, digitsEnd, c) - digitsBegin;
if (pos >= digitsLen) {
MS_LOG(ERROR) << "invalid char";
return None();
return MindrtNone();
}
return values[pos];
}
@ -65,7 +63,7 @@ Option<unsigned char> uuid::GetValue(char c) {
Option<uuid> uuid::FromString(const std::string &s) {
auto sBegin = s.begin();
if (sBegin == s.end()) {
return None();
return MindrtNone();
}
auto c = *sBegin;
bool hasOpenBrace = (c == '{');
@ -84,12 +82,12 @@ Option<uuid> uuid::FromString(const std::string &s) {
c = *(sBegin++);
} else {
MS_LOG(ERROR) << "str invalid";
return None();
return MindrtNone();
}
}
Option<unsigned char> oc1 = GetValue(c);
if (oc1.IsNone()) {
return None();
return MindrtNone();
}
u.uuidData[i] = oc1.Get();
if (sBegin != s.end()) {
@ -98,13 +96,13 @@ Option<uuid> uuid::FromString(const std::string &s) {
u.uuidData[i] <<= SHIFT_BIT;
Option<unsigned char> oc2 = GetValue(c);
if (oc2.IsNone()) {
return None();
return MindrtNone();
}
u.uuidData[i] |= oc2.Get();
}
if ((hasOpenBrace && (c != '}')) || (sBegin != s.end())) {
MS_LOG(ERROR) << "No } end or leng invalid";
return None();
return MindrtNone();
}
return u;
}
@ -140,14 +138,14 @@ uuid RandomBasedGenerator::GenerateRandomUuid() {
std::mt19937 gen(rd());
// We use uniform distribution
std::uniform_int_distribution<unsigned long> distribution((std::numeric_limits<unsigned long>::min)(),
(std::numeric_limits<unsigned long>::max)());
std::uniform_int_distribution<uint64_t> distribution((std::numeric_limits<uint64_t>::min)(),
(std::numeric_limits<uint64_t>::max)());
unsigned long randomValue = distribution(gen);
uint64_t randomValue = distribution(gen);
unsigned int i = 0;
for (uint8_t *it = tmpUUID.BeginAddress(); it != tmpUUID.EndAddress(); ++it, ++i) {
if (i == sizeof(unsigned long)) {
if (i == sizeof(uint64_t)) {
randomValue = distribution(gen);
i = 0;
}
@ -156,9 +154,9 @@ uuid RandomBasedGenerator::GenerateRandomUuid() {
}
// use atomic ++ to replace random
static std::atomic<unsigned long> ul(1);
unsigned long lCount = ul.fetch_add(1);
unsigned long offSet = distribution(gen) % RIGHT_SHIFT_BITS;
static std::atomic<uint64_t> ul(1);
uint64_t lCount = ul.fetch_add(1);
uint64_t offSet = distribution(gen) % RIGHT_SHIFT_BITS;
auto ret = memcpy(tmpUUID.BeginAddress() + offSet, &lCount, sizeof(lCount));
if (ret != 0) {
MS_LOG(ERROR) << "memcpy_s error.";

@ -1,3 +0,0 @@
target_sources(litebus_obj PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/evloop.cc
)

File diff suppressed because it is too large Load Diff

@ -1,109 +0,0 @@
/**
* Copyright 2021 Huawei Technologies Co., Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef __LITEBUS_EVLOOP_H__
#define __LITEBUS_EVLOOP_H__
#include <sys/epoll.h>
#include <sys/ioctl.h>
#include <functional>
#include <list>
#include <mutex>
#include <queue>
#include <sys/eventfd.h>
#include <semaphore.h>
#include "timer/duration.h"
namespace mindspore {
/*
* max epoll set size
*/
constexpr auto EPOLL_SIZE = 4096;
/*
* epoll event max size
*/
constexpr auto EPOLL_EVENTS_SIZE = 64;
typedef void (*EventHandler)(int fd, uint32_t events, void *data);
typedef struct EventData {
EventHandler handler;
void *data;
int fd;
} EventData;
class EvLoop {
public:
EvLoop() {
efd = -1;
stopLoop = 0;
queueEventfd = -1;
loopThread = 0;
};
EvLoop(const EvLoop &) = delete;
EvLoop &operator=(const EvLoop &) = delete;
bool Init(const std::string &threadName);
int AddFuncToEvLoop(std::function<void()> &&func);
int AddFdEvent(int fd, uint32_t events, EventHandler handler, void *data);
int ModifyFdEvent(int fd, uint32_t events);
int DelFdEvent(int fd);
void Finish();
~EvLoop();
int EventLoopCreate(void);
void StopEventLoop();
void EventLoopDestroy();
void EventFreeDelEvents();
void AddDeletedEvents(EventData *eventData);
int FindDeletedEvent(const EventData *tev);
void HandleEvent(const struct epoll_event *events, int nevent);
void DeleteEvent(int fd);
EventData *FindEvent(int fd);
void AddEvent(EventData *eventData);
void CleanUp();
int efd;
int stopLoop;
std::mutex loopMutex;
sem_t semId;
pthread_t loopThread;
int queueEventfd;
std::mutex queueMutex;
std::queue<std::function<void()>> queue;
std::mutex eventsLock;
// fd,EventData
std::map<int, EventData *> events;
// Just to be safe, let's use a list to preserve deleted events rather than a map. Because the caller may
// delete events on the same fd twice in once epoll_wait
std::map<int, std::list<EventData *>> deletedEvents;
};
} // namespace mindspore
#endif

@ -15,16 +15,11 @@
*/
#include <cstdlib>
#include "mindrt/src/actor/sysmgr_actor.h"
#include <atomic>
#include "mindrt/src/actor/actormgr.h"
#include "mindrt/src/actor/iomgr.h"
//#include "utils/os_utils.hpp"
#include "litebus.hpp"
#include "timer/timertools.h"
#include "litebus.h"
#include "include/litebus.h"
extern "C" {
int LitebusInitializeC(const struct LitebusConfig *config) {
@ -48,12 +43,6 @@ int LitebusInitializeC(const struct LitebusConfig *config) {
void LitebusFinalizeC() { mindspore::Finalize(); }
}
constexpr auto LITEBUSTHREADMIN = 3;
constexpr auto LITEBUSTHREADMAX = 100;
constexpr auto LITEBUSTHREADS = 10;
constexpr auto SYSMGR_TIMER_DURATION = 600000;
namespace mindspore {
namespace local {
@ -68,62 +57,7 @@ const LitebusAddress &GetLitebusAddress() {
return *local::g_litebusAddress;
}
bool SetServerIo(std::shared_ptr<mindspore::IOMgr> &io, std::string &advertiseUrl, const std::string &protocol,
const std::string &url) {
#if 0
if (protocol == "tcp") {
size_t index = advertiseUrl.find("://");
if (index != std::string::npos) {
advertiseUrl = advertiseUrl.substr(index + URL_PROTOCOL_IP_SEPARATOR.size());
}
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "create tcp iomgr. (%s)",
"Url=%s,advertiseUrl=%s", url.c_str(), advertiseUrl.c_str());
if (local::g_litebusAddress == nullptr) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG,
"Couldn't allocate memory for LitebusAddress");
return false;
}
local::g_litebusAddress->scheme = protocol;
local::g_litebusAddress->ip = AID("test@" + advertiseUrl).GetIp();
local::g_litebusAddress->port = AID("test@" + advertiseUrl).GetPort();
#ifdef HTTP_ENABLED
mindspore::HttpIOMgr::EnableHttp();
#endif
io.reset(new (std::nothrow) mindspore::TCPMgr());
if (io == nullptr) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG,
"Couldn't allocate memory for TCPMgr");
return false;
}
}
#ifdef UDP_ENABLED
else if (protocol == "udp") {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "create udp iomgr. (%s)",
"Url=%s,advertiseUrl=%s", url.c_str(), advertiseUrl.c_str());
io.reset(new (std::nothrow) mindspore::UDPMgr());
if (io == nullptr) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG,
"Couldn't allocate memory for UDPMgr");
return false;
}
}
#endif
else {
ICTSBASE_LOG_STRING(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "unsupported protocol. (%s)",
"%s", protocol.c_str());
return false;
}
#endif
return true;
}
void SetThreadCount(int threadCount) {
int tmpThreadCount = LITEBUSTHREADS;
ActorMgr::GetActorMgrRef()->Initialize(tmpThreadCount);
}
void SetThreadCount(int threadCount) { ActorMgr::GetActorMgrRef()->Initialize(threadCount); }
class LiteBusExit {
public:
@ -141,16 +75,9 @@ int InitializeImp(const std::string &tcpUrl, const std::string &tcpUrlAdv, const
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus starts ......");
signal(SIGPIPE, SIG_IGN);
if (!TimerTools::Initialize()) {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_ERROR, PID_LITEBUS_LOG, "Failed to initialize timer tools");
return BUS_ERROR;
}
// start actor's thread
SetThreadCount(threadCount);
mindspore::Spawn(std::make_shared<SysMgrActor>(SYSMGR_ACTOR_NAME, SYSMGR_TIMER_DURATION));
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus has started.");
return BUS_OK;
}
@ -205,7 +132,6 @@ void Finalize() {
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus starts to finalize.");
mindspore::ActorMgr::GetActorMgrRef()->Finalize();
TimerTools::Finalize();
ICTSBASE_LOG0(ICTSBASE_LOG_COMMON_CODE, HLOG_LEVEL_INFO, PID_LITEBUS_LOG, "litebus has been finalized.");
// flush the log in cache to disk before exiting.

@ -1,5 +0,0 @@
target_sources(litebus_obj PRIVATE
${CMAKE_CURRENT_SOURCE_DIR}/timer.cc
${CMAKE_CURRENT_SOURCE_DIR}/timertools.cc
${CMAKE_CURRENT_SOURCE_DIR}/timewatch.cc
)

Some files were not shown because too many files have changed in this diff Show More

Loading…
Cancel
Save