block_chain.h 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. #pragma once
  2. #include <util/generic/algorithm.h>
  3. #include <util/generic/list.h>
  4. #include <util/system/shmat.h>
  5. #include <util/generic/noncopyable.h>
  6. namespace NNetliba {
  7. class TBlockChain {
  8. public:
  9. struct TBlock {
  10. const char* Data;
  11. int Offset, Size; // Offset in whole chain
  12. TBlock()
  13. : Data(nullptr)
  14. , Offset(0)
  15. , Size(0)
  16. {
  17. }
  18. TBlock(const char* data, int offset, int sz)
  19. : Data(data)
  20. , Offset(offset)
  21. , Size(sz)
  22. {
  23. }
  24. };
  25. private:
  26. typedef TVector<TBlock> TBlockVector;
  27. TBlockVector Blocks;
  28. int Size;
  29. struct TBlockLess {
  30. bool operator()(const TBlock& b, int offset) const {
  31. return b.Offset < offset;
  32. }
  33. };
  34. public:
  35. TBlockChain()
  36. : Size(0)
  37. {
  38. }
  39. void AddBlock(const void* data, int sz) {
  40. Blocks.push_back(TBlock((const char*)data, Size, sz));
  41. Size += sz;
  42. }
  43. int GetSize() const {
  44. return Size;
  45. }
  46. const TBlock& GetBlock(int i) const {
  47. return Blocks[i];
  48. }
  49. int GetBlockCount() const {
  50. return Blocks.ysize();
  51. }
  52. int GetBlockIdByOffset(int offset) const {
  53. TBlockVector::const_iterator i = LowerBound(Blocks.begin(), Blocks.end(), offset, TBlockLess());
  54. if (i == Blocks.end())
  55. return Blocks.ysize() - 1;
  56. if (i->Offset == offset)
  57. return (int)(i - Blocks.begin());
  58. return (int)(i - Blocks.begin() - 1);
  59. }
  60. };
  61. //////////////////////////////////////////////////////////////////////////
  62. class TBlockChainIterator {
  63. const TBlockChain& Chain;
  64. int Pos, BlockPos, BlockId;
  65. bool Failed;
  66. public:
  67. TBlockChainIterator(const TBlockChain& chain)
  68. : Chain(chain)
  69. , Pos(0)
  70. , BlockPos(0)
  71. , BlockId(0)
  72. , Failed(false)
  73. {
  74. }
  75. void Read(void* dst, int sz) {
  76. char* dstBuf = (char*)dst;
  77. while (sz > 0) {
  78. if (BlockId >= Chain.GetBlockCount()) {
  79. // JACKPOT!
  80. fprintf(stderr, "reading beyond chain end: BlockId %d, Chain.GetBlockCount() %d, Pos %d, BlockPos %d\n",
  81. BlockId, Chain.GetBlockCount(), Pos, BlockPos);
  82. Y_ASSERT(0 && "reading beyond chain end");
  83. memset(dstBuf, 0, sz);
  84. Failed = true;
  85. return;
  86. }
  87. const TBlockChain::TBlock& blk = Chain.GetBlock(BlockId);
  88. int copySize = Min(blk.Size - BlockPos, sz);
  89. memcpy(dstBuf, blk.Data + BlockPos, copySize);
  90. dstBuf += copySize;
  91. Pos += copySize;
  92. BlockPos += copySize;
  93. sz -= copySize;
  94. if (BlockPos == blk.Size) {
  95. BlockPos = 0;
  96. ++BlockId;
  97. }
  98. }
  99. }
  100. void Seek(int pos) {
  101. if (pos < 0 || pos > Chain.GetSize()) {
  102. Y_ASSERT(0);
  103. Pos = 0;
  104. BlockPos = 0;
  105. BlockId = 0;
  106. return;
  107. }
  108. BlockId = Chain.GetBlockIdByOffset(pos);
  109. const TBlockChain::TBlock& blk = Chain.GetBlock(BlockId);
  110. Pos = pos;
  111. BlockPos = Pos - blk.Offset;
  112. }
  113. int GetPos() const {
  114. return Pos;
  115. }
  116. int GetSize() const {
  117. return Chain.GetSize();
  118. }
  119. bool HasFailed() const {
  120. return Failed;
  121. }
  122. void Fail() {
  123. Failed = true;
  124. }
  125. };
  126. //////////////////////////////////////////////////////////////////////////
  127. class TRopeDataPacket: public TNonCopyable {
  128. TBlockChain Chain;
  129. TVector<char*> Buf;
  130. char *Block, *BlockEnd;
  131. TList<TVector<char>> DataVectors;
  132. TIntrusivePtr<TSharedMemory> SharedData;
  133. TVector<TIntrusivePtr<TThrRefBase>> AttachedStorage;
  134. char DefaultBuf[128]; // prevent allocs in most cases
  135. enum {
  136. N_DEFAULT_BLOCK_SIZE = 1024
  137. };
  138. char* Alloc(int sz) {
  139. char* res = nullptr;
  140. if (BlockEnd - Block < sz) {
  141. int bufSize = Max((int)N_DEFAULT_BLOCK_SIZE, sz);
  142. char* newBlock = AllocBuf(bufSize);
  143. Block = newBlock;
  144. BlockEnd = Block + bufSize;
  145. Buf.push_back(newBlock);
  146. }
  147. res = Block;
  148. Block += sz;
  149. Y_ASSERT(Block <= BlockEnd);
  150. return res;
  151. }
  152. public:
  153. TRopeDataPacket()
  154. : Block(DefaultBuf)
  155. , BlockEnd(DefaultBuf + Y_ARRAY_SIZE(DefaultBuf))
  156. {
  157. }
  158. ~TRopeDataPacket() {
  159. for (size_t i = 0; i < Buf.size(); ++i)
  160. FreeBuf(Buf[i]);
  161. }
  162. static char* AllocBuf(int sz) {
  163. return new char[sz];
  164. }
  165. static void FreeBuf(char* buf) {
  166. delete[] buf;
  167. }
  168. // buf - pointer to buffer which will be freed with FreeBuf()
  169. // data - pointer to data start within buf
  170. // sz - size of useful data
  171. void AddBlock(char* buf, const char* data, int sz) {
  172. Buf.push_back(buf);
  173. Chain.AddBlock(data, sz);
  174. }
  175. void AddBlock(TThrRefBase* buf, const char* data, int sz) {
  176. AttachedStorage.push_back(buf);
  177. Chain.AddBlock(data, sz);
  178. }
  179. //
  180. void Write(const void* data, int sz) {
  181. char* buf = Alloc(sz);
  182. memcpy(buf, data, sz);
  183. Chain.AddBlock(buf, sz);
  184. }
  185. template <class T>
  186. void Write(const T& data) {
  187. Write(&data, sizeof(T));
  188. }
  189. //// caller guarantees that data will persist all *this lifetime
  190. //// int this case so we don`t have to copy data to locally held buffer
  191. //template<class T>
  192. //void WriteNoCopy(const T *data)
  193. //{
  194. // Chain.AddBlock(data, sizeof(T));
  195. //}
  196. // write some array like TVector<>
  197. //template<class T>
  198. //void WriteArr(const T &sz)
  199. //{
  200. // int n = (int)sz.size();
  201. // Write(n);
  202. // if (n > 0)
  203. // Write(&sz[0], n * sizeof(sz[0]));
  204. //}
  205. void WriteStroka(const TString& sz) {
  206. int n = (int)sz.size();
  207. Write(n);
  208. if (n > 0)
  209. Write(sz.c_str(), n * sizeof(sz[0]));
  210. }
  211. // will take *data ownership, saves copy
  212. void WriteDestructive(TVector<char>* data) {
  213. int n = data ? data->ysize() : 0;
  214. Write(n);
  215. if (n > 0) {
  216. TVector<char>& local = DataVectors.emplace_back(std::move(*data));
  217. Chain.AddBlock(&local[0], local.ysize());
  218. }
  219. }
  220. void AttachSharedData(TIntrusivePtr<TSharedMemory> shm) {
  221. SharedData = shm;
  222. }
  223. TSharedMemory* GetSharedData() const {
  224. return SharedData.Get();
  225. }
  226. const TBlockChain& GetChain() {
  227. return Chain;
  228. }
  229. int GetSize() {
  230. return Chain.GetSize();
  231. }
  232. };
  233. template <class T>
  234. inline void ReadArr(TBlockChainIterator* res, T* dst) {
  235. int n;
  236. res->Read(&n, sizeof(n));
  237. if (n >= 0) {
  238. dst->resize(n);
  239. if (n > 0)
  240. res->Read(&(*dst)[0], n * sizeof((*dst)[0]));
  241. } else {
  242. dst->resize(0);
  243. res->Fail();
  244. }
  245. }
  246. template <>
  247. inline void ReadArr<TString>(TBlockChainIterator* res, TString* dst) {
  248. int n;
  249. res->Read(&n, sizeof(n));
  250. if (n >= 0) {
  251. dst->resize(n);
  252. if (n > 0)
  253. res->Read(dst->begin(), n * sizeof(TString::value_type));
  254. } else {
  255. dst->resize(0);
  256. res->Fail();
  257. }
  258. }
  259. // saves on zeroing *dst with yresize()
  260. template <class T>
  261. static void ReadYArr(TBlockChainIterator* res, TVector<T>* dst) {
  262. int n;
  263. res->Read(&n, sizeof(n));
  264. if (n >= 0) {
  265. dst->yresize(n);
  266. if (n > 0)
  267. res->Read(&(*dst)[0], n * sizeof((*dst)[0]));
  268. } else {
  269. dst->yresize(0);
  270. res->Fail();
  271. }
  272. }
  273. template <class T>
  274. static void Read(TBlockChainIterator* res, T* dst) {
  275. res->Read(dst, sizeof(T));
  276. }
  277. ui32 CalcChecksum(const void* p, int size);
  278. ui32 CalcChecksum(const TBlockChain& chain);
  279. class TIncrementalChecksumCalcer {
  280. i64 TotalSum;
  281. int Offset;
  282. public:
  283. TIncrementalChecksumCalcer()
  284. : TotalSum(0)
  285. , Offset(0)
  286. {
  287. }
  288. void AddBlock(const void* p, int size);
  289. void AddBlockSum(ui32 sum, int size);
  290. ui32 CalcChecksum();
  291. static ui32 CalcBlockSum(const void* p, int size);
  292. };
  293. inline void AddChain(TIncrementalChecksumCalcer* ics, const TBlockChain& chain) {
  294. for (int k = 0; k < chain.GetBlockCount(); ++k) {
  295. const TBlockChain::TBlock& blk = chain.GetBlock(k);
  296. ics->AddBlock(blk.Data, blk.Size);
  297. }
  298. }
  299. }