123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- #include "event_pb.h"
- namespace NActors {
- bool TRopeStream::Next(const void** data, int* size) {
- *data = Iter.ContiguousData();
- *size = Iter.ContiguousSize();
- if (size_t(*size + TotalByteCount) > Size) {
- *size = Size - TotalByteCount;
- Iter += *size;
- } else if (Iter.Valid()) {
- Iter.AdvanceToNextContiguousBlock();
- }
- TotalByteCount += *size;
- return *size != 0;
- }
- void TRopeStream::BackUp(int count) {
- Y_VERIFY(count <= TotalByteCount);
- Iter -= count;
- TotalByteCount -= count;
- }
- bool TRopeStream::Skip(int count) {
- if (static_cast<size_t>(TotalByteCount + count) > Size) {
- count = Size - TotalByteCount;
- }
- Iter += count;
- TotalByteCount += count;
- return static_cast<size_t>(TotalByteCount) != Size;
- }
- TCoroutineChunkSerializer::TCoroutineChunkSerializer()
- : TotalSerializedDataSize(0)
- , Stack(64 * 1024)
- , SelfClosure{this, TArrayRef(Stack.Begin(), Stack.End())}
- , InnerContext(SelfClosure)
- {}
- TCoroutineChunkSerializer::~TCoroutineChunkSerializer() {
- CancelFlag = true;
- Resume();
- Y_VERIFY(Finished);
- }
- bool TCoroutineChunkSerializer::AllowsAliasing() const {
- return true;
- }
- bool TCoroutineChunkSerializer::Produce(const void *data, size_t size) {
- Y_VERIFY(size <= SizeRemain);
- SizeRemain -= size;
- TotalSerializedDataSize += size;
- if (NumChunks) {
- auto& last = Chunks[NumChunks - 1];
- if (last.first + last.second == data) {
- last.second += size;
- return true;
- }
- }
- if (NumChunks == MaxChunks) {
- InnerContext.SwitchTo(BufFeedContext);
- if (CancelFlag || AbortFlag) {
- return false;
- }
- }
- Y_VERIFY(NumChunks < MaxChunks);
- Chunks[NumChunks++] = {static_cast<const char*>(data), size};
- return true;
- }
- bool TCoroutineChunkSerializer::WriteAliasedRaw(const void* data, int size) {
- Y_VERIFY(size >= 0);
- while (size) {
- if (CancelFlag || AbortFlag) {
- return false;
- } else if (const size_t bytesToAppend = Min<size_t>(size, SizeRemain)) {
- if (!Produce(data, bytesToAppend)) {
- return false;
- }
- data = static_cast<const char*>(data) + bytesToAppend;
- size -= bytesToAppend;
- } else {
- InnerContext.SwitchTo(BufFeedContext);
- }
- }
- return true;
- }
- bool TCoroutineChunkSerializer::Next(void** data, int* size) {
- if (CancelFlag || AbortFlag) {
- return false;
- }
- if (!SizeRemain) {
- InnerContext.SwitchTo(BufFeedContext);
- if (CancelFlag || AbortFlag) {
- return false;
- }
- }
- Y_VERIFY(SizeRemain);
- *data = BufferPtr;
- *size = SizeRemain;
- BufferPtr += SizeRemain;
- return Produce(*data, *size);
- }
- void TCoroutineChunkSerializer::BackUp(int count) {
- if (!count) {
- return;
- }
- Y_VERIFY(count > 0);
- Y_VERIFY(NumChunks);
- TChunk& buf = Chunks[NumChunks - 1];
- Y_VERIFY((size_t)count <= buf.second);
- Y_VERIFY(buf.first + buf.second == BufferPtr);
- buf.second -= count;
- if (!buf.second) {
- --NumChunks;
- }
- BufferPtr -= count;
- SizeRemain += count;
- TotalSerializedDataSize -= count;
- }
- void TCoroutineChunkSerializer::Resume() {
- TContMachineContext feedContext;
- BufFeedContext = &feedContext;
- feedContext.SwitchTo(&InnerContext);
- BufFeedContext = nullptr;
- }
- bool TCoroutineChunkSerializer::WriteRope(const TRope *rope) {
- for (auto iter = rope->Begin(); iter.Valid(); iter.AdvanceToNextContiguousBlock()) {
- if (!WriteAliasedRaw(iter.ContiguousData(), iter.ContiguousSize())) {
- return false;
- }
- }
- return true;
- }
- bool TCoroutineChunkSerializer::WriteString(const TString *s) {
- return WriteAliasedRaw(s->data(), s->length());
- }
- std::pair<TCoroutineChunkSerializer::TChunk*, TCoroutineChunkSerializer::TChunk*> TCoroutineChunkSerializer::FeedBuf(void* data, size_t size) {
-
- BufferPtr = static_cast<char*>(data);
- SizeRemain = size;
-
- Y_VERIFY(Event);
- NumChunks = 0;
- Resume();
- return {Chunks, Chunks + NumChunks};
- }
- void TCoroutineChunkSerializer::SetSerializingEvent(const IEventBase *event) {
- Y_VERIFY(Event == nullptr);
- Event = event;
- TotalSerializedDataSize = 0;
- AbortFlag = false;
- }
- void TCoroutineChunkSerializer::Abort() {
- Y_VERIFY(Event);
- AbortFlag = true;
- Resume();
- }
- void TCoroutineChunkSerializer::DoRun() {
- while (!CancelFlag) {
- Y_VERIFY(Event);
- SerializationSuccess = Event->SerializeToArcadiaStream(this);
- Event = nullptr;
- if (!CancelFlag) {
- InnerContext.SwitchTo(BufFeedContext);
- }
- }
- Finished = true;
- InnerContext.SwitchTo(BufFeedContext);
- }
- bool TAllocChunkSerializer::Next(void** pdata, int* psize) {
- if (Backup) {
-
-
- TRope::TIterator iter = Backup.Begin();
- *pdata = const_cast<char*>(iter.ContiguousData());
- *psize = iter.ContiguousSize();
- iter.AdvanceToNextContiguousBlock();
- Buffers->Append(Backup.Extract(Backup.Begin(), iter));
- } else {
-
- auto item = TRopeAlignedBuffer::Allocate(4096);
- *pdata = item->GetBuffer();
- *psize = item->GetCapacity();
- Buffers->Append(TRope(std::move(item)));
- }
- return true;
- }
- void TAllocChunkSerializer::BackUp(int count) {
- Backup.Insert(Backup.Begin(), Buffers->EraseBack(count));
- }
- bool TAllocChunkSerializer::WriteAliasedRaw(const void*, int) {
- Y_VERIFY(false);
- return false;
- }
- bool TAllocChunkSerializer::WriteRope(const TRope *rope) {
- Buffers->Append(TRope(*rope));
- return true;
- }
- bool TAllocChunkSerializer::WriteString(const TString *s) {
- Buffers->Append(*s);
- return true;
- }
- }
|