ib_memstream.cpp 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. #include "stdafx.h"
  2. #include "ib_mem.h"
  3. #include "ib_memstream.h"
  4. #include "ib_low.h"
  5. namespace NNetliba {
  6. int TIBMemStream::WriteImpl(const void* userBuffer, int sizeArg) {
  7. const char* srcData = (const char*)userBuffer;
  8. int size = sizeArg;
  9. for (;;) {
  10. if (size == 0)
  11. return sizeArg;
  12. if (CurBlock == Blocks.ysize()) {
  13. // add new block
  14. TBlock& blk = Blocks.emplace_back();
  15. blk.StartOffset = GetLength();
  16. int szLog = 17 + Min(Blocks.ysize() / 2, 13);
  17. blk.BufSize = 1 << szLog;
  18. blk.DataSize = 0;
  19. blk.Mem = MemPool->Alloc(blk.BufSize);
  20. Y_ASSERT(CurBlockOffset == 0);
  21. }
  22. TBlock& curBlk = Blocks[CurBlock];
  23. int leftSpace = curBlk.BufSize - CurBlockOffset;
  24. int copySize = Min(size, leftSpace);
  25. memcpy(curBlk.Mem->GetData() + CurBlockOffset, srcData, copySize);
  26. size -= copySize;
  27. CurBlockOffset += copySize;
  28. srcData += copySize;
  29. curBlk.DataSize = Max(curBlk.DataSize, CurBlockOffset);
  30. if (CurBlockOffset == curBlk.BufSize) {
  31. ++CurBlock;
  32. CurBlockOffset = 0;
  33. }
  34. }
  35. }
  36. int TIBMemStream::ReadImpl(void* userBuffer, int sizeArg) {
  37. char* dstData = (char*)userBuffer;
  38. int size = sizeArg;
  39. for (;;) {
  40. if (size == 0)
  41. return sizeArg;
  42. if (CurBlock == Blocks.ysize()) {
  43. //memset(dstData, 0, size);
  44. size = 0;
  45. continue;
  46. }
  47. TBlock& curBlk = Blocks[CurBlock];
  48. int leftSpace = curBlk.DataSize - CurBlockOffset;
  49. int copySize = Min(size, leftSpace);
  50. memcpy(dstData, curBlk.Mem->GetData() + CurBlockOffset, copySize);
  51. size -= copySize;
  52. CurBlockOffset += copySize;
  53. dstData += copySize;
  54. if (CurBlockOffset == curBlk.DataSize) {
  55. ++CurBlock;
  56. CurBlockOffset = 0;
  57. }
  58. }
  59. }
  60. i64 TIBMemStream::GetLength() {
  61. i64 res = 0;
  62. for (int i = 0; i < Blocks.ysize(); ++i) {
  63. res += Blocks[i].DataSize;
  64. }
  65. return res;
  66. }
  67. i64 TIBMemStream::Seek(i64 pos) {
  68. for (int resBlockId = 0; resBlockId < Blocks.ysize(); ++resBlockId) {
  69. const TBlock& blk = Blocks[resBlockId];
  70. if (pos < blk.StartOffset + blk.DataSize) {
  71. CurBlock = resBlockId;
  72. CurBlockOffset = pos - blk.StartOffset;
  73. return pos;
  74. }
  75. }
  76. CurBlock = Blocks.ysize();
  77. CurBlockOffset = 0;
  78. return GetLength();
  79. }
  80. void TIBMemStream::GetBlocks(TVector<TBlockDescr>* res) const {
  81. int blockCount = Blocks.ysize();
  82. res->resize(blockCount);
  83. for (int i = 0; i < blockCount; ++i) {
  84. const TBlock& blk = Blocks[i];
  85. TBlockDescr& dst = (*res)[i];
  86. dst.Addr = blk.Mem->GetAddr();
  87. dst.BufSize = blk.BufSize;
  88. dst.DataSize = blk.DataSize;
  89. TMemoryRegion* mem = blk.Mem->GetMemRegion();
  90. dst.LocalKey = mem->GetLKey();
  91. dst.RemoteKey = mem->GetRKey();
  92. }
  93. }
  94. void TIBMemStream::CreateBlocks(const TVector<TBlockSizes>& arr) {
  95. int blockCount = arr.ysize();
  96. Blocks.resize(blockCount);
  97. i64 offset = 0;
  98. for (int i = 0; i < blockCount; ++i) {
  99. const TBlockSizes& src = arr[i];
  100. TBlock& blk = Blocks[i];
  101. blk.BufSize = src.BufSize;
  102. blk.DataSize = src.DataSize;
  103. blk.Mem = MemPool->Alloc(blk.BufSize);
  104. blk.StartOffset = offset;
  105. offset += blk.DataSize;
  106. }
  107. CurBlock = 0;
  108. CurBlockOffset = 0;
  109. }
  110. void TIBMemStream::Clear() {
  111. Blocks.resize(0);
  112. CurBlock = 0;
  113. CurBlockOffset = 0;
  114. }
  115. }