// block_chain.h
  1. #pragma once
  2. #include <util/generic/algorithm.h>
  3. #include <util/generic/list.h>
  4. #include <util/system/shmat.h>
  5. #include <util/generic/noncopyable.h>
  6. namespace NNetliba {
  7. class TBlockChain {
  8. public:
  9. struct TBlock {
  10. const char* Data;
  11. int Offset, Size; // Offset in whole chain
  12. TBlock()
  13. : Data(nullptr)
  14. , Offset(0)
  15. , Size(0)
  16. {
  17. }
  18. TBlock(const char* data, int offset, int sz)
  19. : Data(data)
  20. , Offset(offset)
  21. , Size(sz)
  22. {
  23. }
  24. };
  25. private:
  26. typedef TVector<TBlock> TBlockVector;
  27. TBlockVector Blocks;
  28. int Size;
  29. struct TBlockLess {
  30. bool operator()(const TBlock& b, int offset) const {
  31. return b.Offset < offset;
  32. }
  33. };
  34. public:
  35. TBlockChain()
  36. : Size(0)
  37. {
  38. }
  39. void AddBlock(const void* data, int sz) {
  40. Blocks.push_back(TBlock((const char*)data, Size, sz));
  41. Size += sz;
  42. }
  43. int GetSize() const {
  44. return Size;
  45. }
  46. const TBlock& GetBlock(int i) const {
  47. return Blocks[i];
  48. }
  49. int GetBlockCount() const {
  50. return Blocks.ysize();
  51. }
  52. int GetBlockIdByOffset(int offset) const {
  53. TBlockVector::const_iterator i = LowerBound(Blocks.begin(), Blocks.end(), offset, TBlockLess());
  54. if (i == Blocks.end())
  55. return Blocks.ysize() - 1;
  56. if (i->Offset == offset)
  57. return (int)(i - Blocks.begin());
  58. return (int)(i - Blocks.begin() - 1);
  59. }
  60. };
  61. //////////////////////////////////////////////////////////////////////////
  62. class TBlockChainIterator {
  63. const TBlockChain& Chain;
  64. int Pos, BlockPos, BlockId;
  65. bool Failed;
  66. public:
  67. TBlockChainIterator(const TBlockChain& chain)
  68. : Chain(chain)
  69. , Pos(0)
  70. , BlockPos(0)
  71. , BlockId(0)
  72. , Failed(false)
  73. {
  74. }
  75. void Read(void* dst, int sz) {
  76. char* dstBuf = (char*)dst;
  77. while (sz > 0) {
  78. if (BlockId >= Chain.GetBlockCount()) {
  79. // JACKPOT!
  80. fprintf(stderr, "reading beyond chain end: BlockId %d, Chain.GetBlockCount() %d, Pos %d, BlockPos %d\n",
  81. BlockId, Chain.GetBlockCount(), Pos, BlockPos);
  82. Y_ASSERT(0 && "reading beyond chain end");
  83. memset(dstBuf, 0, sz);
  84. Failed = true;
  85. return;
  86. }
  87. const TBlockChain::TBlock& blk = Chain.GetBlock(BlockId);
  88. int copySize = Min(blk.Size - BlockPos, sz);
  89. memcpy(dstBuf, blk.Data + BlockPos, copySize);
  90. dstBuf += copySize;
  91. Pos += copySize;
  92. BlockPos += copySize;
  93. sz -= copySize;
  94. if (BlockPos == blk.Size) {
  95. BlockPos = 0;
  96. ++BlockId;
  97. }
  98. }
  99. }
  100. void Seek(int pos) {
  101. if (pos < 0 || pos > Chain.GetSize()) {
  102. Y_ASSERT(0);
  103. Pos = 0;
  104. BlockPos = 0;
  105. BlockId = 0;
  106. return;
  107. }
  108. BlockId = Chain.GetBlockIdByOffset(pos);
  109. const TBlockChain::TBlock& blk = Chain.GetBlock(BlockId);
  110. Pos = pos;
  111. BlockPos = Pos - blk.Offset;
  112. }
  113. int GetPos() const {
  114. return Pos;
  115. }
  116. int GetSize() const {
  117. return Chain.GetSize();
  118. }
  119. bool HasFailed() const {
  120. return Failed;
  121. }
  122. void Fail() {
  123. Failed = true;
  124. }
  125. };
  126. //////////////////////////////////////////////////////////////////////////
  127. class TRopeDataPacket: public TNonCopyable {
  128. TBlockChain Chain;
  129. TVector<char*> Buf;
  130. char *Block, *BlockEnd;
  131. TList<TVector<char>> DataVectors;
  132. TIntrusivePtr<TSharedMemory> SharedData;
  133. TVector<TIntrusivePtr<TThrRefBase>> AttachedStorage;
  134. char DefaultBuf[128]; // prevent allocs in most cases
  135. static constexpr int N_DEFAULT_BLOCK_SIZE = 1024;
  136. char* Alloc(int sz) {
  137. char* res = nullptr;
  138. if (BlockEnd - Block < sz) {
  139. int bufSize = Max((int)N_DEFAULT_BLOCK_SIZE, sz);
  140. char* newBlock = AllocBuf(bufSize);
  141. Block = newBlock;
  142. BlockEnd = Block + bufSize;
  143. Buf.push_back(newBlock);
  144. }
  145. res = Block;
  146. Block += sz;
  147. Y_ASSERT(Block <= BlockEnd);
  148. return res;
  149. }
  150. public:
  151. TRopeDataPacket()
  152. : Block(DefaultBuf)
  153. , BlockEnd(DefaultBuf + Y_ARRAY_SIZE(DefaultBuf))
  154. {
  155. }
  156. ~TRopeDataPacket() {
  157. for (size_t i = 0; i < Buf.size(); ++i)
  158. FreeBuf(Buf[i]);
  159. }
  160. static char* AllocBuf(int sz) {
  161. return new char[sz];
  162. }
  163. static void FreeBuf(char* buf) {
  164. delete[] buf;
  165. }
  166. // buf - pointer to buffer which will be freed with FreeBuf()
  167. // data - pointer to data start within buf
  168. // sz - size of useful data
  169. void AddBlock(char* buf, const char* data, int sz) {
  170. Buf.push_back(buf);
  171. Chain.AddBlock(data, sz);
  172. }
  173. void AddBlock(TThrRefBase* buf, const char* data, int sz) {
  174. AttachedStorage.push_back(buf);
  175. Chain.AddBlock(data, sz);
  176. }
  177. //
  178. void Write(const void* data, int sz) {
  179. char* buf = Alloc(sz);
  180. memcpy(buf, data, sz);
  181. Chain.AddBlock(buf, sz);
  182. }
  183. template <class T>
  184. void Write(const T& data) {
  185. Write(&data, sizeof(T));
  186. }
  187. //// caller guarantees that data will persist all *this lifetime
  188. //// int this case so we don`t have to copy data to locally held buffer
  189. //template<class T>
  190. //void WriteNoCopy(const T *data)
  191. //{
  192. // Chain.AddBlock(data, sizeof(T));
  193. //}
  194. // write some array like TVector<>
  195. //template<class T>
  196. //void WriteArr(const T &sz)
  197. //{
  198. // int n = (int)sz.size();
  199. // Write(n);
  200. // if (n > 0)
  201. // Write(&sz[0], n * sizeof(sz[0]));
  202. //}
  203. void WriteStroka(const TString& sz) {
  204. int n = (int)sz.size();
  205. Write(n);
  206. if (n > 0)
  207. Write(sz.c_str(), n * sizeof(sz[0]));
  208. }
  209. // will take *data ownership, saves copy
  210. void WriteDestructive(TVector<char>* data) {
  211. int n = data ? data->ysize() : 0;
  212. Write(n);
  213. if (n > 0) {
  214. TVector<char>& local = DataVectors.emplace_back(std::move(*data));
  215. Chain.AddBlock(&local[0], local.ysize());
  216. }
  217. }
  218. void AttachSharedData(TIntrusivePtr<TSharedMemory> shm) {
  219. SharedData = shm;
  220. }
  221. TSharedMemory* GetSharedData() const {
  222. return SharedData.Get();
  223. }
  224. const TBlockChain& GetChain() {
  225. return Chain;
  226. }
  227. int GetSize() {
  228. return Chain.GetSize();
  229. }
  230. };
  231. template <class T>
  232. inline void ReadArr(TBlockChainIterator* res, T* dst) {
  233. int n;
  234. res->Read(&n, sizeof(n));
  235. if (n >= 0) {
  236. dst->resize(n);
  237. if (n > 0)
  238. res->Read(&(*dst)[0], n * sizeof((*dst)[0]));
  239. } else {
  240. dst->resize(0);
  241. res->Fail();
  242. }
  243. }
  244. template <>
  245. inline void ReadArr<TString>(TBlockChainIterator* res, TString* dst) {
  246. int n;
  247. res->Read(&n, sizeof(n));
  248. if (n >= 0) {
  249. dst->resize(n);
  250. if (n > 0)
  251. res->Read(dst->begin(), n * sizeof(TString::value_type));
  252. } else {
  253. dst->resize(0);
  254. res->Fail();
  255. }
  256. }
  257. // saves on zeroing *dst with yresize()
  258. template <class T>
  259. static void ReadYArr(TBlockChainIterator* res, TVector<T>* dst) {
  260. int n;
  261. res->Read(&n, sizeof(n));
  262. if (n >= 0) {
  263. dst->yresize(n);
  264. if (n > 0)
  265. res->Read(&(*dst)[0], n * sizeof((*dst)[0]));
  266. } else {
  267. dst->yresize(0);
  268. res->Fail();
  269. }
  270. }
  271. template <class T>
  272. static void Read(TBlockChainIterator* res, T* dst) {
  273. res->Read(dst, sizeof(T));
  274. }
  275. ui32 CalcChecksum(const void* p, int size);
  276. ui32 CalcChecksum(const TBlockChain& chain);
  277. class TIncrementalChecksumCalcer {
  278. i64 TotalSum;
  279. int Offset;
  280. public:
  281. TIncrementalChecksumCalcer()
  282. : TotalSum(0)
  283. , Offset(0)
  284. {
  285. }
  286. void AddBlock(const void* p, int size);
  287. void AddBlockSum(ui32 sum, int size);
  288. ui32 CalcChecksum();
  289. static ui32 CalcBlockSum(const void* p, int size);
  290. };
  291. inline void AddChain(TIncrementalChecksumCalcer* ics, const TBlockChain& chain) {
  292. for (int k = 0; k < chain.GetBlockCount(); ++k) {
  293. const TBlockChain::TBlock& blk = chain.GetBlock(k);
  294. ics->AddBlock(blk.Data, blk.Size);
  295. }
  296. }
  297. }