event_pb.cpp 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. #include "event_pb.h"
  2. namespace NActors {
  3. bool TRopeStream::Next(const void** data, int* size) {
  4. *data = Iter.ContiguousData();
  5. *size = Iter.ContiguousSize();
  6. if (size_t(*size + TotalByteCount) > Size) {
  7. *size = Size - TotalByteCount;
  8. Iter += *size;
  9. } else if (Iter.Valid()) {
  10. Iter.AdvanceToNextContiguousBlock();
  11. }
  12. TotalByteCount += *size;
  13. return *size != 0;
  14. }
  15. void TRopeStream::BackUp(int count) {
  16. Y_VERIFY(count <= TotalByteCount);
  17. Iter -= count;
  18. TotalByteCount -= count;
  19. }
  20. bool TRopeStream::Skip(int count) {
  21. if (static_cast<size_t>(TotalByteCount + count) > Size) {
  22. count = Size - TotalByteCount;
  23. }
  24. Iter += count;
  25. TotalByteCount += count;
  26. return static_cast<size_t>(TotalByteCount) != Size;
  27. }
  28. TCoroutineChunkSerializer::TCoroutineChunkSerializer()
  29. : TotalSerializedDataSize(0)
  30. , Stack(64 * 1024)
  31. , SelfClosure{this, TArrayRef(Stack.Begin(), Stack.End())}
  32. , InnerContext(SelfClosure)
  33. {}
  34. TCoroutineChunkSerializer::~TCoroutineChunkSerializer() {
  35. CancelFlag = true;
  36. Resume();
  37. Y_VERIFY(Finished);
  38. }
  39. bool TCoroutineChunkSerializer::AllowsAliasing() const {
  40. return true;
  41. }
  42. bool TCoroutineChunkSerializer::Produce(const void *data, size_t size) {
  43. Y_VERIFY(size <= SizeRemain);
  44. SizeRemain -= size;
  45. TotalSerializedDataSize += size;
  46. if (NumChunks) {
  47. auto& last = Chunks[NumChunks - 1];
  48. if (last.first + last.second == data) {
  49. last.second += size; // just extend the last buffer
  50. return true;
  51. }
  52. }
  53. if (NumChunks == MaxChunks) {
  54. InnerContext.SwitchTo(BufFeedContext);
  55. if (CancelFlag || AbortFlag) {
  56. return false;
  57. }
  58. }
  59. Y_VERIFY(NumChunks < MaxChunks);
  60. Chunks[NumChunks++] = {static_cast<const char*>(data), size};
  61. return true;
  62. }
  63. bool TCoroutineChunkSerializer::WriteAliasedRaw(const void* data, int size) {
  64. Y_VERIFY(size >= 0);
  65. while (size) {
  66. if (CancelFlag || AbortFlag) {
  67. return false;
  68. } else if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) {
  69. if (!Produce(data, bytesToAppend)) {
  70. return false;
  71. }
  72. data = static_cast<const char*>(data) + bytesToAppend;
  73. size -= bytesToAppend;
  74. } else {
  75. InnerContext.SwitchTo(BufFeedContext);
  76. }
  77. }
  78. return true;
  79. }
  80. bool TCoroutineChunkSerializer::Next(void** data, int* size) {
  81. if (CancelFlag || AbortFlag) {
  82. return false;
  83. }
  84. if (!SizeRemain) {
  85. InnerContext.SwitchTo(BufFeedContext);
  86. if (CancelFlag || AbortFlag) {
  87. return false;
  88. }
  89. }
  90. Y_VERIFY(SizeRemain);
  91. *data = BufferPtr;
  92. *size = SizeRemain;
  93. BufferPtr += SizeRemain;
  94. return Produce(*data, *size);
  95. }
  96. void TCoroutineChunkSerializer::BackUp(int count) {
  97. if (!count) {
  98. return;
  99. }
  100. Y_VERIFY(count > 0);
  101. Y_VERIFY(NumChunks);
  102. TChunk& buf = Chunks[NumChunks - 1];
  103. Y_VERIFY((size_t)count <= buf.second);
  104. Y_VERIFY(buf.first + buf.second == BufferPtr);
  105. buf.second -= count;
  106. if (!buf.second) {
  107. --NumChunks;
  108. }
  109. BufferPtr -= count;
  110. SizeRemain += count;
  111. TotalSerializedDataSize -= count;
  112. }
  113. void TCoroutineChunkSerializer::Resume() {
  114. TContMachineContext feedContext;
  115. BufFeedContext = &feedContext;
  116. feedContext.SwitchTo(&InnerContext);
  117. BufFeedContext = nullptr;
  118. }
  119. bool TCoroutineChunkSerializer::WriteRope(const TRope *rope) {
  120. for (auto iter = rope->Begin(); iter.Valid(); iter.AdvanceToNextContiguousBlock()) {
  121. if (!WriteAliasedRaw(iter.ContiguousData(), iter.ContiguousSize())) {
  122. return false;
  123. }
  124. }
  125. return true;
  126. }
  127. bool TCoroutineChunkSerializer::WriteString(const TString *s) {
  128. return WriteAliasedRaw(s->data(), s->length());
  129. }
  130. std::pair<TCoroutineChunkSerializer::TChunk*, TCoroutineChunkSerializer::TChunk*> TCoroutineChunkSerializer::FeedBuf(void* data, size_t size) {
  131. // fill in base params
  132. BufferPtr = static_cast<char*>(data);
  133. SizeRemain = size;
  134. // transfer control to the coroutine
  135. Y_VERIFY(Event);
  136. NumChunks = 0;
  137. Resume();
  138. return {Chunks, Chunks + NumChunks};
  139. }
  140. void TCoroutineChunkSerializer::SetSerializingEvent(const IEventBase *event) {
  141. Y_VERIFY(Event == nullptr);
  142. Event = event;
  143. TotalSerializedDataSize = 0;
  144. AbortFlag = false;
  145. }
  146. void TCoroutineChunkSerializer::Abort() {
  147. Y_VERIFY(Event);
  148. AbortFlag = true;
  149. Resume();
  150. }
  151. void TCoroutineChunkSerializer::DoRun() {
  152. while (!CancelFlag) {
  153. Y_VERIFY(Event);
  154. SerializationSuccess = Event->SerializeToArcadiaStream(this);
  155. Event = nullptr;
  156. if (!CancelFlag) { // cancel flag may have been received during serialization
  157. InnerContext.SwitchTo(BufFeedContext);
  158. }
  159. }
  160. Finished = true;
  161. InnerContext.SwitchTo(BufFeedContext);
  162. }
  163. bool TAllocChunkSerializer::Next(void** pdata, int* psize) {
  164. if (Backup) {
  165. // we have some data in backup rope -- move the first chunk from the backup rope to the buffer and return
  166. // pointer to the buffer; it is safe to remove 'const' here as we uniquely own this buffer
  167. TRope::TIterator iter = Backup.Begin();
  168. *pdata = const_cast<char*>(iter.ContiguousData());
  169. *psize = iter.ContiguousSize();
  170. iter.AdvanceToNextContiguousBlock();
  171. Buffers->Append(Backup.Extract(Backup.Begin(), iter));
  172. } else {
  173. // no backup buffer, so we have to create new one
  174. auto item = TRopeAlignedBuffer::Allocate(4096);
  175. *pdata = item->GetBuffer();
  176. *psize = item->GetCapacity();
  177. Buffers->Append(TRope(std::move(item)));
  178. }
  179. return true;
  180. }
  181. void TAllocChunkSerializer::BackUp(int count) {
  182. Backup.Insert(Backup.Begin(), Buffers->EraseBack(count));
  183. }
  184. bool TAllocChunkSerializer::WriteAliasedRaw(const void*, int) {
  185. Y_VERIFY(false);
  186. return false;
  187. }
  188. bool TAllocChunkSerializer::WriteRope(const TRope *rope) {
  189. Buffers->Append(TRope(*rope));
  190. return true;
  191. }
  192. bool TAllocChunkSerializer::WriteString(const TString *s) {
  193. Buffers->Append(*s);
  194. return true;
  195. }
  196. }