ib_buffers.h

#pragma once

#include "ib_low.h"

namespace NNetliba {
    // buffer id 0 is special, it is used when data is sent inline and should not be returned
    const size_t SMALL_PKT_SIZE = 4096;
    // wr_id layout: the low 32 bits carry the buffer id; BP_AH_USED_FLAG marks that an
    // address handle is pinned in AHHolder until the buffer is freed
    const ui64 BP_AH_USED_FLAG = 0x1000000000000000ul;
    const ui64 BP_BUF_ID_MASK = 0x00000000fffffffful;

    // single thread version
    class TIBBufferPool: public TThrRefBase, TNonCopyable {
        enum {
            BLOCK_SIZE_LN = 11,
            BLOCK_SIZE = 1 << BLOCK_SIZE_LN,
            BLOCK_COUNT = 1024
        };
        struct TSingleBlock {
            TIntrusivePtr<TMemoryRegion> Mem;
            TVector<ui8> BlkRefCounts;
            TVector<TIntrusivePtr<TAddressHandle>> AHHolder;

            void Alloc(TPtrArg<TIBContext> ctx) {
                size_t dataSize = SMALL_PKT_SIZE * BLOCK_SIZE;
                Mem = new TMemoryRegion(ctx, dataSize);
                BlkRefCounts.resize(BLOCK_SIZE, 0);
                AHHolder.resize(BLOCK_SIZE);
            }
            char* GetBufData(ui64 idArg) {
                char* data = Mem->GetData();
                return data + (idArg & (BLOCK_SIZE - 1)) * SMALL_PKT_SIZE;
            }
        };

        TIntrusivePtr<TIBContext> IBCtx;
        TVector<int> FreeList;
        TVector<TSingleBlock> Blocks;
        size_t FirstFreeBlock;
        int PostRecvDeficit;
        TIntrusivePtr<TSharedReceiveQueue> SRQ;

        // registers another block of BLOCK_SIZE small buffers and puts them on the free list
        void AddBlock() {
            if (FirstFreeBlock == Blocks.size()) {
                Y_ABORT_UNLESS(0, "run out of buffers");
            }
            Blocks[FirstFreeBlock].Alloc(IBCtx);
            size_t start = (FirstFreeBlock == 0) ? 1 : FirstFreeBlock * BLOCK_SIZE;
            size_t finish = FirstFreeBlock * BLOCK_SIZE + BLOCK_SIZE;
            for (size_t i = start; i < finish; ++i) {
                FreeList.push_back(i);
            }
            ++FirstFreeBlock;
        }

    public:
        TIBBufferPool(TPtrArg<TIBContext> ctx, int maxSRQWorkRequests)
            : IBCtx(ctx)
            , FirstFreeBlock(0)
            , PostRecvDeficit(maxSRQWorkRequests)
        {
            Blocks.resize(BLOCK_COUNT);
            AddBlock();
            SRQ = new TSharedReceiveQueue(ctx, maxSRQWorkRequests);
            PostRecv();
        }
        TSharedReceiveQueue* GetSRQ() const {
            return SRQ.Get();
        }
        int AllocBuf() {
            if (FreeList.empty()) {
                AddBlock();
            }
            int id = FreeList.back();
            FreeList.pop_back();
            Y_ASSERT(++Blocks[id >> BLOCK_SIZE_LN].BlkRefCounts[id & (BLOCK_SIZE - 1)] == 1);
            return id;
        }
        void FreeBuf(ui64 idArg) {
            ui64 id = idArg & BP_BUF_ID_MASK;
            if (id == 0) {
                return;
            }
            Y_ASSERT(id > 0 && id < (ui64)(FirstFreeBlock * BLOCK_SIZE));
            FreeList.push_back(id);
            Y_ASSERT(--Blocks[id >> BLOCK_SIZE_LN].BlkRefCounts[id & (BLOCK_SIZE - 1)] == 0);
            if (idArg & BP_AH_USED_FLAG) {
                // release the address handle pinned by the UD PostSend()
                Blocks[id >> BLOCK_SIZE_LN].AHHolder[id & (BLOCK_SIZE - 1)] = nullptr;
            }
        }
        char* GetBufData(ui64 idArg) {
            ui64 id = idArg & BP_BUF_ID_MASK;
            return Blocks[id >> BLOCK_SIZE_LN].GetBufData(id);
        }
        int PostSend(TPtrArg<TRCQueuePair> qp, const void* data, size_t len) {
            if (len > SMALL_PKT_SIZE) {
                Y_ABORT_UNLESS(0, "buffer overrun");
            }
            if (len <= MAX_INLINE_DATA_SIZE) {
                qp->PostSend(nullptr, 0, data, len);
                return 0;
            } else {
                int id = AllocBuf();
                TSingleBlock& blk = Blocks[id >> BLOCK_SIZE_LN];
                char* buf = blk.GetBufData(id);
                memcpy(buf, data, len);
                qp->PostSend(blk.Mem, id, buf, len);
                return id;
            }
        }
        void PostSend(TPtrArg<TUDQueuePair> qp, TPtrArg<TAddressHandle> ah, int remoteQPN, int remoteQKey,
                      const void* data, size_t len) {
            if (len > SMALL_PKT_SIZE - 40) {
                Y_ABORT_UNLESS(0, "buffer overrun");
            }
            ui64 id = AllocBuf();
            TSingleBlock& blk = Blocks[id >> BLOCK_SIZE_LN];
            int ptr = id & (BLOCK_SIZE - 1);
            blk.AHHolder[ptr] = ah.Get();
            id |= BP_AH_USED_FLAG;
            if (len <= MAX_INLINE_DATA_SIZE) {
                qp->PostSend(ah, remoteQPN, remoteQKey, nullptr, id, data, len);
            } else {
                char* buf = blk.GetBufData(id);
                memcpy(buf, data, len);
                qp->PostSend(ah, remoteQPN, remoteQKey, blk.Mem, id, buf, len);
            }
        }
        // records that one posted receive was consumed; the buffer is reposted in PostRecv()
        void RequestPostRecv() {
            ++PostRecvDeficit;
        }
        void PostRecv() {
            for (int i = 0; i < PostRecvDeficit; ++i) {
                int id = AllocBuf();
                TSingleBlock& blk = Blocks[id >> BLOCK_SIZE_LN];
                char* buf = blk.GetBufData(id);
                SRQ->PostReceive(blk.Mem, id, buf, SMALL_PKT_SIZE);
            }
            PostRecvDeficit = 0;
        }
    };
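
    // Usage sketch (illustrative only, not part of the original header): posting a small
    // packet over an RC queue pair through the pool. The returned id travels back as
    // wr_id in the send completion; the caller hands it to FreeBuf() at that point.
    // An id of 0 means the data was sent inline and there is nothing to release.
    inline ui64 ExamplePostSmallRCSend(TIBBufferPool& pool, TPtrArg<TRCQueuePair> qp,
                                       const void* data, size_t len) {
        ui64 wrId = pool.PostSend(qp, data, len); // copied into a pooled, registered buffer unless inlined
        return wrId;                              // later, on the send completion: pool.FreeBuf(wc.wr_id);
    }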

    class TIBRecvPacketProcess: public TNonCopyable {
        TIBBufferPool& BP;
        ui64 Id;
        char* Data;

    public:
        TIBRecvPacketProcess(TIBBufferPool& bp, const ibv_wc& wc)
            : BP(bp)
            , Id(wc.wr_id)
        {
            Y_ASSERT(wc.opcode & IBV_WC_RECV);
            BP.RequestPostRecv();
            Data = BP.GetBufData(Id);
        }
        // intended for postponed packet processing
        // when this constructor is used, the caller must call RequestPostRecv() itself
        // (and PostRecv() as well, to avoid an RNR situation)
        TIBRecvPacketProcess(TIBBufferPool& bp, const ui64 wr_id)
            : BP(bp)
            , Id(wr_id)
        {
            Data = BP.GetBufData(Id);
        }
        ~TIBRecvPacketProcess() {
            BP.FreeBuf(Id);
            BP.PostRecv();
        }
        char* GetData() const {
            return Data;
        }
        char* GetUDData() const {
            return Data + 40;
        }
        ibv_grh* GetGRH() const {
            return (ibv_grh*)Data;
        }
    };
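
    // Usage sketch (illustrative only, not part of the original header): handling one receive
    // completion pulled from the completion queue by the caller. The RAII helper maps
    // wr_id back to its buffer; on destruction it returns the buffer to the pool and
    // replenishes the shared receive queue.
    inline void ExampleHandleRecvCompletion(TIBBufferPool& pool, const ibv_wc& wc) {
        TIBRecvPacketProcess pkt(pool, wc);  // calls RequestPostRecv() for the consumed receive
        ibv_grh* grh = pkt.GetGRH();         // for UD traffic the GRH occupies the first 40 bytes
        char* payload = pkt.GetUDData();     // application payload starts right after the GRH
        (void)grh;
        (void)payload;
        // ... parse and dispatch the packet here ...
    }                                        // ~TIBRecvPacketProcess: FreeBuf(wr_id) + PostRecv()

    // Postponed variant (also illustrative): when processing is deferred, the caller keeps
    // wc.wr_id, accounts for the consumed receive via RequestPostRecv()/PostRecv() at
    // completion time, and later wraps the stored id to access and release the buffer.
    inline void ExampleHandlePostponedRecv(TIBBufferPool& pool, ui64 wrId) {
        TIBRecvPacketProcess pkt(pool, wrId); // this constructor does not call RequestPostRecv()
        (void)pkt.GetData();
        // ... process the stored packet here ...
    }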
}