123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319 |
- #pragma once
- #include <util/generic/algorithm.h>
- #include <util/generic/list.h>
- #include <util/system/shmat.h>
- #include <util/generic/noncopyable.h>
- namespace NNetliba {
- class TBlockChain {
- public:
- struct TBlock {
- const char* Data;
- int Offset, Size; // Offset in whole chain
- TBlock()
- : Data(nullptr)
- , Offset(0)
- , Size(0)
- {
- }
- TBlock(const char* data, int offset, int sz)
- : Data(data)
- , Offset(offset)
- , Size(sz)
- {
- }
- };
- private:
- typedef TVector<TBlock> TBlockVector;
- TBlockVector Blocks;
- int Size;
- struct TBlockLess {
- bool operator()(const TBlock& b, int offset) const {
- return b.Offset < offset;
- }
- };
- public:
- TBlockChain()
- : Size(0)
- {
- }
- void AddBlock(const void* data, int sz) {
- Blocks.push_back(TBlock((const char*)data, Size, sz));
- Size += sz;
- }
- int GetSize() const {
- return Size;
- }
- const TBlock& GetBlock(int i) const {
- return Blocks[i];
- }
- int GetBlockCount() const {
- return Blocks.ysize();
- }
- int GetBlockIdByOffset(int offset) const {
- TBlockVector::const_iterator i = LowerBound(Blocks.begin(), Blocks.end(), offset, TBlockLess());
- if (i == Blocks.end())
- return Blocks.ysize() - 1;
- if (i->Offset == offset)
- return (int)(i - Blocks.begin());
- return (int)(i - Blocks.begin() - 1);
- }
- };
- //////////////////////////////////////////////////////////////////////////
- class TBlockChainIterator {
- const TBlockChain& Chain;
- int Pos, BlockPos, BlockId;
- bool Failed;
- public:
- TBlockChainIterator(const TBlockChain& chain)
- : Chain(chain)
- , Pos(0)
- , BlockPos(0)
- , BlockId(0)
- , Failed(false)
- {
- }
- void Read(void* dst, int sz) {
- char* dstBuf = (char*)dst;
- while (sz > 0) {
- if (BlockId >= Chain.GetBlockCount()) {
- // JACKPOT!
- fprintf(stderr, "reading beyond chain end: BlockId %d, Chain.GetBlockCount() %d, Pos %d, BlockPos %d\n",
- BlockId, Chain.GetBlockCount(), Pos, BlockPos);
- Y_ASSERT(0 && "reading beyond chain end");
- memset(dstBuf, 0, sz);
- Failed = true;
- return;
- }
- const TBlockChain::TBlock& blk = Chain.GetBlock(BlockId);
- int copySize = Min(blk.Size - BlockPos, sz);
- memcpy(dstBuf, blk.Data + BlockPos, copySize);
- dstBuf += copySize;
- Pos += copySize;
- BlockPos += copySize;
- sz -= copySize;
- if (BlockPos == blk.Size) {
- BlockPos = 0;
- ++BlockId;
- }
- }
- }
- void Seek(int pos) {
- if (pos < 0 || pos > Chain.GetSize()) {
- Y_ASSERT(0);
- Pos = 0;
- BlockPos = 0;
- BlockId = 0;
- return;
- }
- BlockId = Chain.GetBlockIdByOffset(pos);
- const TBlockChain::TBlock& blk = Chain.GetBlock(BlockId);
- Pos = pos;
- BlockPos = Pos - blk.Offset;
- }
- int GetPos() const {
- return Pos;
- }
- int GetSize() const {
- return Chain.GetSize();
- }
- bool HasFailed() const {
- return Failed;
- }
- void Fail() {
- Failed = true;
- }
- };
- //////////////////////////////////////////////////////////////////////////
- class TRopeDataPacket: public TNonCopyable {
- TBlockChain Chain;
- TVector<char*> Buf;
- char *Block, *BlockEnd;
- TList<TVector<char>> DataVectors;
- TIntrusivePtr<TSharedMemory> SharedData;
- TVector<TIntrusivePtr<TThrRefBase>> AttachedStorage;
- char DefaultBuf[128]; // prevent allocs in most cases
- enum {
- N_DEFAULT_BLOCK_SIZE = 1024
- };
- char* Alloc(int sz) {
- char* res = nullptr;
- if (BlockEnd - Block < sz) {
- int bufSize = Max((int)N_DEFAULT_BLOCK_SIZE, sz);
- char* newBlock = AllocBuf(bufSize);
- Block = newBlock;
- BlockEnd = Block + bufSize;
- Buf.push_back(newBlock);
- }
- res = Block;
- Block += sz;
- Y_ASSERT(Block <= BlockEnd);
- return res;
- }
- public:
- TRopeDataPacket()
- : Block(DefaultBuf)
- , BlockEnd(DefaultBuf + Y_ARRAY_SIZE(DefaultBuf))
- {
- }
- ~TRopeDataPacket() {
- for (size_t i = 0; i < Buf.size(); ++i)
- FreeBuf(Buf[i]);
- }
- static char* AllocBuf(int sz) {
- return new char[sz];
- }
- static void FreeBuf(char* buf) {
- delete[] buf;
- }
- // buf - pointer to buffer which will be freed with FreeBuf()
- // data - pointer to data start within buf
- // sz - size of useful data
- void AddBlock(char* buf, const char* data, int sz) {
- Buf.push_back(buf);
- Chain.AddBlock(data, sz);
- }
- void AddBlock(TThrRefBase* buf, const char* data, int sz) {
- AttachedStorage.push_back(buf);
- Chain.AddBlock(data, sz);
- }
- //
- void Write(const void* data, int sz) {
- char* buf = Alloc(sz);
- memcpy(buf, data, sz);
- Chain.AddBlock(buf, sz);
- }
- template <class T>
- void Write(const T& data) {
- Write(&data, sizeof(T));
- }
- //// caller guarantees that data will persist all *this lifetime
- //// int this case so we don`t have to copy data to locally held buffer
- //template<class T>
- //void WriteNoCopy(const T *data)
- //{
- // Chain.AddBlock(data, sizeof(T));
- //}
- // write some array like TVector<>
- //template<class T>
- //void WriteArr(const T &sz)
- //{
- // int n = (int)sz.size();
- // Write(n);
- // if (n > 0)
- // Write(&sz[0], n * sizeof(sz[0]));
- //}
- void WriteStroka(const TString& sz) {
- int n = (int)sz.size();
- Write(n);
- if (n > 0)
- Write(sz.c_str(), n * sizeof(sz[0]));
- }
- // will take *data ownership, saves copy
- void WriteDestructive(TVector<char>* data) {
- int n = data ? data->ysize() : 0;
- Write(n);
- if (n > 0) {
- TVector<char>& local = DataVectors.emplace_back(std::move(*data));
- Chain.AddBlock(&local[0], local.ysize());
- }
- }
- void AttachSharedData(TIntrusivePtr<TSharedMemory> shm) {
- SharedData = shm;
- }
- TSharedMemory* GetSharedData() const {
- return SharedData.Get();
- }
- const TBlockChain& GetChain() {
- return Chain;
- }
- int GetSize() {
- return Chain.GetSize();
- }
- };
- template <class T>
- inline void ReadArr(TBlockChainIterator* res, T* dst) {
- int n;
- res->Read(&n, sizeof(n));
- if (n >= 0) {
- dst->resize(n);
- if (n > 0)
- res->Read(&(*dst)[0], n * sizeof((*dst)[0]));
- } else {
- dst->resize(0);
- res->Fail();
- }
- }
- template <>
- inline void ReadArr<TString>(TBlockChainIterator* res, TString* dst) {
- int n;
- res->Read(&n, sizeof(n));
- if (n >= 0) {
- dst->resize(n);
- if (n > 0)
- res->Read(dst->begin(), n * sizeof(TString::value_type));
- } else {
- dst->resize(0);
- res->Fail();
- }
- }
- // saves on zeroing *dst with yresize()
- template <class T>
- static void ReadYArr(TBlockChainIterator* res, TVector<T>* dst) {
- int n;
- res->Read(&n, sizeof(n));
- if (n >= 0) {
- dst->yresize(n);
- if (n > 0)
- res->Read(&(*dst)[0], n * sizeof((*dst)[0]));
- } else {
- dst->yresize(0);
- res->Fail();
- }
- }
- template <class T>
- static void Read(TBlockChainIterator* res, T* dst) {
- res->Read(dst, sizeof(T));
- }
- ui32 CalcChecksum(const void* p, int size);
- ui32 CalcChecksum(const TBlockChain& chain);
- class TIncrementalChecksumCalcer {
- i64 TotalSum;
- int Offset;
- public:
- TIncrementalChecksumCalcer()
- : TotalSum(0)
- , Offset(0)
- {
- }
- void AddBlock(const void* p, int size);
- void AddBlockSum(ui32 sum, int size);
- ui32 CalcChecksum();
- static ui32 CalcBlockSum(const void* p, int size);
- };
- inline void AddChain(TIncrementalChecksumCalcer* ics, const TBlockChain& chain) {
- for (int k = 0; k < chain.GetBlockCount(); ++k) {
- const TBlockChain::TBlock& blk = chain.GetBlock(k);
- ics->AddBlock(blk.Data, blk.Size);
- }
- }
- }
|