event_pb.cpp 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. #include "event_pb.h"
  2. namespace NActors {
  3. bool TRopeStream::Next(const void** data, int* size) {
  4. *data = Iter.ContiguousData();
  5. *size = Iter.ContiguousSize();
  6. if (size_t(*size + TotalByteCount) > Size) {
  7. *size = Size - TotalByteCount;
  8. Iter += *size;
  9. } else if (Iter.Valid()) {
  10. Iter.AdvanceToNextContiguousBlock();
  11. }
  12. TotalByteCount += *size;
  13. return *size != 0;
  14. }
  15. void TRopeStream::BackUp(int count) {
  16. Y_VERIFY(count <= TotalByteCount);
  17. Iter -= count;
  18. TotalByteCount -= count;
  19. }
  20. bool TRopeStream::Skip(int count) {
  21. if (static_cast<size_t>(TotalByteCount + count) > Size) {
  22. count = Size - TotalByteCount;
  23. }
  24. Iter += count;
  25. TotalByteCount += count;
  26. return static_cast<size_t>(TotalByteCount) != Size;
  27. }
// Sets up the serializer coroutine: allocates its stack and prepares the
// machine context that will run DoRun() once the coroutine is first resumed.
TCoroutineChunkSerializer::TCoroutineChunkSerializer()
    : TotalSerializedDataSize(0)
    , Stack(64 * 1024) // 64 KiB stack for the serializer coroutine
    , SelfClosure{this, TArrayRef(Stack.Begin(), Stack.End())}
    , InnerContext(SelfClosure)
{}
TCoroutineChunkSerializer::~TCoroutineChunkSerializer() {
    // Ask the coroutine to terminate and run it to completion, so its stack
    // is not torn down while still executing.
    CancelFlag = true;
    Resume();
    Y_VERIFY(Finished);
}
bool TCoroutineChunkSerializer::AllowsAliasing() const {
    // Allow protobuf to pass pointers into its own data via WriteAliasedRaw;
    // we either reference those bytes directly or copy small pieces ourselves.
    return true;
}
  42. void TCoroutineChunkSerializer::Produce(const void *data, size_t size) {
  43. Y_VERIFY(size <= SizeRemain);
  44. SizeRemain -= size;
  45. TotalSerializedDataSize += size;
  46. if (!Chunks.empty()) {
  47. auto& last = Chunks.back();
  48. if (last.first + last.second == data) {
  49. last.second += size; // just extend the last buffer
  50. return;
  51. }
  52. }
  53. Chunks.emplace_back(static_cast<const char*>(data), size);
  54. }
// Consume `size` bytes at `data` (owned by the event being serialized),
// yielding to the consumer whenever the output buffer runs dry.
// Returns false if serialization was cancelled or aborted mid-way.
bool TCoroutineChunkSerializer::WriteAliasedRaw(const void* data, int size) {
    Y_VERIFY(!CancelFlag);
    Y_VERIFY(!AbortFlag);
    Y_VERIFY(size >= 0);
    while (size) {
        if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) {
            const void *produce = data;
            // Pieces that fit inside one 64-byte aligned window (presumably a
            // cache line) are copied into the output buffer rather than
            // aliased -- except when they directly extend the previous chunk,
            // where aliasing lets Produce() coalesce them for free.
            if ((reinterpret_cast<uintptr_t>(data) & 63) + bytesToAppend <= 64 &&
                    (Chunks.empty() || data != Chunks.back().first + Chunks.back().second)) {
                memcpy(BufferPtr, data, bytesToAppend);
                produce = BufferPtr;
                BufferPtr += bytesToAppend;
            }
            Produce(produce, bytesToAppend);
            data = static_cast<const char*>(data) + bytesToAppend;
            size -= bytesToAppend;
        } else {
            // Output buffer exhausted: hand control back to the consumer to
            // obtain a fresh buffer via FeedBuf().
            InnerContext.SwitchTo(BufFeedContext);
            if (CancelFlag || AbortFlag) {
                return false;
            }
        }
    }
    return true;
}
// ZeroCopyOutputStream::Next -- hand protobuf the whole remaining output
// buffer; BackUp() returns the unused tail afterwards.
bool TCoroutineChunkSerializer::Next(void** data, int* size) {
    Y_VERIFY(!CancelFlag);
    Y_VERIFY(!AbortFlag);
    if (!SizeRemain) {
        // Buffer exhausted: yield to the consumer for a fresh one.
        InnerContext.SwitchTo(BufFeedContext);
        if (CancelFlag || AbortFlag) {
            return false;
        }
    }
    Y_VERIFY(SizeRemain);
    *data = BufferPtr;
    *size = SizeRemain;
    BufferPtr += SizeRemain;
    // Optimistically account for the whole span; BackUp() undoes the excess.
    Produce(*data, *size);
    return true;
}
// Protobuf returns `count` unused bytes from the span handed out by Next().
void TCoroutineChunkSerializer::BackUp(int count) {
    if (!count) {
        return;
    }
    Y_VERIFY(count > 0);
    Y_VERIFY(!Chunks.empty());
    TChunk& buf = Chunks.back();
    Y_VERIFY((size_t)count <= buf.second);
    // Next() always produces the tail of the current buffer, so the returned
    // bytes must be exactly the tail of the last chunk.
    Y_VERIFY(buf.first + buf.second == BufferPtr, "buf# %p:%zu BufferPtr# %p SizeRemain# %zu NumChunks# %zu",
        buf.first, buf.second, BufferPtr, SizeRemain, Chunks.size());
    buf.second -= count;
    if (!buf.second) {
        // Chunk fully rolled back -- drop it entirely.
        Chunks.pop_back();
    }
    BufferPtr -= count;
    SizeRemain += count;
    TotalSerializedDataSize -= count;
}
// Run the serializer coroutine until it yields back. BufFeedContext is the
// return point the coroutine switches to whenever it needs a new buffer or
// finishes; it is only valid for the duration of this call.
void TCoroutineChunkSerializer::Resume() {
    TContMachineContext feedContext;
    BufFeedContext = &feedContext;
    feedContext.SwitchTo(&InnerContext);
    BufFeedContext = nullptr;
}
  120. bool TCoroutineChunkSerializer::WriteRope(const TRope *rope) {
  121. for (auto iter = rope->Begin(); iter.Valid(); iter.AdvanceToNextContiguousBlock()) {
  122. if (!WriteAliasedRaw(iter.ContiguousData(), iter.ContiguousSize())) {
  123. return false;
  124. }
  125. }
  126. return true;
  127. }
  128. bool TCoroutineChunkSerializer::WriteString(const TString *s) {
  129. return WriteAliasedRaw(s->data(), s->length());
  130. }
// Give the coroutine a fresh output buffer and run it until it either fills
// the buffer or finishes the event. Returns the chunks produced during this
// resume (spans into `data`, plus any aliased event data).
std::span<TCoroutineChunkSerializer::TChunk> TCoroutineChunkSerializer::FeedBuf(void* data, size_t size) {
    // fill in base params
    BufferPtr = static_cast<char*>(data);
    SizeRemain = size;
    Y_VERIFY_DEBUG(size);
    // transfer control to the coroutine
    Y_VERIFY(Event);
    Chunks.clear();
    Resume();
    return Chunks;
}
  142. void TCoroutineChunkSerializer::SetSerializingEvent(const IEventBase *event) {
  143. Y_VERIFY(Event == nullptr);
  144. Event = event;
  145. TotalSerializedDataSize = 0;
  146. AbortFlag = false;
  147. }
// Abandon serialization of the current event: wake the coroutine with
// AbortFlag set so it unwinds back to its main loop.
void TCoroutineChunkSerializer::Abort() {
    Y_VERIFY(Event);
    AbortFlag = true;
    Resume();
}
// Coroutine body: serializes one event per iteration, yielding back to the
// consumer between events (and, via WriteAliasedRaw/Next, whenever the
// output buffer fills up).
void TCoroutineChunkSerializer::DoRun() {
    while (!CancelFlag) {
        Y_VERIFY(Event);
        // An abort requested before this point skips serialization entirely.
        SerializationSuccess = !AbortFlag && Event->SerializeToArcadiaStream(this);
        Event = nullptr;
        if (!CancelFlag) { // cancel flag may have been received during serialization
            InnerContext.SwitchTo(BufFeedContext);
        }
    }
    // Signal the destructor that the coroutine has fully unwound, then yield
    // for the last time.
    Finished = true;
    InnerContext.SwitchTo(BufFeedContext);
}
  165. bool TAllocChunkSerializer::Next(void** pdata, int* psize) {
  166. if (Backup) {
  167. // we have some data in backup rope -- move the first chunk from the backup rope to the buffer and return
  168. // pointer to the buffer; it is safe to remove 'const' here as we uniquely own this buffer
  169. TRope::TIterator iter = Backup.Begin();
  170. *pdata = const_cast<char*>(iter.ContiguousData());
  171. *psize = iter.ContiguousSize();
  172. iter.AdvanceToNextContiguousBlock();
  173. Buffers->Append(Backup.Extract(Backup.Begin(), iter));
  174. } else {
  175. // no backup buffer, so we have to create new one
  176. auto item = TRopeAlignedBuffer::Allocate(4096);
  177. *pdata = item->GetBuffer();
  178. *psize = item->GetCapacity();
  179. Buffers->Append(TRope(std::move(item)));
  180. }
  181. return true;
  182. }
// Move the last `count` bytes out of the accumulated buffers into the backup
// rope; Next() will hand them out again before allocating anything new.
void TAllocChunkSerializer::BackUp(int count) {
    Backup.Insert(Backup.Begin(), Buffers->EraseBack(count));
}
// Aliased writes are not supported by this serializer; protobuf should never
// call this (presumably AllowsAliasing() reports false -- verify in header).
bool TAllocChunkSerializer::WriteAliasedRaw(const void*, int) {
    Y_VERIFY(false);
    return false;
}
  190. bool TAllocChunkSerializer::WriteRope(const TRope *rope) {
  191. Buffers->Append(TRope(*rope));
  192. return true;
  193. }
  194. bool TAllocChunkSerializer::WriteString(const TString *s) {
  195. Buffers->Append(*s);
  196. return true;
  197. }
  198. }