actorsystem.h 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. #pragma once
  2. #include "defs.h"
  3. #include "actor.h"
  4. #include "balancer.h"
  5. #include "config.h"
  6. #include "event.h"
  7. #include "log_settings.h"
  8. #include "scheduler_cookie.h"
  9. #include "mon_stats.h"
  10. #include <library/cpp/threading/future/future.h>
  11. #include <library/cpp/actors/util/ticket_lock.h>
  12. #include <util/generic/vector.h>
  13. #include <util/datetime/base.h>
  14. #include <util/system/mutex.h>
  15. namespace NActors {
  16. class TActorSystem;
  17. class TCpuManager;
  18. class IExecutorPool;
  19. struct TWorkerContext;
  20. inline TActorId MakeInterconnectProxyId(ui32 destNodeId) {
  21. char data[12];
  22. memcpy(data, "ICProxy@", 8);
  23. memcpy(data + 8, &destNodeId, sizeof(ui32));
  24. return TActorId(0, TStringBuf(data, 12));
  25. }
  26. inline bool IsInterconnectProxyId(const TActorId& actorId) {
  27. return actorId.IsService() && !memcmp(actorId.ServiceId().data(), "ICProxy@", 8);
  28. }
  29. inline ui32 GetInterconnectProxyNode(const TActorId& actorId) {
  30. ui32 nodeId;
  31. memcpy(&nodeId, actorId.ServiceId().data() + 8, sizeof(ui32));
  32. return nodeId;
  33. }
  34. namespace NSchedulerQueue {
  35. class TReader;
  36. struct TQueueType;
  37. }
  38. class IExecutorPool : TNonCopyable {
  39. public:
  40. const ui32 PoolId;
  41. TAtomic ActorRegistrations;
  42. TAtomic DestroyedActors;
  43. IExecutorPool(ui32 poolId)
  44. : PoolId(poolId)
  45. , ActorRegistrations(0)
  46. , DestroyedActors(0)
  47. {
  48. }
  49. virtual ~IExecutorPool() {
  50. }
  51. // for workers
  52. virtual ui32 GetReadyActivation(TWorkerContext& wctx, ui64 revolvingCounter) = 0;
  53. virtual void ReclaimMailbox(TMailboxType::EType mailboxType, ui32 hint, TWorkerId workerId, ui64 revolvingCounter) = 0;
  54. /**
  55. * Schedule one-shot event that will be send at given time point in the future.
  56. *
  57. * @param deadline the wallclock time point in future when event must be send
  58. * @param ev the event to send
  59. * @param cookie cookie that will be piggybacked with event
  60. * @param workerId index of thread which will perform event dispatching
  61. */
  62. virtual void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
  63. /**
  64. * Schedule one-shot event that will be send at given time point in the future.
  65. *
  66. * @param deadline the monotonic time point in future when event must be send
  67. * @param ev the event to send
  68. * @param cookie cookie that will be piggybacked with event
  69. * @param workerId index of thread which will perform event dispatching
  70. */
  71. virtual void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
  72. /**
  73. * Schedule one-shot event that will be send after given delay.
  74. *
  75. * @param delta the time from now to delay event sending
  76. * @param ev the event to send
  77. * @param cookie cookie that will be piggybacked with event
  78. * @param workerId index of thread which will perform event dispatching
  79. */
  80. virtual void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie, TWorkerId workerId) = 0;
  81. // for actorsystem
  82. virtual bool Send(TAutoPtr<IEventHandle>& ev) = 0;
  83. virtual void ScheduleActivation(ui32 activation) = 0;
  84. virtual void ScheduleActivationEx(ui32 activation, ui64 revolvingCounter) = 0;
  85. virtual TActorId Register(IActor* actor, TMailboxType::EType mailboxType, ui64 revolvingCounter, const TActorId& parentId) = 0;
  86. virtual TActorId Register(IActor* actor, TMailboxHeader* mailbox, ui32 hint, const TActorId& parentId) = 0;
  87. // lifecycle stuff
  88. virtual void Prepare(TActorSystem* actorSystem, NSchedulerQueue::TReader** scheduleReaders, ui32* scheduleSz) = 0;
  89. virtual void Start() = 0;
  90. virtual void PrepareStop() = 0;
  91. virtual void Shutdown() = 0;
  92. virtual bool Cleanup() = 0;
  93. virtual void GetCurrentStats(TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const {
  94. // TODO: make pure virtual and override everywhere
  95. Y_UNUSED(poolStats);
  96. Y_UNUSED(statsCopy);
  97. }
  98. virtual TString GetName() const {
  99. return TString();
  100. }
  101. virtual ui32 GetThreads() const {
  102. return 1;
  103. }
  104. // generic
  105. virtual TAffinity* Affinity() const = 0;
  106. virtual void SetRealTimeMode() const {}
  107. };
  108. // could be proxy to in-pool schedulers (for NUMA-aware executors)
  109. class ISchedulerThread : TNonCopyable {
  110. public:
  111. virtual ~ISchedulerThread() {
  112. }
  113. virtual void Prepare(TActorSystem* actorSystem, volatile ui64* currentTimestamp, volatile ui64* currentMonotonic) = 0;
  114. virtual void PrepareSchedules(NSchedulerQueue::TReader** readers, ui32 scheduleReadersCount) = 0;
  115. virtual void PrepareStart() { /* empty */ }
  116. virtual void Start() = 0;
  117. virtual void PrepareStop() = 0;
  118. virtual void Stop() = 0;
  119. };
  120. struct TActorSetupCmd {
  121. TMailboxType::EType MailboxType;
  122. ui32 PoolId;
  123. IActor* Actor;
  124. TActorSetupCmd()
  125. : MailboxType(TMailboxType::HTSwap)
  126. , PoolId(0)
  127. , Actor(nullptr)
  128. {
  129. }
  130. TActorSetupCmd(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId)
  131. : MailboxType(mailboxType)
  132. , PoolId(poolId)
  133. , Actor(actor)
  134. {
  135. }
  136. void Set(IActor* actor, TMailboxType::EType mailboxType, ui32 poolId) {
  137. MailboxType = mailboxType;
  138. PoolId = poolId;
  139. Actor = actor;
  140. }
  141. };
  142. using TProxyWrapperFactory = std::function<TActorId(TActorSystem*, ui32)>;
  143. struct TInterconnectSetup {
  144. TVector<TActorSetupCmd> ProxyActors;
  145. TProxyWrapperFactory ProxyWrapperFactory;
  146. };
  147. struct TActorSystemSetup {
  148. ui32 NodeId = 0;
  149. // Either Executors or CpuManager must be initialized
  150. ui32 ExecutorsCount = 0;
  151. TArrayHolder<TAutoPtr<IExecutorPool>> Executors;
  152. TAutoPtr<IBalancer> Balancer; // main implementation will be implicitly created if not set
  153. TCpuManagerConfig CpuManager;
  154. TAutoPtr<ISchedulerThread> Scheduler;
  155. ui32 MaxActivityType = 5; // for default entries
  156. TInterconnectSetup Interconnect;
  157. using TLocalServices = TVector<std::pair<TActorId, TActorSetupCmd>>;
  158. TLocalServices LocalServices;
  159. ui32 GetExecutorsCount() const {
  160. return Executors ? ExecutorsCount : CpuManager.GetExecutorsCount();
  161. }
  162. TString GetPoolName(ui32 poolId) const {
  163. return Executors ? Executors[poolId]->GetName() : CpuManager.GetPoolName(poolId);
  164. }
  165. ui32 GetThreads(ui32 poolId) const {
  166. return Executors ? Executors[poolId]->GetThreads() : CpuManager.GetThreads(poolId);
  167. }
  168. };
  169. class TActorSystem : TNonCopyable {
  170. struct TServiceMap;
  171. public:
  172. const ui32 NodeId;
  173. private:
  174. THolder<TCpuManager> CpuManager;
  175. const ui32 ExecutorPoolCount;
  176. TAutoPtr<ISchedulerThread> Scheduler;
  177. THolder<TServiceMap> ServiceMap;
  178. const ui32 InterconnectCount;
  179. TArrayHolder<TActorId> Interconnect;
  180. volatile ui64 CurrentTimestamp;
  181. volatile ui64 CurrentMonotonic;
  182. volatile ui64 CurrentIDCounter;
  183. THolder<NSchedulerQueue::TQueueType> ScheduleQueue;
  184. mutable TTicketLock ScheduleLock;
  185. friend class TExecutorThread;
  186. THolder<TActorSystemSetup> SystemSetup;
  187. TActorId DefSelfID;
  188. void* AppData0;
  189. TIntrusivePtr<NLog::TSettings> LoggerSettings0;
  190. TProxyWrapperFactory ProxyWrapperFactory;
  191. TMutex ProxyCreationLock;
  192. bool StartExecuted;
  193. bool StopExecuted;
  194. bool CleanupExecuted;
  195. std::deque<std::function<void()>> DeferredPreStop;
  196. public:
  197. TActorSystem(THolder<TActorSystemSetup>& setup, void* appData = nullptr,
  198. TIntrusivePtr<NLog::TSettings> loggerSettings = TIntrusivePtr<NLog::TSettings>(nullptr));
  199. ~TActorSystem();
  200. void Start();
  201. void Stop();
  202. void Cleanup();
  203. TActorId Register(IActor* actor, TMailboxType::EType mailboxType = TMailboxType::HTSwap, ui32 executorPool = 0,
  204. ui64 revolvingCounter = 0, const TActorId& parentId = TActorId());
  205. bool Send(TAutoPtr<IEventHandle> ev) const;
  206. bool Send(const TActorId& recipient, IEventBase* ev, ui32 flags = 0) const;
  207. /**
  208. * Schedule one-shot event that will be send at given time point in the future.
  209. *
  210. * @param deadline the wallclock time point in future when event must be send
  211. * @param ev the event to send
  212. * @param cookie cookie that will be piggybacked with event
  213. */
  214. void Schedule(TInstant deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
  215. /**
  216. * Schedule one-shot event that will be send at given time point in the future.
  217. *
  218. * @param deadline the monotonic time point in future when event must be send
  219. * @param ev the event to send
  220. * @param cookie cookie that will be piggybacked with event
  221. */
  222. void Schedule(TMonotonic deadline, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
  223. /**
  224. * Schedule one-shot event that will be send after given delay.
  225. *
  226. * @param delta the time from now to delay event sending
  227. * @param ev the event to send
  228. * @param cookie cookie that will be piggybacked with event
  229. */
  230. void Schedule(TDuration delta, TAutoPtr<IEventHandle> ev, ISchedulerCookie* cookie = nullptr) const;
  231. /**
  232. * A way to interact with actors from non-actor context.
  233. *
  234. * This method will send the `event` to the `recipient` and then will wait for a response. When response arrives,
  235. * it will be passed to the future. If response is not of type `T`, the future will resolve into an exception.
  236. *
  237. * @tparam T expected response type. Must be derived from `TEventBase`,
  238. * or use `IEventBase` to catch any response.
  239. * @param actorSystem actor system that will be used to register an actor that'll wait for response.
  240. * @param recipient who will get a request.
  241. * @param event a request message.
  242. * @return future that will be resolved when a message from `recipient` arrives.
  243. */
  244. template <typename T>
  245. [[nodiscard]]
  246. NThreading::TFuture<THolder<T>> Ask(TActorId recipient, THolder<IEventBase> event, TDuration timeout = TDuration::Max()) {
  247. if constexpr (std::is_same_v<T, IEventBase>) {
  248. return AskGeneric(Nothing(), recipient, std::move(event), timeout);
  249. } else {
  250. return AskGeneric(T::EventType, recipient, std::move(event), timeout)
  251. .Apply([](const NThreading::TFuture<THolder<IEventBase>>& ev) {
  252. return THolder<T>(static_cast<T*>(const_cast<THolder<IEventBase>&>(ev.GetValueSync()).Release())); // =(
  253. });
  254. }
  255. }
  256. [[nodiscard]]
  257. NThreading::TFuture<THolder<IEventBase>> AskGeneric(
  258. TMaybe<ui32> expectedEventType,
  259. TActorId recipient,
  260. THolder<IEventBase> event,
  261. TDuration timeout);
  262. ui64 AllocateIDSpace(ui64 count);
  263. TActorId InterconnectProxy(ui32 destinationNode) const;
  264. ui32 BroadcastToProxies(const std::function<IEventHandle*(const TActorId&)>&);
  265. void UpdateLinkStatus(ui8 status, ui32 destinationNode);
  266. ui8 LinkStatus(ui32 destinationNode);
  267. TActorId LookupLocalService(const TActorId& x) const;
  268. TActorId RegisterLocalService(const TActorId& serviceId, const TActorId& actorId);
  269. ui32 GetMaxActivityType() const {
  270. return SystemSetup ? SystemSetup->MaxActivityType : 1;
  271. }
  272. TInstant Timestamp() const {
  273. return TInstant::MicroSeconds(RelaxedLoad(&CurrentTimestamp));
  274. }
  275. TMonotonic Monotonic() const {
  276. return TMonotonic::MicroSeconds(RelaxedLoad(&CurrentMonotonic));
  277. }
  278. template <typename T>
  279. T* AppData() const {
  280. return (T*)AppData0;
  281. }
  282. NLog::TSettings* LoggerSettings() const {
  283. return LoggerSettings0.Get();
  284. }
  285. void GetPoolStats(ui32 poolId, TExecutorPoolStats& poolStats, TVector<TExecutorThreadStats>& statsCopy) const;
  286. void DeferPreStop(std::function<void()> fn) {
  287. DeferredPreStop.push_back(std::move(fn));
  288. }
  289. /* This is the base for memory profiling tags.
  290. System sets memory profiling tag for debug version of lfalloc.
  291. The tag is set as "base_tag + actor_activity_type". */
  292. static ui32 MemProfActivityBase;
  293. };
  294. }