event_load.h 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. #pragma once
  2. #include <util/stream/walk.h>
  3. #include <util/system/types.h>
  4. #include <util/generic/string.h>
  5. #include <library/cpp/actors/util/rope.h>
  6. #include <library/cpp/actors/wilson/wilson_trace.h>
  7. namespace NActors {
  8. class IEventHandle;
  9. struct TConstIoVec {
  10. const void* Data;
  11. size_t Size;
  12. };
  13. struct TIoVec {
  14. void* Data;
  15. size_t Size;
  16. };
  17. struct TEventSectionInfo {
  18. size_t Headroom = 0; // headroom to be created on the receiving side
  19. size_t Size = 0; // full size of serialized event section (a chunk in rope)
  20. size_t Tailroom = 0; // tailroom for the chunk
  21. size_t Alignment = 0; // required alignment
  22. bool IsInline = false; // if true, goes through ordinary channel
  23. };
  24. struct TEventSerializationInfo {
  25. bool IsExtendedFormat = {};
  26. std::vector<TEventSectionInfo> Sections;
  27. // total sum of Size for every section must match actual serialized size of the event
  28. };
  29. class TEventSerializedData
  30. : public TThrRefBase
  31. {
  32. TRope Rope;
  33. TEventSerializationInfo SerializationInfo;
  34. public:
  35. TEventSerializedData() = default;
  36. TEventSerializedData(TRope&& rope, TEventSerializationInfo&& serializationInfo)
  37. : Rope(std::move(rope))
  38. , SerializationInfo(std::move(serializationInfo))
  39. {}
  40. TEventSerializedData(const TEventSerializedData& original, TString extraBuffer)
  41. : Rope(original.Rope)
  42. , SerializationInfo(original.SerializationInfo)
  43. {
  44. if (!SerializationInfo.Sections.empty()) {
  45. SerializationInfo.Sections.push_back(TEventSectionInfo{0, extraBuffer.size(), 0, 0, true});
  46. }
  47. Append(std::move(extraBuffer));
  48. }
  49. TEventSerializedData(TString buffer, TEventSerializationInfo&& serializationInfo)
  50. : SerializationInfo(std::move(serializationInfo))
  51. {
  52. Append(std::move(buffer));
  53. }
  54. void SetSerializationInfo(TEventSerializationInfo&& serializationInfo) {
  55. SerializationInfo = std::move(serializationInfo);
  56. }
  57. const TEventSerializationInfo& GetSerializationInfo() const {
  58. return SerializationInfo;
  59. }
  60. TRope::TConstIterator GetBeginIter() const {
  61. return Rope.Begin();
  62. }
  63. size_t GetSize() const {
  64. return Rope.GetSize();
  65. }
  66. TString GetString() const {
  67. TString result;
  68. result.reserve(GetSize());
  69. for (auto it = Rope.Begin(); it.Valid(); it.AdvanceToNextContiguousBlock()) {
  70. result.append(it.ContiguousData(), it.ContiguousSize());
  71. }
  72. return result;
  73. }
  74. TRope GetRope() const {
  75. return TRope(Rope);
  76. }
  77. TRope EraseBack(size_t count) {
  78. Y_ABORT_UNLESS(count <= Rope.GetSize());
  79. TRope::TIterator iter = Rope.End();
  80. iter -= count;
  81. return Rope.Extract(iter, Rope.End());
  82. }
  83. void Append(TRope&& from) {
  84. Rope.Insert(Rope.End(), std::move(from));
  85. }
  86. void Append(TString buffer) {
  87. if (buffer) {
  88. Rope.Insert(Rope.End(), TRope(std::move(buffer)));
  89. }
  90. }
  91. };
  92. }
  93. class TChainBufWalk : public IWalkInput {
  94. TIntrusivePtr<NActors::TEventSerializedData> Buffer;
  95. TRope::TConstIterator Iter;
  96. public:
  97. TChainBufWalk(TIntrusivePtr<NActors::TEventSerializedData> buffer)
  98. : Buffer(std::move(buffer))
  99. , Iter(Buffer->GetBeginIter())
  100. {}
  101. private:
  102. size_t DoUnboundedNext(const void **ptr) override {
  103. const size_t size = Iter.ContiguousSize();
  104. *ptr = Iter.ContiguousData();
  105. if (Iter.Valid()) {
  106. Iter.AdvanceToNextContiguousBlock();
  107. }
  108. return size;
  109. }
  110. };