#pragma once #include #include #include #include namespace NNetliba { class TBlockChain { public: struct TBlock { const char* Data; int Offset, Size; // Offset in whole chain TBlock() : Data(nullptr) , Offset(0) , Size(0) { } TBlock(const char* data, int offset, int sz) : Data(data) , Offset(offset) , Size(sz) { } }; private: typedef TVector TBlockVector; TBlockVector Blocks; int Size; struct TBlockLess { bool operator()(const TBlock& b, int offset) const { return b.Offset < offset; } }; public: TBlockChain() : Size(0) { } void AddBlock(const void* data, int sz) { Blocks.push_back(TBlock((const char*)data, Size, sz)); Size += sz; } int GetSize() const { return Size; } const TBlock& GetBlock(int i) const { return Blocks[i]; } int GetBlockCount() const { return Blocks.ysize(); } int GetBlockIdByOffset(int offset) const { TBlockVector::const_iterator i = LowerBound(Blocks.begin(), Blocks.end(), offset, TBlockLess()); if (i == Blocks.end()) return Blocks.ysize() - 1; if (i->Offset == offset) return (int)(i - Blocks.begin()); return (int)(i - Blocks.begin() - 1); } }; ////////////////////////////////////////////////////////////////////////// class TBlockChainIterator { const TBlockChain& Chain; int Pos, BlockPos, BlockId; bool Failed; public: TBlockChainIterator(const TBlockChain& chain) : Chain(chain) , Pos(0) , BlockPos(0) , BlockId(0) , Failed(false) { } void Read(void* dst, int sz) { char* dstBuf = (char*)dst; while (sz > 0) { if (BlockId >= Chain.GetBlockCount()) { // JACKPOT! fprintf(stderr, "reading beyond chain end: BlockId %d, Chain.GetBlockCount() %d, Pos %d, BlockPos %d\n", BlockId, Chain.GetBlockCount(), Pos, BlockPos); Y_ASSERT(0 && "reading beyond chain end"); memset(dstBuf, 0, sz); Failed = true; return; } const TBlockChain::TBlock& blk = Chain.GetBlock(BlockId); int copySize = Min(blk.Size - BlockPos, sz); memcpy(dstBuf, blk.Data + BlockPos, copySize); dstBuf += copySize; Pos += copySize; BlockPos += copySize; sz -= copySize; if (BlockPos == blk.Size) { BlockPos = 0; ++BlockId; } } } void Seek(int pos) { if (pos < 0 || pos > Chain.GetSize()) { Y_ASSERT(0); Pos = 0; BlockPos = 0; BlockId = 0; return; } BlockId = Chain.GetBlockIdByOffset(pos); const TBlockChain::TBlock& blk = Chain.GetBlock(BlockId); Pos = pos; BlockPos = Pos - blk.Offset; } int GetPos() const { return Pos; } int GetSize() const { return Chain.GetSize(); } bool HasFailed() const { return Failed; } void Fail() { Failed = true; } }; ////////////////////////////////////////////////////////////////////////// class TRopeDataPacket: public TNonCopyable { TBlockChain Chain; TVector Buf; char *Block, *BlockEnd; TList> DataVectors; TIntrusivePtr SharedData; TVector> AttachedStorage; char DefaultBuf[128]; // prevent allocs in most cases enum { N_DEFAULT_BLOCK_SIZE = 1024 }; char* Alloc(int sz) { char* res = nullptr; if (BlockEnd - Block < sz) { int bufSize = Max((int)N_DEFAULT_BLOCK_SIZE, sz); char* newBlock = AllocBuf(bufSize); Block = newBlock; BlockEnd = Block + bufSize; Buf.push_back(newBlock); } res = Block; Block += sz; Y_ASSERT(Block <= BlockEnd); return res; } public: TRopeDataPacket() : Block(DefaultBuf) , BlockEnd(DefaultBuf + Y_ARRAY_SIZE(DefaultBuf)) { } ~TRopeDataPacket() { for (size_t i = 0; i < Buf.size(); ++i) FreeBuf(Buf[i]); } static char* AllocBuf(int sz) { return new char[sz]; } static void FreeBuf(char* buf) { delete[] buf; } // buf - pointer to buffer which will be freed with FreeBuf() // data - pointer to data start within buf // sz - size of useful data void AddBlock(char* buf, const char* data, int sz) { Buf.push_back(buf); Chain.AddBlock(data, sz); } void AddBlock(TThrRefBase* buf, const char* data, int sz) { AttachedStorage.push_back(buf); Chain.AddBlock(data, sz); } // void Write(const void* data, int sz) { char* buf = Alloc(sz); memcpy(buf, data, sz); Chain.AddBlock(buf, sz); } template void Write(const T& data) { Write(&data, sizeof(T)); } //// caller guarantees that data will persist all *this lifetime //// int this case so we don`t have to copy data to locally held buffer //template //void WriteNoCopy(const T *data) //{ // Chain.AddBlock(data, sizeof(T)); //} // write some array like TVector<> //template //void WriteArr(const T &sz) //{ // int n = (int)sz.size(); // Write(n); // if (n > 0) // Write(&sz[0], n * sizeof(sz[0])); //} void WriteStroka(const TString& sz) { int n = (int)sz.size(); Write(n); if (n > 0) Write(sz.c_str(), n * sizeof(sz[0])); } // will take *data ownership, saves copy void WriteDestructive(TVector* data) { int n = data ? data->ysize() : 0; Write(n); if (n > 0) { TVector& local = DataVectors.emplace_back(std::move(*data)); Chain.AddBlock(&local[0], local.ysize()); } } void AttachSharedData(TIntrusivePtr shm) { SharedData = shm; } TSharedMemory* GetSharedData() const { return SharedData.Get(); } const TBlockChain& GetChain() { return Chain; } int GetSize() { return Chain.GetSize(); } }; template inline void ReadArr(TBlockChainIterator* res, T* dst) { int n; res->Read(&n, sizeof(n)); if (n >= 0) { dst->resize(n); if (n > 0) res->Read(&(*dst)[0], n * sizeof((*dst)[0])); } else { dst->resize(0); res->Fail(); } } template <> inline void ReadArr(TBlockChainIterator* res, TString* dst) { int n; res->Read(&n, sizeof(n)); if (n >= 0) { dst->resize(n); if (n > 0) res->Read(dst->begin(), n * sizeof(TString::value_type)); } else { dst->resize(0); res->Fail(); } } // saves on zeroing *dst with yresize() template static void ReadYArr(TBlockChainIterator* res, TVector* dst) { int n; res->Read(&n, sizeof(n)); if (n >= 0) { dst->yresize(n); if (n > 0) res->Read(&(*dst)[0], n * sizeof((*dst)[0])); } else { dst->yresize(0); res->Fail(); } } template static void Read(TBlockChainIterator* res, T* dst) { res->Read(dst, sizeof(T)); } ui32 CalcChecksum(const void* p, int size); ui32 CalcChecksum(const TBlockChain& chain); class TIncrementalChecksumCalcer { i64 TotalSum; int Offset; public: TIncrementalChecksumCalcer() : TotalSum(0) , Offset(0) { } void AddBlock(const void* p, int size); void AddBlockSum(ui32 sum, int size); ui32 CalcChecksum(); static ui32 CalcBlockSum(const void* p, int size); }; inline void AddChain(TIncrementalChecksumCalcer* ics, const TBlockChain& chain) { for (int k = 0; k < chain.GetBlockCount(); ++k) { const TBlockChain::TBlock& blk = chain.GetBlock(k); ics->AddBlock(blk.Data, blk.Size); } } }