event_pb.h 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500
  1. #pragma once
  2. #include "event.h"
  3. #include "event_load.h"
  4. #include <google/protobuf/io/zero_copy_stream.h>
  5. #include <google/protobuf/arena.h>
  6. #include <library/cpp/actors/protos/actors.pb.h>
  7. #include <util/generic/deque.h>
  8. #include <util/system/context.h>
  9. #include <util/system/filemap.h>
  10. #include <array>
  11. namespace NActors {
  12. class TRopeStream : public NProtoBuf::io::ZeroCopyInputStream {
  13. TRope::TConstIterator Iter;
  14. const size_t Size;
  15. public:
  16. TRopeStream(TRope::TConstIterator iter, size_t size)
  17. : Iter(iter)
  18. , Size(size)
  19. {}
  20. bool Next(const void** data, int* size) override;
  21. void BackUp(int count) override;
  22. bool Skip(int count) override;
  23. int64_t ByteCount() const override {
  24. return TotalByteCount;
  25. }
  26. private:
  27. int64_t TotalByteCount = 0;
  28. };
  29. class TChunkSerializer : public NProtoBuf::io::ZeroCopyOutputStream {
  30. public:
  31. TChunkSerializer() = default;
  32. virtual ~TChunkSerializer() = default;
  33. virtual bool WriteRope(const TRope *rope) = 0;
  34. virtual bool WriteString(const TString *s) = 0;
  35. };
  36. class TAllocChunkSerializer final : public TChunkSerializer {
  37. public:
  38. bool Next(void** data, int* size) override;
  39. void BackUp(int count) override;
  40. int64_t ByteCount() const override {
  41. return Buffers->GetSize();
  42. }
  43. bool WriteAliasedRaw(const void* data, int size) override;
  44. // WARNING: these methods require owner to retain ownership and immutability of passed objects
  45. bool WriteRope(const TRope *rope) override;
  46. bool WriteString(const TString *s) override;
  47. inline TIntrusivePtr<TEventSerializedData> Release(bool extendedFormat) {
  48. if (extendedFormat) {
  49. Buffers->SetExtendedFormat();
  50. }
  51. return std::move(Buffers);
  52. }
  53. protected:
  54. TIntrusivePtr<TEventSerializedData> Buffers = new TEventSerializedData;
  55. TRope Backup;
  56. };
  57. class TCoroutineChunkSerializer final : public TChunkSerializer, protected ITrampoLine {
  58. public:
  59. using TChunk = std::pair<const char*, size_t>;
  60. TCoroutineChunkSerializer();
  61. ~TCoroutineChunkSerializer();
  62. void SetSerializingEvent(const IEventBase *event);
  63. void Abort();
  64. std::pair<TChunk*, TChunk*> FeedBuf(void* data, size_t size);
  65. bool IsComplete() const {
  66. return !Event;
  67. }
  68. bool IsSuccessfull() const {
  69. return SerializationSuccess;
  70. }
  71. const IEventBase *GetCurrentEvent() const {
  72. return Event;
  73. }
  74. bool Next(void** data, int* size) override;
  75. void BackUp(int count) override;
  76. int64_t ByteCount() const override {
  77. return TotalSerializedDataSize;
  78. }
  79. bool WriteAliasedRaw(const void* data, int size) override;
  80. bool AllowsAliasing() const override;
  81. bool WriteRope(const TRope *rope) override;
  82. bool WriteString(const TString *s) override;
  83. protected:
  84. void DoRun() override;
  85. void Resume();
  86. bool Produce(const void *data, size_t size);
  87. i64 TotalSerializedDataSize;
  88. TMappedAllocation Stack;
  89. TContClosure SelfClosure;
  90. TContMachineContext InnerContext;
  91. TContMachineContext *BufFeedContext = nullptr;
  92. char *BufferPtr;
  93. size_t SizeRemain;
  94. static constexpr size_t MaxChunks = 16;
  95. TChunk Chunks[MaxChunks];
  96. size_t NumChunks = 0;
  97. const IEventBase *Event = nullptr;
  98. bool CancelFlag = false;
  99. bool AbortFlag;
  100. bool SerializationSuccess;
  101. bool Finished = false;
  102. };
  // Byte-size limit constant for events (presumably the maximum serialized
  // event size; confirm at the use sites). Builds that define
  // ACTORLIB_HUGE_PB_SIZE get the larger limit.
  #ifndef ACTORLIB_HUGE_PB_SIZE
  static constexpr size_t EventMaxByteSize = 67108000;
  #else
  static constexpr size_t EventMaxByteSize = 140 << 20; // (140MB)
  #endif
  108. template <typename TEv, typename TRecord /*protobuf record*/, ui32 TEventType, typename TRecHolder>
  109. class TEventPBBase: public TEventBase<TEv, TEventType> , public TRecHolder {
  110. // a vector of data buffers referenced by record; if filled, then extended serialization mechanism applies
  111. TVector<TRope> Payload;
  112. public:
  113. using TRecHolder::Record;
  114. public:
  115. using ProtoRecordType = TRecord;
  116. TEventPBBase() = default;
  117. explicit TEventPBBase(const TRecord& rec)
  118. {
  119. Record = rec;
  120. }
  121. explicit TEventPBBase(TRecord&& rec)
  122. {
  123. Record = std::move(rec);
  124. }
  125. TString ToStringHeader() const override {
  126. return Record.GetTypeName();
  127. }
  128. TString ToString() const override {
  129. return Record.ShortDebugString();
  130. }
  131. bool IsSerializable() const override {
  132. return true;
  133. }
  134. bool IsExtendedFormat() const override {
  135. return static_cast<bool>(Payload);
  136. }
  137. bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override {
  138. // serialize payload first
  139. if (Payload) {
  140. void *data;
  141. int size = 0;
  142. auto append = [&](const char *p, size_t len) {
  143. while (len) {
  144. if (size) {
  145. const size_t numBytesToCopy = std::min<size_t>(size, len);
  146. memcpy(data, p, numBytesToCopy);
  147. data = static_cast<char*>(data) + numBytesToCopy;
  148. size -= numBytesToCopy;
  149. p += numBytesToCopy;
  150. len -= numBytesToCopy;
  151. } else if (!chunker->Next(&data, &size)) {
  152. return false;
  153. }
  154. }
  155. return true;
  156. };
  157. auto appendNumber = [&](size_t number) {
  158. char buf[MaxNumberBytes];
  159. return append(buf, SerializeNumber(number, buf));
  160. };
  161. char marker = PayloadMarker;
  162. append(&marker, 1);
  163. if (!appendNumber(Payload.size())) {
  164. return false;
  165. }
  166. for (const TRope& rope : Payload) {
  167. if (!appendNumber(rope.GetSize())) {
  168. return false;
  169. }
  170. if (rope) {
  171. if (size) {
  172. chunker->BackUp(std::exchange(size, 0));
  173. }
  174. if (!chunker->WriteRope(&rope)) {
  175. return false;
  176. }
  177. }
  178. }
  179. if (size) {
  180. chunker->BackUp(size);
  181. }
  182. }
  183. return Record.SerializeToZeroCopyStream(chunker);
  184. }
  185. ui32 CalculateSerializedSize() const override {
  186. ssize_t result = Record.ByteSize();
  187. if (result >= 0 && Payload) {
  188. ++result; // marker
  189. char buf[MaxNumberBytes];
  190. result += SerializeNumber(Payload.size(), buf);
  191. for (const TRope& rope : Payload) {
  192. result += SerializeNumber(rope.GetSize(), buf);
  193. result += rope.GetSize();
  194. }
  195. }
  196. return result;
  197. }
  198. static IEventBase* Load(TIntrusivePtr<TEventSerializedData> input) {
  199. THolder<TEventPBBase> ev(new TEv());
  200. if (!input->GetSize()) {
  201. Y_PROTOBUF_SUPPRESS_NODISCARD ev->Record.ParseFromString(TString());
  202. } else {
  203. TRope::TConstIterator iter = input->GetBeginIter();
  204. ui64 size = input->GetSize();
  205. if (input->IsExtendedFormat()) {
  206. // check marker
  207. if (!iter.Valid() || *iter.ContiguousData() != PayloadMarker) {
  208. Y_FAIL("invalid event");
  209. }
  210. // skip marker
  211. iter += 1;
  212. --size;
  213. // parse number of payload ropes
  214. size_t numRopes = DeserializeNumber(iter, size);
  215. if (numRopes == Max<size_t>()) {
  216. Y_FAIL("invalid event");
  217. }
  218. while (numRopes--) {
  219. // parse length of the rope
  220. const size_t len = DeserializeNumber(iter, size);
  221. if (len == Max<size_t>() || size < len) {
  222. Y_FAIL("invalid event len# %zu size# %" PRIu64, len, size);
  223. }
  224. // extract the rope
  225. TRope::TConstIterator begin = iter;
  226. iter += len;
  227. size -= len;
  228. ev->Payload.emplace_back(begin, iter);
  229. }
  230. }
  231. // parse the protobuf
  232. TRopeStream stream(iter, size);
  233. if (!ev->Record.ParseFromZeroCopyStream(&stream)) {
  234. Y_FAIL("Failed to parse protobuf event type %" PRIu32 " class %s", TEventType, TypeName(ev->Record).data());
  235. }
  236. }
  237. ev->CachedByteSize = input->GetSize();
  238. return ev.Release();
  239. }
  240. size_t GetCachedByteSize() const {
  241. if (CachedByteSize == 0) {
  242. CachedByteSize = CalculateSerializedSize();
  243. }
  244. return CachedByteSize;
  245. }
  246. ui32 CalculateSerializedSizeCached() const override {
  247. return GetCachedByteSize();
  248. }
  249. void InvalidateCachedByteSize() {
  250. CachedByteSize = 0;
  251. }
  252. public:
  253. void ReservePayload(size_t size) {
  254. Payload.reserve(size);
  255. }
  256. ui32 AddPayload(TRope&& rope) {
  257. const ui32 id = Payload.size();
  258. Payload.push_back(std::move(rope));
  259. InvalidateCachedByteSize();
  260. return id;
  261. }
  262. const TRope& GetPayload(ui32 id) const {
  263. Y_VERIFY(id < Payload.size());
  264. return Payload[id];
  265. }
  266. ui32 GetPayloadCount() const {
  267. return Payload.size();
  268. }
  269. void StripPayload() {
  270. Payload.clear();
  271. }
  272. protected:
  273. mutable size_t CachedByteSize = 0;
  274. static constexpr char PayloadMarker = 0x07;
  275. static constexpr size_t MaxNumberBytes = (sizeof(size_t) * CHAR_BIT + 6) / 7;
  276. static size_t SerializeNumber(size_t num, char *buffer) {
  277. char *begin = buffer;
  278. do {
  279. *buffer++ = (num & 0x7F) | (num >= 128 ? 0x80 : 0x00);
  280. num >>= 7;
  281. } while (num);
  282. return buffer - begin;
  283. }
  284. static size_t DeserializeNumber(const char **ptr, const char *end) {
  285. const char *p = *ptr;
  286. size_t res = 0;
  287. size_t offset = 0;
  288. for (;;) {
  289. if (p == end) {
  290. return Max<size_t>();
  291. }
  292. const char byte = *p++;
  293. res |= (static_cast<size_t>(byte) & 0x7F) << offset;
  294. offset += 7;
  295. if (!(byte & 0x80)) {
  296. break;
  297. }
  298. }
  299. *ptr = p;
  300. return res;
  301. }
  302. static size_t DeserializeNumber(TRope::TConstIterator& iter, ui64& size) {
  303. size_t res = 0;
  304. size_t offset = 0;
  305. for (;;) {
  306. if (!iter.Valid()) {
  307. return Max<size_t>();
  308. }
  309. const char byte = *iter.ContiguousData();
  310. iter += 1;
  311. --size;
  312. res |= (static_cast<size_t>(byte) & 0x7F) << offset;
  313. offset += 7;
  314. if (!(byte & 0x80)) {
  315. break;
  316. }
  317. }
  318. return res;
  319. }
  320. };
  // Record holder that stores the protobuf message by value (no arena involved).
  template <typename TRecord>
  struct TRecordHolder {
      TRecord Record;
  };
  326. // Protobuf arena and a record allocated on it
  327. template <typename TRecord, size_t InitialBlockSize, size_t MaxBlockSize>
  328. struct TArenaRecordHolder {
  329. google::protobuf::Arena PbArena;
  330. TRecord& Record;
  331. static const google::protobuf::ArenaOptions GetArenaOptions() {
  332. google::protobuf::ArenaOptions opts;
  333. opts.initial_block_size = InitialBlockSize;
  334. opts.max_block_size = MaxBlockSize;
  335. return opts;
  336. }
  337. TArenaRecordHolder()
  338. : PbArena(GetArenaOptions())
  339. , Record(*google::protobuf::Arena::CreateMessage<TRecord>(&PbArena))
  340. {}
  341. };
  342. template <typename TEv, typename TRecord, ui32 TEventType>
  343. class TEventPB : public TEventPBBase<TEv, TRecord, TEventType, TRecordHolder<TRecord> > {
  344. typedef TEventPBBase<TEv, TRecord, TEventType, TRecordHolder<TRecord> > TPbBase;
  345. // NOTE: No extra fields allowed: TEventPB must be a "template typedef"
  346. public:
  347. using TPbBase::TPbBase;
  348. };
  349. template <typename TEv, typename TRecord, ui32 TEventType, size_t InitialBlockSize = 512, size_t MaxBlockSize = 16*1024>
  350. using TEventPBWithArena = TEventPBBase<TEv, TRecord, TEventType, TArenaRecordHolder<TRecord, InitialBlockSize, MaxBlockSize> >;
  351. template <typename TEv, typename TRecord, ui32 TEventType>
  352. class TEventShortDebugPB: public TEventPB<TEv, TRecord, TEventType> {
  353. public:
  354. using TBase = TEventPB<TEv, TRecord, TEventType>;
  355. TEventShortDebugPB() = default;
  356. explicit TEventShortDebugPB(const TRecord& rec)
  357. : TBase(rec)
  358. {
  359. }
  360. explicit TEventShortDebugPB(TRecord&& rec)
  361. : TBase(std::move(rec))
  362. {
  363. }
  364. TString ToString() const override {
  365. return TypeName<TEv>() + " { " + TBase::Record.ShortDebugString() + " }";
  366. }
  367. };
  368. template <typename TEv, typename TRecord, ui32 TEventType>
  369. class TEventPreSerializedPB: public TEventPB<TEv, TRecord, TEventType> {
  370. protected:
  371. using TBase = TEventPB<TEv, TRecord, TEventType>;
  372. using TSelf = TEventPreSerializedPB<TEv, TRecord, TEventType>;
  373. using TBase::Record;
  374. public:
  375. TString PreSerializedData; // already serialized PB data (using message::SerializeToString)
  376. TEventPreSerializedPB() = default;
  377. explicit TEventPreSerializedPB(const TRecord& rec)
  378. : TBase(rec)
  379. {
  380. }
  381. explicit TEventPreSerializedPB(TRecord&& rec)
  382. : TBase(std::move(rec))
  383. {
  384. }
  385. // when remote event received locally this method will merge preserialized data
  386. const TRecord& GetRecord() {
  387. TRecord& base(TBase::Record);
  388. if (!PreSerializedData.empty()) {
  389. TRecord copy;
  390. Y_PROTOBUF_SUPPRESS_NODISCARD copy.ParseFromString(PreSerializedData);
  391. copy.MergeFrom(base);
  392. base.Swap(&copy);
  393. PreSerializedData.clear();
  394. }
  395. return TBase::Record;
  396. }
  397. const TRecord& GetRecord() const {
  398. return const_cast<TSelf*>(this)->GetRecord();
  399. }
  400. TRecord* MutableRecord() {
  401. GetRecord(); // Make sure PreSerializedData is parsed
  402. return &(TBase::Record);
  403. }
  404. TString ToString() const override {
  405. return GetRecord().ShortDebugString();
  406. }
  407. bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override {
  408. return chunker->WriteString(&PreSerializedData) && TBase::SerializeToArcadiaStream(chunker);
  409. }
  410. ui32 CalculateSerializedSize() const override {
  411. return PreSerializedData.size() + TBase::CalculateSerializedSize();
  412. }
  413. size_t GetCachedByteSize() const {
  414. return PreSerializedData.size() + TBase::GetCachedByteSize();
  415. }
  416. ui32 CalculateSerializedSizeCached() const override {
  417. return GetCachedByteSize();
  418. }
  419. };
  420. inline TActorId ActorIdFromProto(const NActorsProto::TActorId& actorId) {
  421. return TActorId(actorId.GetRawX1(), actorId.GetRawX2());
  422. }
  423. inline void ActorIdToProto(const TActorId& src, NActorsProto::TActorId* dest) {
  424. Y_VERIFY_DEBUG(dest);
  425. dest->SetRawX1(src.RawX1());
  426. dest->SetRawX2(src.RawX2());
  427. }
  428. }