event_pb.h 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654
  1. #pragma once
  #include "event.h"
  #include "event_load.h"
  #include <google/protobuf/io/zero_copy_stream.h>
  #include <google/protobuf/arena.h>
  #include <library/cpp/actors/protos/actors.pb.h>
  #include <library/cpp/containers/stack_vector/stack_vec.h>
  #include <util/generic/deque.h>
  #include <util/system/context.h>
  #include <util/system/filemap.h>
  #include <util/string/builder.h>
  #include <util/thread/lfstack.h>
  #include <array>
  #include <span>
  #include <type_traits>
  // Enable only once the patch that introduces this macro has been successfully deployed.
  16. #define USE_EXTENDED_PAYLOAD_FORMAT 0
  17. namespace NActors {
  18. class TRopeStream : public NProtoBuf::io::ZeroCopyInputStream {
  19. TRope::TConstIterator Iter;
  20. const size_t Size;
  21. public:
  22. TRopeStream(TRope::TConstIterator iter, size_t size)
  23. : Iter(iter)
  24. , Size(size)
  25. {}
  26. bool Next(const void** data, int* size) override;
  27. void BackUp(int count) override;
  28. bool Skip(int count) override;
  29. int64_t ByteCount() const override {
  30. return TotalByteCount;
  31. }
  32. private:
  33. int64_t TotalByteCount = 0;
  34. };
  35. class TChunkSerializer : public NProtoBuf::io::ZeroCopyOutputStream {
  36. public:
  37. TChunkSerializer() = default;
  38. virtual ~TChunkSerializer() = default;
  39. virtual bool WriteRope(const TRope *rope) = 0;
  40. virtual bool WriteString(const TString *s) = 0;
  41. };
  42. class TAllocChunkSerializer final : public TChunkSerializer {
  43. public:
  44. bool Next(void** data, int* size) override;
  45. void BackUp(int count) override;
  46. int64_t ByteCount() const override {
  47. return Buffers->GetSize();
  48. }
  49. bool WriteAliasedRaw(const void* data, int size) override;
  50. // WARNING: these methods require owner to retain ownership and immutability of passed objects
  51. bool WriteRope(const TRope *rope) override;
  52. bool WriteString(const TString *s) override;
  53. inline TIntrusivePtr<TEventSerializedData> Release(TEventSerializationInfo&& serializationInfo) {
  54. Buffers->SetSerializationInfo(std::move(serializationInfo));
  55. return std::move(Buffers);
  56. }
  57. protected:
  58. TIntrusivePtr<TEventSerializedData> Buffers = new TEventSerializedData;
  59. TRope Backup;
  60. };
  61. class TCoroutineChunkSerializer final : public TChunkSerializer, protected ITrampoLine {
  62. public:
  63. using TChunk = std::pair<const char*, size_t>;
  64. TCoroutineChunkSerializer();
  65. ~TCoroutineChunkSerializer();
  66. void SetSerializingEvent(const IEventBase *event);
  67. void Abort();
  68. std::span<TChunk> FeedBuf(void* data, size_t size);
  69. bool IsComplete() const {
  70. return !Event;
  71. }
  72. bool IsSuccessfull() const {
  73. return SerializationSuccess;
  74. }
  75. const IEventBase *GetCurrentEvent() const {
  76. return Event;
  77. }
  78. bool Next(void** data, int* size) override;
  79. void BackUp(int count) override;
  80. int64_t ByteCount() const override {
  81. return TotalSerializedDataSize;
  82. }
  83. bool WriteAliasedRaw(const void* data, int size) override;
  84. bool AllowsAliasing() const override;
  85. bool WriteRope(const TRope *rope) override;
  86. bool WriteString(const TString *s) override;
  87. protected:
  88. void DoRun() override;
  89. void Resume();
  90. void Produce(const void *data, size_t size);
  91. i64 TotalSerializedDataSize;
  92. TMappedAllocation Stack;
  93. TContClosure SelfClosure;
  94. TContMachineContext InnerContext;
  95. TContMachineContext *BufFeedContext = nullptr;
  96. char *BufferPtr;
  97. size_t SizeRemain;
  98. std::vector<TChunk> Chunks;
  99. const IEventBase *Event = nullptr;
  100. bool CancelFlag = false;
  101. bool AbortFlag;
  102. bool SerializationSuccess;
  103. bool Finished = false;
  104. };
  105. struct TProtoArenaHolder : public TAtomicRefCount<TProtoArenaHolder> {
  106. google::protobuf::Arena Arena;
  107. TProtoArenaHolder() = default;
  108. explicit TProtoArenaHolder(const google::protobuf::ArenaOptions& arenaOptions)
  109. : Arena(arenaOptions)
  110. {};
  111. google::protobuf::Arena* Get() {
  112. return &Arena;
  113. }
  114. template<typename TRecord>
  115. TRecord* Allocate() {
  116. return google::protobuf::Arena::CreateMessage<TRecord>(&Arena);
  117. }
  118. };
  // Event size limit constant: 140 MiB expressed in bytes.
  static const size_t EventMaxByteSize = 140u * 1024u * 1024u;
  120. template <typename TEv, typename TRecord /*protobuf record*/, ui32 TEventType, typename TRecHolder>
  121. class TEventPBBase: public TEventBase<TEv, TEventType> , public TRecHolder {
  122. // a vector of data buffers referenced by record; if filled, then extended serialization mechanism applies
  123. TVector<TRope> Payload;
  124. size_t TotalPayloadSize = 0;
  125. public:
  126. using TRecHolder::Record;
  127. public:
  128. using ProtoRecordType = TRecord;
  129. TEventPBBase() = default;
  130. explicit TEventPBBase(const TRecord& rec)
  131. : TRecHolder(rec)
  132. {}
  133. explicit TEventPBBase(TRecord&& rec)
  134. : TRecHolder(rec)
  135. {}
  136. explicit TEventPBBase(TIntrusivePtr<TProtoArenaHolder> arena)
  137. : TRecHolder(std::move(arena))
  138. {}
  139. TString ToStringHeader() const override {
  140. return Record.GetTypeName();
  141. }
  142. TString ToString() const override {
  143. TStringStream ss;
  144. ss << ToStringHeader() << " " << Record.ShortDebugString();
  145. return ss.Str();
  146. }
  147. bool IsSerializable() const override {
  148. return true;
  149. }
  150. bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override {
  151. return SerializeToArcadiaStreamImpl(chunker, TString());
  152. }
  153. ui32 CalculateSerializedSize() const override {
  154. ssize_t result = Record.ByteSize();
  155. if (result >= 0 && Payload) {
  156. ++result; // marker
  157. char buf[MaxNumberBytes];
  158. result += SerializeNumber(Payload.size(), buf);
  159. for (const TRope& rope : Payload) {
  160. result += SerializeNumber(rope.GetSize(), buf);
  161. }
  162. result += TotalPayloadSize;
  163. }
  164. return result;
  165. }
  166. static IEventBase* Load(TEventSerializedData *input) {
  167. THolder<TEventPBBase> ev(new TEv());
  168. if (!input->GetSize()) {
  169. Y_PROTOBUF_SUPPRESS_NODISCARD ev->Record.ParseFromString(TString());
  170. } else {
  171. TRope::TConstIterator iter = input->GetBeginIter();
  172. ui64 size = input->GetSize();
  173. if (const auto& info = input->GetSerializationInfo(); info.IsExtendedFormat) {
  174. // check marker
  175. if (!iter.Valid() || (*iter.ContiguousData() != PayloadMarker && *iter.ContiguousData() != ExtendedPayloadMarker)) {
  176. Y_FAIL("invalid event");
  177. }
  178. const bool dataIsSeparate = *iter.ContiguousData() == ExtendedPayloadMarker; // ropes go after sizes
  179. auto fetchRope = [&](size_t len) {
  180. TRope::TConstIterator begin = iter;
  181. iter += len;
  182. size -= len;
  183. ev->Payload.emplace_back(begin, iter);
  184. ev->TotalPayloadSize += len;
  185. };
  186. // skip marker
  187. iter += 1;
  188. --size;
  189. // parse number of payload ropes
  190. size_t numRopes = DeserializeNumber(iter, size);
  191. if (numRopes == Max<size_t>()) {
  192. Y_FAIL("invalid event");
  193. }
  194. TStackVec<size_t, 16> ropeLens;
  195. if (dataIsSeparate) {
  196. ropeLens.reserve(numRopes);
  197. }
  198. while (numRopes--) {
  199. // parse length of the rope
  200. const size_t len = DeserializeNumber(iter, size);
  201. if (len == Max<size_t>() || size < len) {
  202. Y_FAIL("invalid event len# %zu size# %" PRIu64, len, size);
  203. }
  204. // extract the rope
  205. if (dataIsSeparate) {
  206. ropeLens.push_back(len);
  207. } else {
  208. fetchRope(len);
  209. }
  210. }
  211. for (size_t len : ropeLens) {
  212. fetchRope(len);
  213. }
  214. }
  215. // parse the protobuf
  216. TRopeStream stream(iter, size);
  217. if (!ev->Record.ParseFromZeroCopyStream(&stream)) {
  218. Y_FAIL("Failed to parse protobuf event type %" PRIu32 " class %s", TEventType, TypeName(ev->Record).data());
  219. }
  220. }
  221. ev->CachedByteSize = input->GetSize();
  222. return ev.Release();
  223. }
  224. size_t GetCachedByteSize() const {
  225. if (CachedByteSize == 0) {
  226. CachedByteSize = CalculateSerializedSize();
  227. }
  228. return CachedByteSize;
  229. }
  230. ui32 CalculateSerializedSizeCached() const override {
  231. return GetCachedByteSize();
  232. }
  233. void InvalidateCachedByteSize() {
  234. CachedByteSize = 0;
  235. }
  236. TEventSerializationInfo CreateSerializationInfo() const override {
  237. return CreateSerializationInfoImpl(0);
  238. }
  239. bool AllowExternalDataChannel() const {
  240. return TotalPayloadSize >= 4096;
  241. }
  242. public:
  243. void ReservePayload(size_t size) {
  244. Payload.reserve(size);
  245. }
  246. ui32 AddPayload(TRope&& rope) {
  247. const ui32 id = Payload.size();
  248. TotalPayloadSize += rope.size();
  249. Payload.push_back(std::move(rope));
  250. InvalidateCachedByteSize();
  251. return id;
  252. }
  253. const TRope& GetPayload(ui32 id) const {
  254. Y_VERIFY(id < Payload.size());
  255. return Payload[id];
  256. }
  257. ui32 GetPayloadCount() const {
  258. return Payload.size();
  259. }
  260. void StripPayload() {
  261. Payload.clear();
  262. TotalPayloadSize = 0;
  263. }
  264. protected:
  265. TEventSerializationInfo CreateSerializationInfoImpl(size_t preserializedSize) const {
  266. TEventSerializationInfo info;
  267. info.IsExtendedFormat = static_cast<bool>(Payload);
  268. if (static_cast<const TEv&>(*this).AllowExternalDataChannel()) {
  269. if (Payload) {
  270. char temp[MaxNumberBytes];
  271. #if USE_EXTENDED_PAYLOAD_FORMAT
  272. size_t headerLen = 1 + SerializeNumber(Payload.size(), temp);
  273. for (const TRope& rope : Payload) {
  274. headerLen += SerializeNumber(rope.size(), temp);
  275. }
  276. info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true});
  277. for (const TRope& rope : Payload) {
  278. info.Sections.push_back(TEventSectionInfo{0, rope.size(), 0, 0, false});
  279. }
  280. #else
  281. info.Sections.push_back(TEventSectionInfo{0, 1 + SerializeNumber(Payload.size(), temp), 0, 0, true}); // payload marker and rope count
  282. for (const TRope& rope : Payload) {
  283. const size_t ropeSize = rope.GetSize();
  284. info.Sections.back().Size += SerializeNumber(ropeSize, temp);
  285. info.Sections.push_back(TEventSectionInfo{0, ropeSize, 0, 0, false}); // data as a separate section
  286. }
  287. #endif
  288. }
  289. const size_t byteSize = Max<ssize_t>(0, Record.ByteSize()) + preserializedSize;
  290. info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true}); // protobuf itself
  291. #ifndef NDEBUG
  292. size_t total = 0;
  293. for (const auto& section : info.Sections) {
  294. total += section.Size;
  295. }
  296. size_t serialized = CalculateSerializedSize();
  297. Y_VERIFY(total == serialized, "total# %zu serialized# %zu byteSize# %zd Payload.size# %zu", total,
  298. serialized, byteSize, Payload.size());
  299. #endif
  300. }
  301. return info;
  302. }
  303. bool SerializeToArcadiaStreamImpl(TChunkSerializer* chunker, const TString& preserialized) const {
  304. // serialize payload first
  305. if (Payload) {
  306. void *data;
  307. int size = 0;
  308. auto append = [&](const char *p, size_t len) {
  309. while (len) {
  310. if (size) {
  311. const size_t numBytesToCopy = std::min<size_t>(size, len);
  312. memcpy(data, p, numBytesToCopy);
  313. data = static_cast<char*>(data) + numBytesToCopy;
  314. size -= numBytesToCopy;
  315. p += numBytesToCopy;
  316. len -= numBytesToCopy;
  317. } else if (!chunker->Next(&data, &size)) {
  318. return false;
  319. }
  320. }
  321. return true;
  322. };
  323. auto appendNumber = [&](size_t number) {
  324. char buf[MaxNumberBytes];
  325. return append(buf, SerializeNumber(number, buf));
  326. };
  327. #if USE_EXTENDED_PAYLOAD_FORMAT
  328. char marker = ExtendedPayloadMarker;
  329. append(&marker, 1);
  330. if (!appendNumber(Payload.size())) {
  331. return false;
  332. }
  333. for (const TRope& rope : Payload) {
  334. if (!appendNumber(rope.GetSize())) {
  335. return false;
  336. }
  337. }
  338. if (size) {
  339. chunker->BackUp(std::exchange(size, 0));
  340. }
  341. for (const TRope& rope : Payload) {
  342. if (!chunker->WriteRope(&rope)) {
  343. return false;
  344. }
  345. }
  346. #else
  347. char marker = PayloadMarker;
  348. append(&marker, 1);
  349. if (!appendNumber(Payload.size())) {
  350. return false;
  351. }
  352. for (const TRope& rope : Payload) {
  353. if (!appendNumber(rope.GetSize())) {
  354. return false;
  355. }
  356. if (rope) {
  357. if (size) {
  358. chunker->BackUp(std::exchange(size, 0));
  359. }
  360. if (!chunker->WriteRope(&rope)) {
  361. return false;
  362. }
  363. }
  364. }
  365. if (size) {
  366. chunker->BackUp(size);
  367. }
  368. #endif
  369. }
  370. if (preserialized && !chunker->WriteString(&preserialized)) {
  371. return false;
  372. }
  373. return Record.SerializeToZeroCopyStream(chunker);
  374. }
  375. protected:
  376. mutable size_t CachedByteSize = 0;
  377. static constexpr char ExtendedPayloadMarker = 0x06;
  378. static constexpr char PayloadMarker = 0x07;
  379. static constexpr size_t MaxNumberBytes = (sizeof(size_t) * CHAR_BIT + 6) / 7;
  380. static size_t SerializeNumber(size_t num, char *buffer) {
  381. char *begin = buffer;
  382. do {
  383. *buffer++ = (num & 0x7F) | (num >= 128 ? 0x80 : 0x00);
  384. num >>= 7;
  385. } while (num);
  386. return buffer - begin;
  387. }
  388. static size_t DeserializeNumber(const char **ptr, const char *end) {
  389. const char *p = *ptr;
  390. size_t res = 0;
  391. size_t offset = 0;
  392. for (;;) {
  393. if (p == end) {
  394. return Max<size_t>();
  395. }
  396. const char byte = *p++;
  397. res |= (static_cast<size_t>(byte) & 0x7F) << offset;
  398. offset += 7;
  399. if (!(byte & 0x80)) {
  400. break;
  401. }
  402. }
  403. *ptr = p;
  404. return res;
  405. }
  406. static size_t DeserializeNumber(TRope::TConstIterator& iter, ui64& size) {
  407. size_t res = 0;
  408. size_t offset = 0;
  409. for (;;) {
  410. if (!iter.Valid()) {
  411. return Max<size_t>();
  412. }
  413. const char byte = *iter.ContiguousData();
  414. iter += 1;
  415. --size;
  416. res |= (static_cast<size_t>(byte) & 0x7F) << offset;
  417. offset += 7;
  418. if (!(byte & 0x80)) {
  419. break;
  420. }
  421. }
  422. return res;
  423. }
  424. };
  // Record holder that stores the record by value (no arena involved).
  template <typename TRecord>
  struct TRecordHolder {
      TRecord Record;

      TRecordHolder() = default;

      // Copies the given record.
      TRecordHolder(const TRecord& rec)
          : Record(rec)
      {
      }

      // Takes over the given record by move.
      TRecordHolder(TRecord&& rec)
          : Record(std::move(rec))
      {
      }
  };
  437. // Protobuf arena and a record allocated on it
  438. template <typename TRecord, size_t InitialBlockSize, size_t MaxBlockSize>
  439. struct TArenaRecordHolder {
  440. TIntrusivePtr<TProtoArenaHolder> Arena;
  441. TRecord& Record;
  442. // Arena depends on block size to be a multiple of 8 for correctness
  443. // FIXME: uncomment these asserts when code is synchronized between repositories
  444. // static_assert((InitialBlockSize & 7) == 0, "Misaligned InitialBlockSize");
  445. // static_assert((MaxBlockSize & 7) == 0, "Misaligned MaxBlockSize");
  446. static const google::protobuf::ArenaOptions GetArenaOptions() {
  447. google::protobuf::ArenaOptions opts;
  448. opts.initial_block_size = InitialBlockSize;
  449. opts.max_block_size = MaxBlockSize;
  450. return opts;
  451. }
  452. TArenaRecordHolder()
  453. : Arena(MakeIntrusive<TProtoArenaHolder>(GetArenaOptions()))
  454. , Record(*Arena->Allocate<TRecord>())
  455. {};
  456. TArenaRecordHolder(const TRecord& rec)
  457. : TArenaRecordHolder()
  458. {
  459. Record.CopyFrom(rec);
  460. }
  461. // not allowed to move from another protobuf, it's a potenial copying
  462. TArenaRecordHolder(TRecord&& rec) = delete;
  463. TArenaRecordHolder(TIntrusivePtr<TProtoArenaHolder> arena)
  464. : Arena(std::move(arena))
  465. , Record(*Arena->Allocate<TRecord>())
  466. {};
  467. };
  468. template <typename TEv, typename TRecord, ui32 TEventType>
  469. class TEventPB : public TEventPBBase<TEv, TRecord, TEventType, TRecordHolder<TRecord> > {
  470. typedef TEventPBBase<TEv, TRecord, TEventType, TRecordHolder<TRecord> > TPbBase;
  471. // NOTE: No extra fields allowed: TEventPB must be a "template typedef"
  472. public:
  473. using TPbBase::TPbBase;
  474. };
  475. template <typename TEv, typename TRecord, ui32 TEventType, size_t InitialBlockSize = 512, size_t MaxBlockSize = 16*1024>
  476. using TEventPBWithArena = TEventPBBase<TEv, TRecord, TEventType, TArenaRecordHolder<TRecord, InitialBlockSize, MaxBlockSize> >;
  477. template <typename TEv, typename TRecord, ui32 TEventType>
  478. class TEventShortDebugPB: public TEventPB<TEv, TRecord, TEventType> {
  479. public:
  480. using TBase = TEventPB<TEv, TRecord, TEventType>;
  481. TEventShortDebugPB() = default;
  482. explicit TEventShortDebugPB(const TRecord& rec)
  483. : TBase(rec)
  484. {
  485. }
  486. explicit TEventShortDebugPB(TRecord&& rec)
  487. : TBase(std::move(rec))
  488. {
  489. }
  490. TString ToString() const override {
  491. return TypeName<TEv>() + " { " + TBase::Record.ShortDebugString() + " }";
  492. }
  493. };
  494. template <typename TEv, typename TRecord, ui32 TEventType>
  495. class TEventPreSerializedPB: public TEventPB<TEv, TRecord, TEventType> {
  496. protected:
  497. using TBase = TEventPB<TEv, TRecord, TEventType>;
  498. using TSelf = TEventPreSerializedPB<TEv, TRecord, TEventType>;
  499. using TBase::Record;
  500. public:
  501. TString PreSerializedData; // already serialized PB data (using message::SerializeToString)
  502. TEventPreSerializedPB() = default;
  503. explicit TEventPreSerializedPB(const TRecord& rec)
  504. : TBase(rec)
  505. {
  506. }
  507. explicit TEventPreSerializedPB(TRecord&& rec)
  508. : TBase(std::move(rec))
  509. {
  510. }
  511. // when remote event received locally this method will merge preserialized data
  512. const TRecord& GetRecord() {
  513. TRecord& base(TBase::Record);
  514. if (!PreSerializedData.empty()) {
  515. TRecord copy;
  516. Y_PROTOBUF_SUPPRESS_NODISCARD copy.ParseFromString(PreSerializedData);
  517. copy.MergeFrom(base);
  518. base.Swap(&copy);
  519. PreSerializedData.clear();
  520. }
  521. return TBase::Record;
  522. }
  523. const TRecord& GetRecord() const {
  524. return const_cast<TSelf*>(this)->GetRecord();
  525. }
  526. TRecord* MutableRecord() {
  527. GetRecord(); // Make sure PreSerializedData is parsed
  528. return &(TBase::Record);
  529. }
  530. TString ToString() const override {
  531. return GetRecord().ShortDebugString();
  532. }
  533. bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override {
  534. return TBase::SerializeToArcadiaStreamImpl(chunker, PreSerializedData);
  535. }
  536. ui32 CalculateSerializedSize() const override {
  537. return PreSerializedData.size() + TBase::CalculateSerializedSize();
  538. }
  539. size_t GetCachedByteSize() const {
  540. return PreSerializedData.size() + TBase::GetCachedByteSize();
  541. }
  542. ui32 CalculateSerializedSizeCached() const override {
  543. return GetCachedByteSize();
  544. }
  545. TEventSerializationInfo CreateSerializationInfo() const override {
  546. return TBase::CreateSerializationInfoImpl(PreSerializedData.size());
  547. }
  548. };
  549. inline TActorId ActorIdFromProto(const NActorsProto::TActorId& actorId) {
  550. return TActorId(actorId.GetRawX1(), actorId.GetRawX2());
  551. }
  552. inline void ActorIdToProto(const TActorId& src, NActorsProto::TActorId* dest) {
  553. Y_VERIFY_DEBUG(dest);
  554. dest->SetRawX1(src.RawX1());
  555. dest->SetRawX2(src.RawX2());
  556. }
  557. }