123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654 |
#pragma once

#include "event.h"
#include "event_load.h"

#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/arena.h>
#include <library/cpp/actors/protos/actors.pb.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>

#include <util/generic/deque.h>
#include <util/system/context.h>
#include <util/system/filemap.h>
#include <util/string/builder.h>
#include <util/thread/lfstack.h>

#include <array>
#include <climits>
#include <span>
#include <type_traits>
#include <utility>
- // enable only when patch with this macro was successfully deployed
- #define USE_EXTENDED_PAYLOAD_FORMAT 0
- namespace NActors {
- class TRopeStream : public NProtoBuf::io::ZeroCopyInputStream {
- TRope::TConstIterator Iter;
- const size_t Size;
- public:
- TRopeStream(TRope::TConstIterator iter, size_t size)
- : Iter(iter)
- , Size(size)
- {}
- bool Next(const void** data, int* size) override;
- void BackUp(int count) override;
- bool Skip(int count) override;
- int64_t ByteCount() const override {
- return TotalByteCount;
- }
- private:
- int64_t TotalByteCount = 0;
- };
- class TChunkSerializer : public NProtoBuf::io::ZeroCopyOutputStream {
- public:
- TChunkSerializer() = default;
- virtual ~TChunkSerializer() = default;
- virtual bool WriteRope(const TRope *rope) = 0;
- virtual bool WriteString(const TString *s) = 0;
- };
- class TAllocChunkSerializer final : public TChunkSerializer {
- public:
- bool Next(void** data, int* size) override;
- void BackUp(int count) override;
- int64_t ByteCount() const override {
- return Buffers->GetSize();
- }
- bool WriteAliasedRaw(const void* data, int size) override;
- // WARNING: these methods require owner to retain ownership and immutability of passed objects
- bool WriteRope(const TRope *rope) override;
- bool WriteString(const TString *s) override;
- inline TIntrusivePtr<TEventSerializedData> Release(TEventSerializationInfo&& serializationInfo) {
- Buffers->SetSerializationInfo(std::move(serializationInfo));
- return std::move(Buffers);
- }
- protected:
- TIntrusivePtr<TEventSerializedData> Buffers = new TEventSerializedData;
- TRope Backup;
- };
- class TCoroutineChunkSerializer final : public TChunkSerializer, protected ITrampoLine {
- public:
- using TChunk = std::pair<const char*, size_t>;
- TCoroutineChunkSerializer();
- ~TCoroutineChunkSerializer();
- void SetSerializingEvent(const IEventBase *event);
- void Abort();
- std::span<TChunk> FeedBuf(void* data, size_t size);
- bool IsComplete() const {
- return !Event;
- }
- bool IsSuccessfull() const {
- return SerializationSuccess;
- }
- const IEventBase *GetCurrentEvent() const {
- return Event;
- }
- bool Next(void** data, int* size) override;
- void BackUp(int count) override;
- int64_t ByteCount() const override {
- return TotalSerializedDataSize;
- }
- bool WriteAliasedRaw(const void* data, int size) override;
- bool AllowsAliasing() const override;
- bool WriteRope(const TRope *rope) override;
- bool WriteString(const TString *s) override;
- protected:
- void DoRun() override;
- void Resume();
- void Produce(const void *data, size_t size);
- i64 TotalSerializedDataSize;
- TMappedAllocation Stack;
- TContClosure SelfClosure;
- TContMachineContext InnerContext;
- TContMachineContext *BufFeedContext = nullptr;
- char *BufferPtr;
- size_t SizeRemain;
- std::vector<TChunk> Chunks;
- const IEventBase *Event = nullptr;
- bool CancelFlag = false;
- bool AbortFlag;
- bool SerializationSuccess;
- bool Finished = false;
- };
- struct TProtoArenaHolder : public TAtomicRefCount<TProtoArenaHolder> {
- google::protobuf::Arena Arena;
- TProtoArenaHolder() = default;
- explicit TProtoArenaHolder(const google::protobuf::ArenaOptions& arenaOptions)
- : Arena(arenaOptions)
- {};
- google::protobuf::Arena* Get() {
- return &Arena;
- }
- template<typename TRecord>
- TRecord* Allocate() {
- return google::protobuf::Arena::CreateMessage<TRecord>(&Arena);
- }
- };
// Maximum serialized event size: 140 MiB.
// `inline constexpr` gives every translation unit that includes this header the same single
// definition, instead of a separate internal-linkage copy per TU (as `static const` did).
inline constexpr size_t EventMaxByteSize = 140 << 20; // (140MB)
- template <typename TEv, typename TRecord /*protobuf record*/, ui32 TEventType, typename TRecHolder>
- class TEventPBBase: public TEventBase<TEv, TEventType> , public TRecHolder {
- // a vector of data buffers referenced by record; if filled, then extended serialization mechanism applies
- TVector<TRope> Payload;
- size_t TotalPayloadSize = 0;
- public:
- using TRecHolder::Record;
- public:
- using ProtoRecordType = TRecord;
- TEventPBBase() = default;
- explicit TEventPBBase(const TRecord& rec)
- : TRecHolder(rec)
- {}
- explicit TEventPBBase(TRecord&& rec)
- : TRecHolder(rec)
- {}
- explicit TEventPBBase(TIntrusivePtr<TProtoArenaHolder> arena)
- : TRecHolder(std::move(arena))
- {}
- TString ToStringHeader() const override {
- return Record.GetTypeName();
- }
- TString ToString() const override {
- TStringStream ss;
- ss << ToStringHeader() << " " << Record.ShortDebugString();
- return ss.Str();
- }
- bool IsSerializable() const override {
- return true;
- }
- bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override {
- return SerializeToArcadiaStreamImpl(chunker, TString());
- }
- ui32 CalculateSerializedSize() const override {
- ssize_t result = Record.ByteSize();
- if (result >= 0 && Payload) {
- ++result; // marker
- char buf[MaxNumberBytes];
- result += SerializeNumber(Payload.size(), buf);
- for (const TRope& rope : Payload) {
- result += SerializeNumber(rope.GetSize(), buf);
- }
- result += TotalPayloadSize;
- }
- return result;
- }
- static IEventBase* Load(TEventSerializedData *input) {
- THolder<TEventPBBase> ev(new TEv());
- if (!input->GetSize()) {
- Y_PROTOBUF_SUPPRESS_NODISCARD ev->Record.ParseFromString(TString());
- } else {
- TRope::TConstIterator iter = input->GetBeginIter();
- ui64 size = input->GetSize();
- if (const auto& info = input->GetSerializationInfo(); info.IsExtendedFormat) {
- // check marker
- if (!iter.Valid() || (*iter.ContiguousData() != PayloadMarker && *iter.ContiguousData() != ExtendedPayloadMarker)) {
- Y_FAIL("invalid event");
- }
- const bool dataIsSeparate = *iter.ContiguousData() == ExtendedPayloadMarker; // ropes go after sizes
- auto fetchRope = [&](size_t len) {
- TRope::TConstIterator begin = iter;
- iter += len;
- size -= len;
- ev->Payload.emplace_back(begin, iter);
- ev->TotalPayloadSize += len;
- };
- // skip marker
- iter += 1;
- --size;
- // parse number of payload ropes
- size_t numRopes = DeserializeNumber(iter, size);
- if (numRopes == Max<size_t>()) {
- Y_FAIL("invalid event");
- }
- TStackVec<size_t, 16> ropeLens;
- if (dataIsSeparate) {
- ropeLens.reserve(numRopes);
- }
- while (numRopes--) {
- // parse length of the rope
- const size_t len = DeserializeNumber(iter, size);
- if (len == Max<size_t>() || size < len) {
- Y_FAIL("invalid event len# %zu size# %" PRIu64, len, size);
- }
- // extract the rope
- if (dataIsSeparate) {
- ropeLens.push_back(len);
- } else {
- fetchRope(len);
- }
- }
- for (size_t len : ropeLens) {
- fetchRope(len);
- }
- }
- // parse the protobuf
- TRopeStream stream(iter, size);
- if (!ev->Record.ParseFromZeroCopyStream(&stream)) {
- Y_FAIL("Failed to parse protobuf event type %" PRIu32 " class %s", TEventType, TypeName(ev->Record).data());
- }
- }
- ev->CachedByteSize = input->GetSize();
- return ev.Release();
- }
- size_t GetCachedByteSize() const {
- if (CachedByteSize == 0) {
- CachedByteSize = CalculateSerializedSize();
- }
- return CachedByteSize;
- }
- ui32 CalculateSerializedSizeCached() const override {
- return GetCachedByteSize();
- }
- void InvalidateCachedByteSize() {
- CachedByteSize = 0;
- }
- TEventSerializationInfo CreateSerializationInfo() const override {
- return CreateSerializationInfoImpl(0);
- }
- bool AllowExternalDataChannel() const {
- return TotalPayloadSize >= 4096;
- }
- public:
- void ReservePayload(size_t size) {
- Payload.reserve(size);
- }
- ui32 AddPayload(TRope&& rope) {
- const ui32 id = Payload.size();
- TotalPayloadSize += rope.size();
- Payload.push_back(std::move(rope));
- InvalidateCachedByteSize();
- return id;
- }
- const TRope& GetPayload(ui32 id) const {
- Y_VERIFY(id < Payload.size());
- return Payload[id];
- }
- ui32 GetPayloadCount() const {
- return Payload.size();
- }
- void StripPayload() {
- Payload.clear();
- TotalPayloadSize = 0;
- }
- protected:
- TEventSerializationInfo CreateSerializationInfoImpl(size_t preserializedSize) const {
- TEventSerializationInfo info;
- info.IsExtendedFormat = static_cast<bool>(Payload);
- if (static_cast<const TEv&>(*this).AllowExternalDataChannel()) {
- if (Payload) {
- char temp[MaxNumberBytes];
- #if USE_EXTENDED_PAYLOAD_FORMAT
- size_t headerLen = 1 + SerializeNumber(Payload.size(), temp);
- for (const TRope& rope : Payload) {
- headerLen += SerializeNumber(rope.size(), temp);
- }
- info.Sections.push_back(TEventSectionInfo{0, headerLen, 0, 0, true});
- for (const TRope& rope : Payload) {
- info.Sections.push_back(TEventSectionInfo{0, rope.size(), 0, 0, false});
- }
- #else
- info.Sections.push_back(TEventSectionInfo{0, 1 + SerializeNumber(Payload.size(), temp), 0, 0, true}); // payload marker and rope count
- for (const TRope& rope : Payload) {
- const size_t ropeSize = rope.GetSize();
- info.Sections.back().Size += SerializeNumber(ropeSize, temp);
- info.Sections.push_back(TEventSectionInfo{0, ropeSize, 0, 0, false}); // data as a separate section
- }
- #endif
- }
- const size_t byteSize = Max<ssize_t>(0, Record.ByteSize()) + preserializedSize;
- info.Sections.push_back(TEventSectionInfo{0, byteSize, 0, 0, true}); // protobuf itself
- #ifndef NDEBUG
- size_t total = 0;
- for (const auto& section : info.Sections) {
- total += section.Size;
- }
- size_t serialized = CalculateSerializedSize();
- Y_VERIFY(total == serialized, "total# %zu serialized# %zu byteSize# %zd Payload.size# %zu", total,
- serialized, byteSize, Payload.size());
- #endif
- }
- return info;
- }
- bool SerializeToArcadiaStreamImpl(TChunkSerializer* chunker, const TString& preserialized) const {
- // serialize payload first
- if (Payload) {
- void *data;
- int size = 0;
- auto append = [&](const char *p, size_t len) {
- while (len) {
- if (size) {
- const size_t numBytesToCopy = std::min<size_t>(size, len);
- memcpy(data, p, numBytesToCopy);
- data = static_cast<char*>(data) + numBytesToCopy;
- size -= numBytesToCopy;
- p += numBytesToCopy;
- len -= numBytesToCopy;
- } else if (!chunker->Next(&data, &size)) {
- return false;
- }
- }
- return true;
- };
- auto appendNumber = [&](size_t number) {
- char buf[MaxNumberBytes];
- return append(buf, SerializeNumber(number, buf));
- };
- #if USE_EXTENDED_PAYLOAD_FORMAT
- char marker = ExtendedPayloadMarker;
- append(&marker, 1);
- if (!appendNumber(Payload.size())) {
- return false;
- }
- for (const TRope& rope : Payload) {
- if (!appendNumber(rope.GetSize())) {
- return false;
- }
- }
- if (size) {
- chunker->BackUp(std::exchange(size, 0));
- }
- for (const TRope& rope : Payload) {
- if (!chunker->WriteRope(&rope)) {
- return false;
- }
- }
- #else
- char marker = PayloadMarker;
- append(&marker, 1);
- if (!appendNumber(Payload.size())) {
- return false;
- }
- for (const TRope& rope : Payload) {
- if (!appendNumber(rope.GetSize())) {
- return false;
- }
- if (rope) {
- if (size) {
- chunker->BackUp(std::exchange(size, 0));
- }
- if (!chunker->WriteRope(&rope)) {
- return false;
- }
- }
- }
- if (size) {
- chunker->BackUp(size);
- }
- #endif
- }
- if (preserialized && !chunker->WriteString(&preserialized)) {
- return false;
- }
- return Record.SerializeToZeroCopyStream(chunker);
- }
- protected:
- mutable size_t CachedByteSize = 0;
- static constexpr char ExtendedPayloadMarker = 0x06;
- static constexpr char PayloadMarker = 0x07;
- static constexpr size_t MaxNumberBytes = (sizeof(size_t) * CHAR_BIT + 6) / 7;
- static size_t SerializeNumber(size_t num, char *buffer) {
- char *begin = buffer;
- do {
- *buffer++ = (num & 0x7F) | (num >= 128 ? 0x80 : 0x00);
- num >>= 7;
- } while (num);
- return buffer - begin;
- }
- static size_t DeserializeNumber(const char **ptr, const char *end) {
- const char *p = *ptr;
- size_t res = 0;
- size_t offset = 0;
- for (;;) {
- if (p == end) {
- return Max<size_t>();
- }
- const char byte = *p++;
- res |= (static_cast<size_t>(byte) & 0x7F) << offset;
- offset += 7;
- if (!(byte & 0x80)) {
- break;
- }
- }
- *ptr = p;
- return res;
- }
- static size_t DeserializeNumber(TRope::TConstIterator& iter, ui64& size) {
- size_t res = 0;
- size_t offset = 0;
- for (;;) {
- if (!iter.Valid()) {
- return Max<size_t>();
- }
- const char byte = *iter.ContiguousData();
- iter += 1;
- --size;
- res |= (static_cast<size_t>(byte) & 0x7F) << offset;
- offset += 7;
- if (!(byte & 0x80)) {
- break;
- }
- }
- return res;
- }
- };
// Record-holder policy for TEventPBBase: the protobuf record is stored by value,
// without any arena.
template <typename TRecord>
struct TRecordHolder {
    TRecord Record;

    TRecordHolder() = default;

    /// Copy-constructs Record from `rec`.
    TRecordHolder(const TRecord& rec)
        : Record(rec)
    {
    }

    /// Move-constructs Record from `rec`.
    TRecordHolder(TRecord&& rec)
        : Record(std::move(rec))
    {
    }
};
- // Protobuf arena and a record allocated on it
- template <typename TRecord, size_t InitialBlockSize, size_t MaxBlockSize>
- struct TArenaRecordHolder {
- TIntrusivePtr<TProtoArenaHolder> Arena;
- TRecord& Record;
- // Arena depends on block size to be a multiple of 8 for correctness
- // FIXME: uncomment these asserts when code is synchronized between repositories
- // static_assert((InitialBlockSize & 7) == 0, "Misaligned InitialBlockSize");
- // static_assert((MaxBlockSize & 7) == 0, "Misaligned MaxBlockSize");
- static const google::protobuf::ArenaOptions GetArenaOptions() {
- google::protobuf::ArenaOptions opts;
- opts.initial_block_size = InitialBlockSize;
- opts.max_block_size = MaxBlockSize;
- return opts;
- }
- TArenaRecordHolder()
- : Arena(MakeIntrusive<TProtoArenaHolder>(GetArenaOptions()))
- , Record(*Arena->Allocate<TRecord>())
- {};
- TArenaRecordHolder(const TRecord& rec)
- : TArenaRecordHolder()
- {
- Record.CopyFrom(rec);
- }
- // not allowed to move from another protobuf, it's a potenial copying
- TArenaRecordHolder(TRecord&& rec) = delete;
- TArenaRecordHolder(TIntrusivePtr<TProtoArenaHolder> arena)
- : Arena(std::move(arena))
- , Record(*Arena->Allocate<TRecord>())
- {};
- };
- template <typename TEv, typename TRecord, ui32 TEventType>
- class TEventPB : public TEventPBBase<TEv, TRecord, TEventType, TRecordHolder<TRecord> > {
- typedef TEventPBBase<TEv, TRecord, TEventType, TRecordHolder<TRecord> > TPbBase;
- // NOTE: No extra fields allowed: TEventPB must be a "template typedef"
- public:
- using TPbBase::TPbBase;
- };
- template <typename TEv, typename TRecord, ui32 TEventType, size_t InitialBlockSize = 512, size_t MaxBlockSize = 16*1024>
- using TEventPBWithArena = TEventPBBase<TEv, TRecord, TEventType, TArenaRecordHolder<TRecord, InitialBlockSize, MaxBlockSize> >;
- template <typename TEv, typename TRecord, ui32 TEventType>
- class TEventShortDebugPB: public TEventPB<TEv, TRecord, TEventType> {
- public:
- using TBase = TEventPB<TEv, TRecord, TEventType>;
- TEventShortDebugPB() = default;
- explicit TEventShortDebugPB(const TRecord& rec)
- : TBase(rec)
- {
- }
- explicit TEventShortDebugPB(TRecord&& rec)
- : TBase(std::move(rec))
- {
- }
- TString ToString() const override {
- return TypeName<TEv>() + " { " + TBase::Record.ShortDebugString() + " }";
- }
- };
- template <typename TEv, typename TRecord, ui32 TEventType>
- class TEventPreSerializedPB: public TEventPB<TEv, TRecord, TEventType> {
- protected:
- using TBase = TEventPB<TEv, TRecord, TEventType>;
- using TSelf = TEventPreSerializedPB<TEv, TRecord, TEventType>;
- using TBase::Record;
- public:
- TString PreSerializedData; // already serialized PB data (using message::SerializeToString)
- TEventPreSerializedPB() = default;
- explicit TEventPreSerializedPB(const TRecord& rec)
- : TBase(rec)
- {
- }
- explicit TEventPreSerializedPB(TRecord&& rec)
- : TBase(std::move(rec))
- {
- }
- // when remote event received locally this method will merge preserialized data
- const TRecord& GetRecord() {
- TRecord& base(TBase::Record);
- if (!PreSerializedData.empty()) {
- TRecord copy;
- Y_PROTOBUF_SUPPRESS_NODISCARD copy.ParseFromString(PreSerializedData);
- copy.MergeFrom(base);
- base.Swap(©);
- PreSerializedData.clear();
- }
- return TBase::Record;
- }
- const TRecord& GetRecord() const {
- return const_cast<TSelf*>(this)->GetRecord();
- }
- TRecord* MutableRecord() {
- GetRecord(); // Make sure PreSerializedData is parsed
- return &(TBase::Record);
- }
- TString ToString() const override {
- return GetRecord().ShortDebugString();
- }
- bool SerializeToArcadiaStream(TChunkSerializer* chunker) const override {
- return TBase::SerializeToArcadiaStreamImpl(chunker, PreSerializedData);
- }
- ui32 CalculateSerializedSize() const override {
- return PreSerializedData.size() + TBase::CalculateSerializedSize();
- }
- size_t GetCachedByteSize() const {
- return PreSerializedData.size() + TBase::GetCachedByteSize();
- }
- ui32 CalculateSerializedSizeCached() const override {
- return GetCachedByteSize();
- }
- TEventSerializationInfo CreateSerializationInfo() const override {
- return TBase::CreateSerializationInfoImpl(PreSerializedData.size());
- }
- };
- inline TActorId ActorIdFromProto(const NActorsProto::TActorId& actorId) {
- return TActorId(actorId.GetRawX1(), actorId.GetRawX2());
- }
- inline void ActorIdToProto(const TActorId& src, NActorsProto::TActorId* dest) {
- Y_VERIFY_DEBUG(dest);
- dest->SetRawX1(src.RawX1());
- dest->SetRawX2(src.RawX2());
- }
- }
|