// actorsystem.cpp (12 KB)
  1. #include "defs.h"
  2. #include "actorsystem.h"
  3. #include "callstack.h"
  4. #include "cpu_manager.h"
  5. #include "mailbox.h"
  6. #include "events.h"
  7. #include "interconnect.h"
  8. #include "servicemap.h"
  9. #include "scheduler_queue.h"
  10. #include "scheduler_actor.h"
  11. #include "log.h"
  12. #include "probes.h"
  13. #include "ask.h"
  14. #include <library/cpp/actors/util/affinity.h>
  15. #include <library/cpp/actors/util/datetime.h>
  16. #include <util/generic/hash.h>
  17. #include <util/system/rwlock.h>
  18. #include <util/random/random.h>
  19. namespace NActors {
  20. LWTRACE_USING(ACTORLIB_PROVIDER);
  21. TActorSetupCmd::TActorSetupCmd()
  22. : MailboxType(TMailboxType::HTSwap)
  23. , PoolId(0)
  24. , Actor(nullptr)
  25. {
  26. }
  27. TActorSetupCmd::TActorSetupCmd(TActorSetupCmd&&) = default;
  28. TActorSetupCmd& TActorSetupCmd::operator=(TActorSetupCmd&&) = default;
  29. TActorSetupCmd::~TActorSetupCmd() = default;
  30. TActorSetupCmd::TActorSetupCmd(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId)
  31. : MailboxType(mailboxType)
  32. , PoolId(poolId)
  33. , Actor(actor)
  34. {
  35. }
  36. TActorSetupCmd::TActorSetupCmd(std::unique_ptr<IActor> actor, TMailboxType::EType mailboxType, ui32 poolId)
  37. : MailboxType(mailboxType)
  38. , PoolId(poolId)
  39. , Actor(std::move(actor))
  40. {
  41. }
  42. void TActorSetupCmd::Set(std::unique_ptr<IActor> actor, TMailboxType::EType mailboxType, ui32 poolId) {
  43. MailboxType = mailboxType;
  44. PoolId = poolId;
  45. Actor = std::move(actor);
  46. }
  47. struct TActorSystem::TServiceMap : TNonCopyable {
  48. NActors::TServiceMap<TActorId, TActorId, TActorId::THash> LocalMap;
  49. TTicketLock Lock;
  50. TActorId RegisterLocalService(const TActorId& serviceId, const TActorId& actorId) {
  51. TTicketLock::TGuard guard(&Lock);
  52. const TActorId old = LocalMap.Update(serviceId, actorId);
  53. return old;
  54. }
  55. TActorId LookupLocal(const TActorId& x) {
  56. return LocalMap.Find(x);
  57. }
  58. };
  59. TActorSystem::TActorSystem(THolder<TActorSystemSetup>& setup, void* appData,
  60. TIntrusivePtr<NLog::TSettings> loggerSettings)
  61. : NodeId(setup->NodeId)
  62. , CpuManager(new TCpuManager(setup))
  63. , ExecutorPoolCount(CpuManager->GetExecutorsCount())
  64. , Scheduler(setup->Scheduler)
  65. , InterconnectCount((ui32)setup->Interconnect.ProxyActors.size())
  66. , CurrentTimestamp(0)
  67. , CurrentMonotonic(0)
  68. , CurrentIDCounter(RandomNumber<ui64>())
  69. , SystemSetup(setup.Release())
  70. , DefSelfID(NodeId, "actorsystem")
  71. , AppData0(appData)
  72. , LoggerSettings0(loggerSettings)
  73. , StartExecuted(false)
  74. , StopExecuted(false)
  75. , CleanupExecuted(false)
  76. {
  77. ServiceMap.Reset(new TServiceMap());
  78. }
  79. TActorSystem::~TActorSystem() {
  80. Cleanup();
  81. }
  82. template <TActorSystem::TEPSendFunction EPSpecificSend>
  83. bool TActorSystem::GenericSend(TAutoPtr<IEventHandle> ev) const {
  84. if (Y_UNLIKELY(!ev))
  85. return false;
  86. #ifdef USE_ACTOR_CALLSTACK
  87. ev->Callstack.TraceIfEmpty();
  88. #endif
  89. TActorId recipient = ev->GetRecipientRewrite();
  90. const ui32 recpNodeId = recipient.NodeId();
  91. if (recpNodeId != NodeId && recpNodeId != 0) {
  92. // if recipient is not local one - rewrite with forward instruction
  93. Y_DEBUG_ABORT_UNLESS(!ev->HasEvent() || ev->GetBase()->IsSerializable());
  94. Y_ABORT_UNLESS(ev->Recipient == recipient,
  95. "Event rewrite from %s to %s would be lost via interconnect",
  96. ev->Recipient.ToString().c_str(),
  97. recipient.ToString().c_str());
  98. recipient = InterconnectProxy(recpNodeId);
  99. ev->Rewrite(TEvInterconnect::EvForward, recipient);
  100. }
  101. if (recipient.IsService()) {
  102. TActorId target = ServiceMap->LookupLocal(recipient);
  103. if (!target && IsInterconnectProxyId(recipient) && ProxyWrapperFactory) {
  104. const TActorId actorId = ProxyWrapperFactory(const_cast<TActorSystem*>(this),
  105. GetInterconnectProxyNode(recipient));
  106. with_lock(ProxyCreationLock) {
  107. target = ServiceMap->LookupLocal(recipient);
  108. if (!target) {
  109. target = actorId;
  110. ServiceMap->RegisterLocalService(recipient, target);
  111. }
  112. }
  113. if (target != actorId) {
  114. // a race has occured, terminate newly created actor
  115. Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, {}, nullptr, 0));
  116. }
  117. }
  118. recipient = target;
  119. ev->Rewrite(ev->GetTypeRewrite(), recipient);
  120. }
  121. Y_DEBUG_ABORT_UNLESS(recipient == ev->GetRecipientRewrite());
  122. const ui32 recpPool = recipient.PoolID();
  123. if (recipient && recpPool < ExecutorPoolCount) {
  124. if ((CpuManager->GetExecutorPool(recpPool)->*EPSpecificSend)(ev)) {
  125. return true;
  126. }
  127. }
  128. if (ev) {
  129. Send(IEventHandle::ForwardOnNondelivery(ev, TEvents::TEvUndelivered::ReasonActorUnknown));
  130. }
  131. return false;
  132. }
  133. template
  134. bool TActorSystem::GenericSend<&IExecutorPool::Send>(TAutoPtr<IEventHandle> ev) const;
  135. template
  136. bool TActorSystem::GenericSend<&IExecutorPool::SpecificSend>(TAutoPtr<IEventHandle> ev) const;
  137. bool TActorSystem::Send(const TActorId& recipient, IEventBase* ev, ui32 flags, ui64 cookie) const {
  138. return this->Send(new IEventHandle(recipient, DefSelfID, ev, flags, cookie));
  139. }
  140. bool TActorSystem::SpecificSend(TAutoPtr<IEventHandle> ev) const {
  141. return this->GenericSend<&IExecutorPool::SpecificSend>(ev);
  142. }
  143. bool TActorSystem::SpecificSend(TAutoPtr<IEventHandle> ev, ESendingType sendingType) const {
  144. if (!TlsThreadContext) {
  145. return this->GenericSend<&IExecutorPool::Send>(ev);
  146. } else {
  147. ESendingType previousType = std::exchange(TlsThreadContext->SendingType, sendingType);
  148. bool isSent = this->GenericSend<&IExecutorPool::SpecificSend>(ev);
  149. TlsThreadContext->SendingType = previousType;
  150. return isSent;
  151. }
  152. }
  153. void TActorSystem::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
  154. Schedule(deadline - Timestamp(), ev, cookie);
  155. }
  156. void TActorSystem::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
  157. const auto current = Monotonic();
  158. if (deadline < current)
  159. deadline = current;
  160. TTicketLock::TGuard guard(&ScheduleLock);
  161. ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
  162. }
  163. void TActorSystem::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
  164. const auto deadline = Monotonic() + delta;
  165. TTicketLock::TGuard guard(&ScheduleLock);
  166. ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
  167. }
  168. NThreading::TFuture<THolder<IEventBase>> TActorSystem::AskGeneric(TMaybe<ui32> expectedEventType,
  169. TActorId recipient, THolder<IEventBase> event,
  170. TDuration timeout) {
  171. auto promise = NThreading::NewPromise<THolder<IEventBase>>();
  172. Register(MakeAskActor(expectedEventType, recipient, std::move(event), timeout, promise).Release());
  173. return promise.GetFuture();
  174. }
  175. ui64 TActorSystem::AllocateIDSpace(ui64 count) {
  176. Y_DEBUG_ABORT_UNLESS(count < Max<ui32>() / 65536);
  177. static_assert(sizeof(TAtomic) == sizeof(ui64), "expect sizeof(TAtomic) == sizeof(ui64)");
  178. // get high 32 bits as seconds from epoch
  179. // it could wrap every century, but we don't expect any actor-reference to live this long so such wrap will do no harm
  180. const ui64 timeFromEpoch = TInstant::MicroSeconds(RelaxedLoad(&CurrentTimestamp)).Seconds();
  181. // get low 32 bits as counter value
  182. ui32 lowPartEnd = (ui32)(AtomicAdd(CurrentIDCounter, count));
  183. while (lowPartEnd < count) // if our request crosses 32bit boundary - retry
  184. lowPartEnd = (ui32)(AtomicAdd(CurrentIDCounter, count));
  185. const ui64 lowPart = lowPartEnd - count;
  186. const ui64 ret = (timeFromEpoch << 32) | lowPart;
  187. return ret;
  188. }
  189. TActorId TActorSystem::InterconnectProxy(ui32 destinationNode) const {
  190. if (destinationNode < InterconnectCount)
  191. return Interconnect[destinationNode];
  192. else if (destinationNode != NodeId)
  193. return MakeInterconnectProxyId(destinationNode);
  194. else
  195. return TActorId();
  196. }
  197. ui32 TActorSystem::BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>& eventFabric) {
  198. // TODO: get rid of this method
  199. for (ui32 i = 0; i < InterconnectCount; ++i) {
  200. Send(eventFabric(Interconnect[i]));
  201. }
  202. return InterconnectCount;
  203. }
  204. TActorId TActorSystem::LookupLocalService(const TActorId& x) const {
  205. return ServiceMap->LookupLocal(x);
  206. }
  207. TActorId TActorSystem::RegisterLocalService(const TActorId& serviceId, const TActorId& actorId) {
  208. // TODO: notify old actor about demotion
  209. return ServiceMap->RegisterLocalService(serviceId, actorId);
  210. }
  211. void TActorSystem::GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const {
  212. CpuManager->GetPoolStats(poolId, poolStats, statsCopy);
  213. }
  214. THarmonizerStats TActorSystem::GetHarmonizerStats() const {
  215. return CpuManager->GetHarmonizerStats();
  216. }
  217. void TActorSystem::Start() {
  218. Y_ABORT_UNLESS(StartExecuted == false);
  219. StartExecuted = true;
  220. ScheduleQueue.Reset(new NSchedulerQueue::TQueueType());
  221. TVector<NSchedulerQueue::TReader*> scheduleReaders;
  222. scheduleReaders.push_back(&ScheduleQueue->Reader);
  223. CpuManager->PrepareStart(scheduleReaders, this);
  224. Scheduler->Prepare(this, &CurrentTimestamp, &CurrentMonotonic);
  225. Scheduler->PrepareSchedules(&scheduleReaders.front(), (ui32)scheduleReaders.size());
  226. // setup interconnect proxies
  227. {
  228. TInterconnectSetup& setup = SystemSetup->Interconnect;
  229. Interconnect.Reset(new TActorId[InterconnectCount + 1]);
  230. for (ui32 i = 0, e = InterconnectCount; i != e; ++i) {
  231. TActorSetupCmd& x = setup.ProxyActors[i];
  232. if (x.Actor) {
  233. Interconnect[i] = Register(x.Actor.release(), x.MailboxType, x.PoolId, i);
  234. Y_ABORT_UNLESS(!!Interconnect[i]);
  235. }
  236. }
  237. ProxyWrapperFactory = std::move(SystemSetup->Interconnect.ProxyWrapperFactory);
  238. }
  239. // setup local services
  240. {
  241. for (ui32 i = 0, e = (ui32)SystemSetup->LocalServices.size(); i != e; ++i) {
  242. std::pair<TActorId, TActorSetupCmd>& x = SystemSetup->LocalServices[i];
  243. const TActorId xid = Register(x.second.Actor.release(), x.second.MailboxType, x.second.PoolId, i);
  244. Y_ABORT_UNLESS(!!xid);
  245. if (!!x.first)
  246. RegisterLocalService(x.first, xid);
  247. }
  248. }
  249. Scheduler->PrepareStart();
  250. CpuManager->Start();
  251. Send(MakeSchedulerActorId(), new TEvSchedulerInitialize(scheduleReaders, &CurrentTimestamp, &CurrentMonotonic));
  252. Scheduler->Start();
  253. }
  254. void TActorSystem::Stop() {
  255. if (StopExecuted || !StartExecuted)
  256. return;
  257. StopExecuted = true;
  258. for (auto&& fn : std::exchange(DeferredPreStop, {})) {
  259. fn();
  260. }
  261. Scheduler->PrepareStop();
  262. CpuManager->PrepareStop();
  263. Scheduler->Stop();
  264. CpuManager->Shutdown();
  265. }
  266. void TActorSystem::Cleanup() {
  267. Stop();
  268. if (CleanupExecuted || !StartExecuted)
  269. return;
  270. CleanupExecuted = true;
  271. CpuManager->Cleanup();
  272. Scheduler.Destroy();
  273. }
  274. }