mailbox.cpp 22 KB


  1. #include "mailbox.h"
  2. #include "actorsystem.h"
  3. #include <library/cpp/actors/util/datetime.h>
  4. #include <util/system/sanitizers.h>
  5. namespace NActors {
  6. TMailboxTable::TMailboxTable()
  7. : LastAllocatedLine(0)
  8. , AllocatedMailboxCount(0)
  9. , CachedSimpleMailboxes(0)
  10. , CachedRevolvingMailboxes(0)
  11. , CachedHTSwapMailboxes(0)
  12. , CachedReadAsFilledMailboxes(0)
  13. , CachedTinyReadAsFilledMailboxes(0)
  14. {
  15. memset((void*)Lines, 0, sizeof(Lines));
  16. }
  17. bool IsGoodForCleanup(const TMailboxHeader* header) {
  18. switch (AtomicLoad(&header->ExecutionState)) {
  19. case TMailboxHeader::TExecutionState::Inactive:
  20. case TMailboxHeader::TExecutionState::Scheduled:
  21. return true;
  22. case TMailboxHeader::TExecutionState::Leaving:
  23. case TMailboxHeader::TExecutionState::Executing:
  24. case TMailboxHeader::TExecutionState::LeavingMarked:
  25. return false;
  26. case TMailboxHeader::TExecutionState::Free:
  27. case TMailboxHeader::TExecutionState::FreeScheduled:
  28. return true;
  29. case TMailboxHeader::TExecutionState::FreeLeaving:
  30. case TMailboxHeader::TExecutionState::FreeExecuting:
  31. case TMailboxHeader::TExecutionState::FreeLeavingMarked:
  32. return false;
  33. default:
  34. Y_FAIL();
  35. }
  36. }
  37. template <typename TMailbox>
  38. void DestructMailboxLine(ui8* begin, ui8* end) {
  39. const ui32 sx = TMailbox::AlignedSize();
  40. for (ui8* x = begin; x + sx <= end; x += sx) {
  41. TMailbox* mailbox = reinterpret_cast<TMailbox*>(x);
  42. Y_VERIFY(IsGoodForCleanup(mailbox));
  43. mailbox->ExecutionState = Max<ui32>();
  44. mailbox->~TMailbox();
  45. }
  46. }
  47. template <typename TMailbox>
  48. bool CleanupMailboxLine(ui8* begin, ui8* end) {
  49. const ui32 sx = TMailbox::AlignedSize();
  50. bool done = true;
  51. for (ui8* x = begin; x + sx <= end; x += sx) {
  52. TMailbox* mailbox = reinterpret_cast<TMailbox*>(x);
  53. Y_VERIFY(IsGoodForCleanup(mailbox));
  54. done &= mailbox->CleanupActors() && mailbox->CleanupEvents();
  55. }
  56. return done;
  57. }
  58. TMailboxTable::~TMailboxTable() {
  59. // on cleanup we must traverse everything and free stuff
  60. for (ui32 i = 0; i < LastAllocatedLine; ++i) {
  61. if (TMailboxLineHeader* lineHeader = Lines[i]) {
  62. switch (lineHeader->MailboxType) {
  63. case TMailboxType::Simple:
  64. DestructMailboxLine<TSimpleMailbox>((ui8*)lineHeader + 64, (ui8*)lineHeader + LineSize);
  65. break;
  66. case TMailboxType::Revolving:
  67. DestructMailboxLine<TRevolvingMailbox>((ui8*)lineHeader + 64, (ui8*)lineHeader + LineSize);
  68. break;
  69. case TMailboxType::HTSwap:
  70. DestructMailboxLine<THTSwapMailbox>((ui8*)lineHeader + 64, (ui8*)lineHeader + LineSize);
  71. break;
  72. case TMailboxType::ReadAsFilled:
  73. DestructMailboxLine<TReadAsFilledMailbox>((ui8*)lineHeader + 64, (ui8*)lineHeader + LineSize);
  74. break;
  75. case TMailboxType::TinyReadAsFilled:
  76. DestructMailboxLine<TTinyReadAsFilledMailbox>((ui8*)lineHeader + 64, (ui8*)lineHeader + LineSize);
  77. break;
  78. default:
  79. Y_FAIL();
  80. }
  81. lineHeader->~TMailboxLineHeader();
  82. free(lineHeader);
  83. Lines[i] = nullptr;
  84. }
  85. }
  86. while (MailboxCacheSimple.Pop(0))
  87. ;
  88. while (MailboxCacheRevolving.Pop(0))
  89. ;
  90. while (MailboxCacheHTSwap.Pop(0))
  91. ;
  92. while (MailboxCacheReadAsFilled.Pop(0))
  93. ;
  94. while (MailboxCacheTinyReadAsFilled.Pop(0))
  95. ;
  96. }
  97. bool TMailboxTable::Cleanup() {
  98. bool done = true;
  99. for (ui32 i = 0; i < LastAllocatedLine; ++i) {
  100. if (TMailboxLineHeader* lineHeader = Lines[i]) {
  101. switch (lineHeader->MailboxType) {
  102. case TMailboxType::Simple:
  103. done &= CleanupMailboxLine<TSimpleMailbox>((ui8*)lineHeader + 64, (ui8*)lineHeader + LineSize);
  104. break;
  105. case TMailboxType::Revolving:
  106. done &= CleanupMailboxLine<TRevolvingMailbox>((ui8*)lineHeader + 64, (ui8*)lineHeader + LineSize);
  107. break;
  108. case TMailboxType::HTSwap:
  109. done &= CleanupMailboxLine<THTSwapMailbox>((ui8*)lineHeader + 64, (ui8*)lineHeader + LineSize);
  110. break;
  111. case TMailboxType::ReadAsFilled:
  112. done &= CleanupMailboxLine<TReadAsFilledMailbox>((ui8*)lineHeader + 64, (ui8*)lineHeader + LineSize);
  113. break;
  114. case TMailboxType::TinyReadAsFilled:
  115. done &= CleanupMailboxLine<TTinyReadAsFilledMailbox>((ui8*)lineHeader + 64, (ui8*)lineHeader + LineSize);
  116. break;
  117. default:
  118. Y_FAIL();
  119. }
  120. }
  121. }
  122. return done;
  123. }
  124. TMailboxHeader* TMailboxTable::Get(ui32 hint) {
  125. // get line
  126. const ui32 lineIndex = (hint & LineIndexMask) >> LineIndexShift;
  127. const ui32 lineHint = hint & LineHintMask;
  128. Y_VERIFY((lineIndex < MaxLines) && (lineHint < LineSize / 64));
  129. if (lineHint == 0)
  130. return nullptr;
  131. if (TMailboxLineHeader* const x = AtomicLoad(Lines + lineIndex)) {
  132. switch (x->MailboxType) {
  133. case TMailboxType::Simple:
  134. return TSimpleMailbox::Get(lineHint, x);
  135. case TMailboxType::Revolving:
  136. return TRevolvingMailbox::Get(lineHint, x);
  137. case TMailboxType::HTSwap:
  138. return THTSwapMailbox::Get(lineHint, x);
  139. case TMailboxType::ReadAsFilled:
  140. return TReadAsFilledMailbox::Get(lineHint, x);
  141. case TMailboxType::TinyReadAsFilled:
  142. return TTinyReadAsFilledMailbox::Get(lineHint, x);
  143. default:
  144. Y_VERIFY_DEBUG(false);
  145. break;
  146. }
  147. }
  148. return nullptr;
  149. }
  150. bool TMailboxTable::SendTo(TAutoPtr<IEventHandle>& ev, IExecutorPool* executorPool) {
  151. const TActorId& recipient = ev->GetRecipientRewrite();
  152. const ui32 hint = recipient.Hint();
  153. // copy-paste from Get to avoid duplicated type-switches
  154. const ui32 lineIndex = (hint & LineIndexMask) >> LineIndexShift;
  155. const ui32 lineHint = hint & LineHintMask;
  156. Y_VERIFY((lineIndex < MaxLines) && (lineHint < LineSize / 64));
  157. if (lineHint == 0)
  158. return false;
  159. if (TMailboxLineHeader* const x = AtomicLoad(Lines + lineIndex)) {
  160. switch (x->MailboxType) {
  161. case TMailboxType::Simple: {
  162. TSimpleMailbox* const mailbox = TSimpleMailbox::Get(lineHint, x);
  163. #if (!defined(_tsan_enabled_))
  164. Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType);
  165. #endif
  166. mailbox->Queue.Push(ev.Release());
  167. if (mailbox->MarkForSchedule()) {
  168. RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
  169. executorPool->ScheduleActivation(hint);
  170. }
  171. }
  172. return true;
  173. case TMailboxType::Revolving: {
  174. // The actorid could be stale and coming from a different machine. If local process has restarted than
  175. // the stale actorid coming from a remote machine might be referencing an actor with simple mailbox
  176. // which is smaller than revolving mailbox. In this cases 'lineHint' index might be greater than actual
  177. // array size. Normally its ok to store stale event to other actor's valid mailbox beacuse Receive will
  178. // compare receiver actor id and discard stale event. But in this case we should discard the event right away
  179. // instead of trying to enque it to a mailbox at invalid address.
  180. // NOTE: lineHint is 1-based
  181. static_assert(TSimpleMailbox::AlignedSize() <= TRevolvingMailbox::AlignedSize(),
  182. "We expect that one line can store more simple mailboxes than revolving mailboxes");
  183. if (lineHint > TRevolvingMailbox::MaxMailboxesInLine())
  184. return false;
  185. TRevolvingMailbox* const mailbox = TRevolvingMailbox::Get(lineHint, x);
  186. #if (!defined(_tsan_enabled_))
  187. Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType);
  188. #endif
  189. mailbox->QueueWriter.Push(ev.Release());
  190. if (mailbox->MarkForSchedule()) {
  191. RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
  192. executorPool->ScheduleActivation(hint);
  193. }
  194. }
  195. return true;
  196. case TMailboxType::HTSwap: {
  197. THTSwapMailbox* const mailbox = THTSwapMailbox::Get(lineHint, x);
  198. #if (!defined(_tsan_enabled_))
  199. Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType);
  200. #endif
  201. mailbox->Queue.Push(ev.Release());
  202. if (mailbox->MarkForSchedule()) {
  203. RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
  204. executorPool->ScheduleActivation(hint);
  205. }
  206. }
  207. return true;
  208. case TMailboxType::ReadAsFilled: {
  209. if (lineHint > TReadAsFilledMailbox::MaxMailboxesInLine())
  210. return false;
  211. TReadAsFilledMailbox* const mailbox = TReadAsFilledMailbox::Get(lineHint, x);
  212. #if (!defined(_tsan_enabled_))
  213. Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType);
  214. #endif
  215. mailbox->Queue.Push(ev.Release());
  216. if (mailbox->MarkForSchedule()) {
  217. RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
  218. executorPool->ScheduleActivation(hint);
  219. }
  220. }
  221. return true;
  222. case TMailboxType::TinyReadAsFilled: {
  223. if (lineHint > TTinyReadAsFilledMailbox::MaxMailboxesInLine())
  224. return false;
  225. TTinyReadAsFilledMailbox* const mailbox = TTinyReadAsFilledMailbox::Get(lineHint, x);
  226. #if (!defined(_tsan_enabled_))
  227. Y_VERIFY_DEBUG(mailbox->Type == (ui32)x->MailboxType);
  228. #endif
  229. mailbox->Queue.Push(ev.Release());
  230. if (mailbox->MarkForSchedule()) {
  231. RelaxedStore<NHPTimer::STime>(&mailbox->ScheduleMoment, GetCycleCountFast());
  232. executorPool->ScheduleActivation(hint);
  233. }
  234. }
  235. return true;
  236. default:
  237. Y_FAIL("unknown mailbox type");
  238. }
  239. }
  240. return false;
  241. }
  242. ui32 TMailboxTable::AllocateMailbox(TMailboxType::EType type, ui64 revolvingCounter) {
  243. ui32 x = TryAllocateMailbox(type, revolvingCounter);
  244. if (x == 0)
  245. x = AllocateNewLine(type);
  246. return x;
  247. }
  248. ui32 TMailboxTable::TryAllocateMailbox(TMailboxType::EType type, ui64 revolvingCounter) {
  249. switch (type) {
  250. case TMailboxType::Simple:
  251. do {
  252. if (ui32 ret = MailboxCacheSimple.Pop(revolvingCounter)) {
  253. AtomicDecrement(CachedSimpleMailboxes);
  254. return ret;
  255. }
  256. } while (AtomicGet(CachedSimpleMailboxes) > (MailboxCacheSimple.Concurrency * 512));
  257. return 0;
  258. case TMailboxType::Revolving:
  259. do {
  260. if (ui32 ret = MailboxCacheRevolving.Pop(revolvingCounter)) {
  261. AtomicDecrement(CachedRevolvingMailboxes);
  262. return ret;
  263. }
  264. } while (AtomicGet(CachedRevolvingMailboxes) > (MailboxCacheRevolving.Concurrency * 512));
  265. return 0;
  266. case TMailboxType::HTSwap:
  267. do {
  268. if (ui32 ret = MailboxCacheHTSwap.Pop(revolvingCounter)) {
  269. AtomicDecrement(CachedHTSwapMailboxes);
  270. return ret;
  271. }
  272. } while (AtomicGet(CachedHTSwapMailboxes) > (MailboxCacheHTSwap.Concurrency * 512));
  273. return 0;
  274. case TMailboxType::ReadAsFilled:
  275. do {
  276. if (ui32 ret = MailboxCacheReadAsFilled.Pop(revolvingCounter)) {
  277. AtomicDecrement(CachedReadAsFilledMailboxes);
  278. return ret;
  279. }
  280. } while (AtomicGet(CachedReadAsFilledMailboxes) > (MailboxCacheReadAsFilled.Concurrency * 512));
  281. return 0;
  282. case TMailboxType::TinyReadAsFilled:
  283. do {
  284. if (ui32 ret = MailboxCacheTinyReadAsFilled.Pop(revolvingCounter)) {
  285. AtomicDecrement(CachedTinyReadAsFilledMailboxes);
  286. return ret;
  287. }
  288. } while (AtomicGet(CachedTinyReadAsFilledMailboxes) > (MailboxCacheTinyReadAsFilled.Concurrency * 512));
  289. return 0;
  290. default:
  291. Y_FAIL("Unknown mailbox type");
  292. }
  293. }
  294. void TMailboxTable::ReclaimMailbox(TMailboxType::EType type, ui32 hint, ui64 revolvingCounter) {
  295. if (hint != 0) {
  296. switch (type) {
  297. case TMailboxType::Simple:
  298. MailboxCacheSimple.Push(hint, revolvingCounter);
  299. AtomicIncrement(CachedSimpleMailboxes);
  300. break;
  301. case TMailboxType::Revolving:
  302. MailboxCacheRevolving.Push(hint, revolvingCounter);
  303. AtomicIncrement(CachedRevolvingMailboxes);
  304. break;
  305. case TMailboxType::HTSwap:
  306. MailboxCacheHTSwap.Push(hint, revolvingCounter);
  307. AtomicIncrement(CachedHTSwapMailboxes);
  308. break;
  309. case TMailboxType::ReadAsFilled:
  310. MailboxCacheReadAsFilled.Push(hint, revolvingCounter);
  311. AtomicIncrement(CachedReadAsFilledMailboxes);
  312. break;
  313. case TMailboxType::TinyReadAsFilled:
  314. MailboxCacheTinyReadAsFilled.Push(hint, revolvingCounter);
  315. AtomicIncrement(CachedTinyReadAsFilledMailboxes);
  316. break;
  317. default:
  318. Y_FAIL();
  319. }
  320. }
  321. }
  322. TMailboxHeader::TMailboxHeader(TMailboxType::EType type)
  323. : ExecutionState(TExecutionState::Free)
  324. , Reserved(0)
  325. , Type(type)
  326. , ActorPack(TMailboxActorPack::Simple)
  327. , Knobs(0)
  328. {
  329. ActorsInfo.Simple.ActorId = 0;
  330. ActorsInfo.Simple.Actor = nullptr;
  331. }
  332. TMailboxHeader::~TMailboxHeader() {
  333. CleanupActors();
  334. }
  335. bool TMailboxHeader::CleanupActors() {
  336. bool done = true;
  337. switch (ActorPack) {
  338. case TMailboxActorPack::Simple: {
  339. if (ActorsInfo.Simple.ActorId != 0) {
  340. delete ActorsInfo.Simple.Actor;
  341. done = false;
  342. }
  343. break;
  344. }
  345. case TMailboxActorPack::Map: {
  346. for (auto& [actorId, actor] : *ActorsInfo.Map.ActorsMap) {
  347. delete actor;
  348. }
  349. delete ActorsInfo.Map.ActorsMap;
  350. done = false;
  351. break;
  352. }
  353. case TMailboxActorPack::Array: {
  354. for (ui64 i = 0; i < ActorsInfo.Array.ActorsCount; ++i) {
  355. delete ActorsInfo.Array.ActorsArray->Actors[i].Actor;
  356. }
  357. delete ActorsInfo.Array.ActorsArray;
  358. done = false;
  359. break;
  360. }
  361. }
  362. ActorPack = TMailboxActorPack::Simple;
  363. ActorsInfo.Simple.ActorId = 0;
  364. ActorsInfo.Simple.Actor = nullptr;
  365. return done;
  366. }
  367. std::pair<ui32, ui32> TMailboxHeader::CountMailboxEvents(ui64 localActorId, ui32 maxTraverse) {
  368. switch (Type) {
  369. case TMailboxType::Simple:
  370. return static_cast<TMailboxTable::TSimpleMailbox*>(this)->CountSimpleMailboxEvents(localActorId, maxTraverse);
  371. case TMailboxType::Revolving:
  372. return static_cast<TMailboxTable::TRevolvingMailbox*>(this)->CountRevolvingMailboxEvents(localActorId, maxTraverse);
  373. default:
  374. return {0, 0};
  375. }
  376. }
  377. TMailboxTable::TSimpleMailbox::TSimpleMailbox()
  378. : TMailboxHeader(TMailboxType::Simple)
  379. , ScheduleMoment(0)
  380. {
  381. }
  382. TMailboxTable::TSimpleMailbox::~TSimpleMailbox() {
  383. CleanupEvents();
  384. }
  385. bool TMailboxTable::TSimpleMailbox::CleanupEvents() {
  386. const bool done = (Queue.Head() == nullptr);
  387. while (IEventHandle* ev = Queue.Pop())
  388. delete ev;
  389. return done;
  390. }
  391. std::pair<ui32, ui32> TMailboxTable::TSimpleMailbox::CountSimpleMailboxEvents(ui64 localActorId, ui32 maxTraverse) {
  392. ui32 local = 0;
  393. ui32 total = 0;
  394. auto it = Queue.ReadIterator();
  395. while (IEventHandle* x = it.Next()) {
  396. ++total;
  397. if (x->GetRecipientRewrite().LocalId() == localActorId)
  398. ++local;
  399. if (total >= maxTraverse)
  400. break;
  401. }
  402. return std::make_pair(local, total);
  403. }
  404. TMailboxTable::TRevolvingMailbox::TRevolvingMailbox()
  405. : TMailboxHeader(TMailboxType::Revolving)
  406. , QueueWriter(QueueReader)
  407. , Reserved1(0)
  408. , Reserved2(0)
  409. , ScheduleMoment(0)
  410. {
  411. }
  412. TMailboxTable::TRevolvingMailbox::~TRevolvingMailbox() {
  413. CleanupEvents();
  414. }
  415. bool TMailboxTable::TRevolvingMailbox::CleanupEvents() {
  416. const bool done = (QueueReader.Head() == nullptr);
  417. while (IEventHandle* ev = QueueReader.Pop())
  418. delete ev;
  419. return done;
  420. }
  421. std::pair<ui32, ui32> TMailboxTable::TRevolvingMailbox::CountRevolvingMailboxEvents(ui64 localActorId, ui32 maxTraverse) {
  422. ui32 local = 0;
  423. ui32 total = 0;
  424. auto it = QueueReader.Iterator();
  425. while (IEventHandle* x = it.Next()) {
  426. ++total;
  427. if (x->GetRecipientRewrite().LocalId() == localActorId)
  428. ++local;
  429. if (total >= maxTraverse)
  430. break;
  431. }
  432. return std::make_pair(local, total);
  433. }
  434. template <typename T>
  435. static ui32 InitNewLine(ui8* x, ui8* end) {
  436. const ui32 sx = T::AlignedSize();
  437. for (ui32 index = 1; x + sx <= end; x += sx, ++index)
  438. ::new (x) T();
  439. return sx;
  440. }
  441. ui32 TMailboxTable::AllocateNewLine(TMailboxType::EType type) {
  442. ui8* ptr = (ui8*)malloc(LineSize);
  443. ui8* end = ptr + LineSize;
  444. const ui32 lineIndex = (ui32)AtomicIncrement(LastAllocatedLine) - 1;
  445. const ui32 lineIndexMask = (lineIndex << LineIndexShift) & LineIndexMask;
  446. // first 64 bytes is TMailboxLineHeader
  447. TMailboxLineHeader* header = ::new (ptr) TMailboxLineHeader(type, lineIndex);
  448. ui8* x = ptr + 64;
  449. ui32 sx = 0;
  450. TMailboxCache* cache = nullptr;
  451. TAtomic* counter = nullptr;
  452. switch (type) {
  453. case TMailboxType::Simple:
  454. sx = InitNewLine<TSimpleMailbox>(x, end);
  455. cache = &MailboxCacheSimple;
  456. counter = &CachedSimpleMailboxes;
  457. break;
  458. case TMailboxType::Revolving:
  459. sx = InitNewLine<TRevolvingMailbox>(x, end);
  460. cache = &MailboxCacheRevolving;
  461. counter = &CachedRevolvingMailboxes;
  462. break;
  463. case TMailboxType::HTSwap:
  464. sx = InitNewLine<THTSwapMailbox>(x, end);
  465. cache = &MailboxCacheHTSwap;
  466. counter = &CachedHTSwapMailboxes;
  467. break;
  468. case TMailboxType::ReadAsFilled:
  469. sx = InitNewLine<TReadAsFilledMailbox>(x, end);
  470. cache = &MailboxCacheReadAsFilled;
  471. counter = &CachedReadAsFilledMailboxes;
  472. break;
  473. case TMailboxType::TinyReadAsFilled:
  474. sx = InitNewLine<TTinyReadAsFilledMailbox>(x, end);
  475. cache = &MailboxCacheTinyReadAsFilled;
  476. counter = &CachedTinyReadAsFilledMailboxes;
  477. break;
  478. default:
  479. Y_FAIL();
  480. }
  481. AtomicStore(Lines + lineIndex, header);
  482. ui32 ret = lineIndexMask | 1;
  483. ui32 index = 2;
  484. for (ui32 endIndex = LineSize / sx; index != endIndex;) {
  485. const ui32 bufSize = 8;
  486. ui32 buf[bufSize];
  487. ui32 bufIndex;
  488. for (bufIndex = 0; index != endIndex && bufIndex != bufSize; ++bufIndex, ++index)
  489. buf[bufIndex] = lineIndexMask | index;
  490. cache->PushBulk(buf, bufIndex, index);
  491. AtomicAdd(*counter, bufIndex);
  492. }
  493. AtomicAdd(AllocatedMailboxCount, index - 1);
  494. return ret;
  495. }
  496. }