#pragma once #include "defs.h" #include #include #include #include namespace NKikimr { namespace NMiniKQL { class TPagedBuffer; class TBufferPage : private TNonCopyable { friend class TPagedBuffer; static const size_t PageCapacity; public: static const size_t PageAllocSize = 128 * 1024; TBufferPage() = default; ~TBufferPage() = default; inline const TBufferPage* Next() const { return Next_; } inline TBufferPage* Next() { return Next_; } inline size_t Size() const { return Size_; } inline char* Data() { return reinterpret_cast(this + 1); } inline const char* Data() const { return reinterpret_cast(this + 1); } inline void Clear() { Size_ = 0; } private: TBufferPage* Next_ = nullptr; size_t Size_ = 0; static TBufferPage* Allocate(); static void Free(TBufferPage* page); static inline const TBufferPage* GetPage(const char* data) { Y_DEBUG_ABORT_UNLESS(data); return reinterpret_cast(data - sizeof(TBufferPage)); } static inline TBufferPage* GetPage(char* data) { Y_DEBUG_ABORT_UNLESS(data); return reinterpret_cast(data - sizeof(TBufferPage)); } }; class TPagedBuffer : private TNonCopyable { public: using TPtr = std::shared_ptr; using TConstPtr = std::shared_ptr; TPagedBuffer() = default; ~TPagedBuffer() { if (Head_) { TBufferPage* curr = TBufferPage::GetPage(Head_); while (curr) { auto drop = curr; curr = curr->Next_; TBufferPage::Free(drop); } } } template inline void ForEachPage(TFunc f) const { if (!Head_) { return; } const TBufferPage* head = TBufferPage::GetPage(Head_); auto page = head; auto end = TBufferPage::GetPage(Tail_); while (page) { const char* src; size_t len; if (page == end) { src = Tail_; len = TailSize_; } else { src = page->Data(); len = page->Size(); } if (page == head) { src += HeadReserve_; len -= HeadReserve_; } if (len) { f(src, len); } page = (page == end) ? nullptr : page->Next(); } } inline size_t Size() const { // + (Tail_ ? TailSize_ : 0); size_t sizeWithReserve = ClosedPagesSize_ + ((-size_t(Tail_ != nullptr)) & TailSize_); Y_DEBUG_ABORT_UNLESS(sizeWithReserve >= HeadReserve_); return sizeWithReserve - HeadReserve_; } template inline void CopyTo(TContainer& out) const { ForEachPage([&out](const char* data, size_t len) { out.insert(out.end(), data, data + len); }); } inline void CopyTo(char* dst) const { ForEachPage([&dst](const char* data, size_t len) { std::memcpy(dst, data, len); dst += len; }); } inline void CopyTo(IOutputStream& out) const { ForEachPage([&out](const char* data, size_t len) { out.Write(data, len); }); } size_t ReservedHeaderSize() const { return HeadReserve_; } inline void ReserveHeader(size_t len) { Y_DEBUG_ABORT_UNLESS(len > 0); Y_DEBUG_ABORT_UNLESS(Head_ == Tail_); Y_DEBUG_ABORT_UNLESS(HeadReserve_ == 0); Advance(len); HeadReserve_ = len; } char* Header(size_t len) { if (len > HeadReserve_) { return nullptr; } Y_DEBUG_ABORT_UNLESS(Head_); HeadReserve_ -= len; return Head_ + HeadReserve_; } // buffer-style operations with last page inline char* Pos() const { Y_DEBUG_ABORT_UNLESS(Tail_); return Tail_ + TailSize_; } inline void Clear() { Tail_ = Head_; ClosedPagesSize_ = HeadReserve_ = 0; // = Tail_ ? 0 : TBufferPage::PageAllocSize; TailSize_ = (-size_t(Tail_ == nullptr)) & TBufferPage::PageCapacity; } inline void EraseBack(size_t len) { Y_DEBUG_ABORT_UNLESS(Tail_ && TailSize_ >= len); TailSize_ -= len; } inline void Advance(size_t len) { if (Y_LIKELY(TailSize_ + len <= TBufferPage::PageCapacity)) { TailSize_ += len; return; } MKQL_ENSURE(len <= TBufferPage::PageCapacity, "Advance() size too big"); AppendPage(); TailSize_ = len; } inline void Append(char c) { Advance(1); *(Pos() - 1) = c; } inline void Append(const char* data, size_t size) { while (size) { if (TailSize_ == TBufferPage::PageCapacity) { AppendPage(); } Y_DEBUG_ABORT_UNLESS(TailSize_ < TBufferPage::PageCapacity); size_t avail = TBufferPage::PageCapacity - TailSize_; size_t chunk = std::min(avail, size); std::memcpy(Pos(), data, chunk); TailSize_ += chunk; data += chunk; size -= chunk; } } static NYql::TChunkedBuffer AsChunkedBuffer(const TConstPtr& buf); private: void AppendPage(); char* Head_ = nullptr; char* Tail_ = nullptr; // TailSize_ is initialized as if last page is full, this way we can simplifiy check in Advance() size_t TailSize_ = TBufferPage::PageCapacity; size_t HeadReserve_ = 0; size_t ClosedPagesSize_ = 0; }; } // NMiniKQL } // NKikimr