actorsystem.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. #include "defs.h"
  2. #include "actorsystem.h"
  3. #include "callstack.h"
  4. #include "cpu_manager.h"
  5. #include "mailbox.h"
  6. #include "events.h"
  7. #include "interconnect.h"
  8. #include "servicemap.h"
  9. #include "scheduler_queue.h"
  10. #include "scheduler_actor.h"
  11. #include "log.h"
  12. #include "probes.h"
  13. #include "ask.h"
  14. #include <library/cpp/actors/util/affinity.h>
  15. #include <library/cpp/actors/util/datetime.h>
  16. #include <util/generic/hash.h>
  17. #include <util/system/rwlock.h>
  18. #include <util/random/random.h>
  19. namespace NActors {
  20. LWTRACE_USING(ACTORLIB_PROVIDER);
  21. struct TActorSystem::TServiceMap : TNonCopyable {
  22. NActors::TServiceMap<TActorId, TActorId, TActorId::THash> LocalMap;
  23. TTicketLock Lock;
  24. TActorId RegisterLocalService(const TActorId& serviceId, const TActorId& actorId) {
  25. TTicketLock::TGuard guard(&Lock);
  26. const TActorId old = LocalMap.Update(serviceId, actorId);
  27. return old;
  28. }
  29. TActorId LookupLocal(const TActorId& x) {
  30. return LocalMap.Find(x);
  31. }
  32. };
  33. TActorSystem::TActorSystem(THolder<TActorSystemSetup>& setup, void* appData,
  34. TIntrusivePtr<NLog::TSettings> loggerSettings)
  35. : NodeId(setup->NodeId)
  36. , CpuManager(new TCpuManager(setup))
  37. , ExecutorPoolCount(CpuManager->GetExecutorsCount())
  38. , Scheduler(setup->Scheduler)
  39. , InterconnectCount((ui32)setup->Interconnect.ProxyActors.size())
  40. , CurrentTimestamp(0)
  41. , CurrentMonotonic(0)
  42. , CurrentIDCounter(RandomNumber<ui64>())
  43. , SystemSetup(setup.Release())
  44. , DefSelfID(NodeId, "actorsystem")
  45. , AppData0(appData)
  46. , LoggerSettings0(loggerSettings)
  47. , StartExecuted(false)
  48. , StopExecuted(false)
  49. , CleanupExecuted(false)
  50. {
  51. ServiceMap.Reset(new TServiceMap());
  52. }
  53. TActorSystem::~TActorSystem() {
  54. Cleanup();
  55. }
  56. bool TActorSystem::Send(TAutoPtr<IEventHandle> ev) const {
  57. if (Y_UNLIKELY(!ev))
  58. return false;
  59. #ifdef USE_ACTOR_CALLSTACK
  60. ev->Callstack.TraceIfEmpty();
  61. #endif
  62. TActorId recipient = ev->GetRecipientRewrite();
  63. const ui32 recpNodeId = recipient.NodeId();
  64. if (recpNodeId != NodeId && recpNodeId != 0) {
  65. // if recipient is not local one - rewrite with forward instruction
  66. Y_VERIFY_DEBUG(!ev->HasEvent() || ev->GetBase()->IsSerializable());
  67. Y_VERIFY(ev->Recipient == recipient,
  68. "Event rewrite from %s to %s would be lost via interconnect",
  69. ev->Recipient.ToString().c_str(),
  70. recipient.ToString().c_str());
  71. recipient = InterconnectProxy(recpNodeId);
  72. ev->Rewrite(TEvInterconnect::EvForward, recipient);
  73. }
  74. if (recipient.IsService()) {
  75. TActorId target = ServiceMap->LookupLocal(recipient);
  76. if (!target && IsInterconnectProxyId(recipient) && ProxyWrapperFactory) {
  77. const TActorId actorId = ProxyWrapperFactory(const_cast<TActorSystem*>(this),
  78. GetInterconnectProxyNode(recipient));
  79. with_lock(ProxyCreationLock) {
  80. target = ServiceMap->LookupLocal(recipient);
  81. if (!target) {
  82. target = actorId;
  83. ServiceMap->RegisterLocalService(recipient, target);
  84. }
  85. }
  86. if (target != actorId) {
  87. // a race has occured, terminate newly created actor
  88. Send(new IEventHandle(TEvents::TSystem::Poison, 0, actorId, {}, nullptr, 0));
  89. }
  90. }
  91. recipient = target;
  92. ev->Rewrite(ev->GetTypeRewrite(), recipient);
  93. }
  94. Y_VERIFY_DEBUG(recipient == ev->GetRecipientRewrite());
  95. const ui32 recpPool = recipient.PoolID();
  96. if (recipient && recpPool < ExecutorPoolCount) {
  97. if (CpuManager->GetExecutorPool(recpPool)->Send(ev)) {
  98. return true;
  99. }
  100. }
  101. Send(ev->ForwardOnNondelivery(TEvents::TEvUndelivered::ReasonActorUnknown));
  102. return false;
  103. }
  104. bool TActorSystem::Send(const TActorId& recipient, IEventBase* ev, ui32 flags) const {
  105. return this->Send(new IEventHandle(recipient, DefSelfID, ev, flags));
  106. }
  107. void TActorSystem::Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
  108. Schedule(deadline - Timestamp(), ev, cookie);
  109. }
  110. void TActorSystem::Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
  111. const auto current = Monotonic();
  112. if (deadline < current)
  113. deadline = current;
  114. TTicketLock::TGuard guard(&ScheduleLock);
  115. ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
  116. }
  117. void TActorSystem::Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie) const {
  118. const auto deadline = Monotonic() + delta;
  119. TTicketLock::TGuard guard(&ScheduleLock);
  120. ScheduleQueue->Writer.Push(deadline.MicroSeconds(), ev.Release(), cookie);
  121. }
  122. TActorId TActorSystem::Register(IActor* actor, TMailboxType::EType mailboxType, ui32 executorPool, ui64 revolvingCounter,
  123. const TActorId& parentId) {
  124. Y_VERIFY(executorPool < ExecutorPoolCount, "executorPool# %" PRIu32 ", ExecutorPoolCount# %" PRIu32,
  125. (ui32)executorPool, (ui32)ExecutorPoolCount);
  126. return CpuManager->GetExecutorPool(executorPool)->Register(actor, mailboxType, revolvingCounter, parentId);
  127. }
  128. NThreading::TFuture<THolder<IEventBase>> TActorSystem::AskGeneric(TMaybe<ui32> expectedEventType,
  129. TActorId recipient, THolder<IEventBase> event,
  130. TDuration timeout) {
  131. auto promise = NThreading::NewPromise<THolder<IEventBase>>();
  132. Register(MakeAskActor(expectedEventType, recipient, std::move(event), timeout, promise).Release());
  133. return promise.GetFuture();
  134. }
  135. ui64 TActorSystem::AllocateIDSpace(ui64 count) {
  136. Y_VERIFY_DEBUG(count < Max<ui32>() / 65536);
  137. static_assert(sizeof(TAtomic) == sizeof(ui64), "expect sizeof(TAtomic) == sizeof(ui64)");
  138. // get high 32 bits as seconds from epoch
  139. // it could wrap every century, but we don't expect any actor-reference to live this long so such wrap will do no harm
  140. const ui64 timeFromEpoch = TInstant::MicroSeconds(RelaxedLoad(&CurrentTimestamp)).Seconds();
  141. // get low 32 bits as counter value
  142. ui32 lowPartEnd = (ui32)(AtomicAdd(CurrentIDCounter, count));
  143. while (lowPartEnd < count) // if our request crosses 32bit boundary - retry
  144. lowPartEnd = (ui32)(AtomicAdd(CurrentIDCounter, count));
  145. const ui64 lowPart = lowPartEnd - count;
  146. const ui64 ret = (timeFromEpoch << 32) | lowPart;
  147. return ret;
  148. }
  149. TActorId TActorSystem::InterconnectProxy(ui32 destinationNode) const {
  150. if (destinationNode < InterconnectCount)
  151. return Interconnect[destinationNode];
  152. else if (destinationNode != NodeId)
  153. return MakeInterconnectProxyId(destinationNode);
  154. else
  155. return TActorId();
  156. }
  157. ui32 TActorSystem::BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>& eventFabric) {
  158. // TODO: get rid of this method
  159. for (ui32 i = 0; i < InterconnectCount; ++i) {
  160. Send(eventFabric(Interconnect[i]));
  161. }
  162. return InterconnectCount;
  163. }
  164. TActorId TActorSystem::LookupLocalService(const TActorId& x) const {
  165. return ServiceMap->LookupLocal(x);
  166. }
  167. TActorId TActorSystem::RegisterLocalService(const TActorId& serviceId, const TActorId& actorId) {
  168. // TODO: notify old actor about demotion
  169. return ServiceMap->RegisterLocalService(serviceId, actorId);
  170. }
  171. void TActorSystem::GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const {
  172. CpuManager->GetPoolStats(poolId, poolStats, statsCopy);
  173. }
  174. void TActorSystem::Start() {
  175. Y_VERIFY(StartExecuted == false);
  176. StartExecuted = true;
  177. ScheduleQueue.Reset(new NSchedulerQueue::TQueueType());
  178. TVector<NSchedulerQueue::TReader*> scheduleReaders;
  179. scheduleReaders.push_back(&ScheduleQueue->Reader);
  180. CpuManager->PrepareStart(scheduleReaders, this);
  181. Scheduler->Prepare(this, &CurrentTimestamp, &CurrentMonotonic);
  182. Scheduler->PrepareSchedules(&scheduleReaders.front(), (ui32)scheduleReaders.size());
  183. // setup interconnect proxies
  184. {
  185. const TInterconnectSetup& setup = SystemSetup->Interconnect;
  186. Interconnect.Reset(new TActorId[InterconnectCount + 1]);
  187. for (ui32 i = 0, e = InterconnectCount; i != e; ++i) {
  188. const TActorSetupCmd& x = setup.ProxyActors[i];
  189. if (x.Actor) {
  190. Interconnect[i] = Register(x.Actor, x.MailboxType, x.PoolId, i);
  191. Y_VERIFY(!!Interconnect[i]);
  192. }
  193. }
  194. ProxyWrapperFactory = std::move(SystemSetup->Interconnect.ProxyWrapperFactory);
  195. }
  196. // setup local services
  197. {
  198. for (ui32 i = 0, e = (ui32)SystemSetup->LocalServices.size(); i != e; ++i) {
  199. const std::pair<TActorId, TActorSetupCmd>& x = SystemSetup->LocalServices[i];
  200. const TActorId xid = Register(x.second.Actor, x.second.MailboxType, x.second.PoolId, i);
  201. Y_VERIFY(!!xid);
  202. if (!!x.first)
  203. RegisterLocalService(x.first, xid);
  204. }
  205. }
  206. // ok, setup complete, we could destroy setup config
  207. SystemSetup.Destroy();
  208. Scheduler->PrepareStart();
  209. CpuManager->Start();
  210. Send(MakeSchedulerActorId(), new TEvSchedulerInitialize(scheduleReaders, &CurrentTimestamp, &CurrentMonotonic));
  211. Scheduler->Start();
  212. }
  213. void TActorSystem::Stop() {
  214. if (StopExecuted || !StartExecuted)
  215. return;
  216. StopExecuted = true;
  217. for (auto&& fn : std::exchange(DeferredPreStop, {})) {
  218. fn();
  219. }
  220. Scheduler->PrepareStop();
  221. CpuManager->PrepareStop();
  222. Scheduler->Stop();
  223. CpuManager->Shutdown();
  224. }
  225. void TActorSystem::Cleanup() {
  226. Stop();
  227. if (CleanupExecuted || !StartExecuted)
  228. return;
  229. CleanupExecuted = true;
  230. CpuManager->Cleanup();
  231. Scheduler.Destroy();
  232. }
  233. ui32 TActorSystem::MemProfActivityBase;
  234. }