ib_buffers.h 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
  1. #pragma once
  2. #include "ib_low.h"
  3. namespace NNetliba {
  4. // buffer id 0 is special, it is used when data is sent inline and should not be returned
  5. const size_t SMALL_PKT_SIZE = 4096;
  6. const ui64 BP_AH_USED_FLAG = 0x1000000000000000ul;
  7. const ui64 BP_BUF_ID_MASK = 0x00000000fffffffful;
  8. // single thread version
// single thread version
//
// Pool of pre-registered SMALL_PKT_SIZE buffers for InfiniBand send/receive.
// Buffers are addressed by an integer id; id 0 is reserved (inline sends) and is
// never handed out by AllocBuf() — AddBlock() skips index 0 of the first block.
// A wr_id on the wire is the buffer id possibly OR'ed with BP_AH_USED_FLAG.
class TIBBufferPool: public TThrRefBase, TNonCopyable {
    static constexpr int BLOCK_SIZE_LN = 11;
    static constexpr int BLOCK_SIZE = 1 << BLOCK_SIZE_LN; // 2048 buffers per block
    static constexpr int BLOCK_COUNT = 1024;              // hard cap on lazily allocated blocks
    // One registered memory region holding BLOCK_SIZE consecutive SMALL_PKT_SIZE buffers.
    struct TSingleBlock {
        TIntrusivePtr<TMemoryRegion> Mem;
        TVector<ui8> BlkRefCounts; // per-buffer use counts; updated only inside Y_ASSERT (debug-only bookkeeping)
        TVector<TIntrusivePtr<TAddressHandle>> AHHolder; // keeps UD address handles alive until the send buffer is freed
        // Registers the region with the IB context and sizes the bookkeeping vectors.
        void Alloc(TPtrArg<TIBContext> ctx) {
            size_t dataSize = SMALL_PKT_SIZE * BLOCK_SIZE;
            Mem = new TMemoryRegion(ctx, dataSize);
            BlkRefCounts.resize(BLOCK_SIZE, 0);
            AHHolder.resize(BLOCK_SIZE);
        }
        // Payload address for a buffer; only the low BLOCK_SIZE_LN bits of idArg
        // are used, so extra flag bits in idArg are harmless here.
        char* GetBufData(ui64 idArg) {
            char* data = Mem->GetData();
            return data + (idArg & (BLOCK_SIZE - 1)) * SMALL_PKT_SIZE;
        }
    };
    TIntrusivePtr<TIBContext> IBCtx;
    TVector<int> FreeList;        // buffer ids currently available to AllocBuf()
    TVector<TSingleBlock> Blocks; // sized to BLOCK_COUNT up front; entries allocated lazily
    size_t FirstFreeBlock;        // index of the first not-yet-allocated entry in Blocks
    int PostRecvDeficit;          // receives consumed but not yet re-posted to the SRQ
    TIntrusivePtr<TSharedReceiveQueue> SRQ;
    // Registers one more block and appends its buffer ids to the free list.
    // Index 0 of the very first block is skipped: id 0 is reserved for inline sends.
    void AddBlock() {
        if (FirstFreeBlock == Blocks.size()) {
            Y_ABORT_UNLESS(0, "run out of buffers");
        }
        Blocks[FirstFreeBlock].Alloc(IBCtx);
        size_t start = (FirstFreeBlock == 0) ? 1 : FirstFreeBlock * BLOCK_SIZE;
        size_t finish = FirstFreeBlock * BLOCK_SIZE + BLOCK_SIZE;
        for (size_t i = start; i < finish; ++i) {
            FreeList.push_back(i);
        }
        ++FirstFreeBlock;
    }

public:
    // PostRecvDeficit starts at maxSRQWorkRequests so the initial PostRecv()
    // fills the shared receive queue completely.
    TIBBufferPool(TPtrArg<TIBContext> ctx, int maxSRQWorkRequests)
        : IBCtx(ctx)
        , FirstFreeBlock(0)
        , PostRecvDeficit(maxSRQWorkRequests)
    {
        Blocks.resize(BLOCK_COUNT);
        AddBlock();
        SRQ = new TSharedReceiveQueue(ctx, maxSRQWorkRequests);
        PostRecv();
    }
    TSharedReceiveQueue* GetSRQ() const {
        return SRQ.Get();
    }
    // Takes a free buffer id, growing the pool by one block if exhausted.
    int AllocBuf() {
        if (FreeList.empty()) {
            AddBlock();
        }
        int id = FreeList.back();
        FreeList.pop_back();
        // Ref-count increment lives inside Y_ASSERT on purpose: the counts are
        // debug-only diagnostics and are never read outside these asserts.
        Y_ASSERT(++Blocks[id >> BLOCK_SIZE_LN].BlkRefCounts[id & (BLOCK_SIZE - 1)] == 1);
        return id;
    }
    // Returns a buffer to the free list. Accepts a raw wr_id: flag bits are
    // masked off, and id 0 (inline send marker) is ignored. If the request held
    // an address handle (BP_AH_USED_FLAG), it is released here.
    void FreeBuf(ui64 idArg) {
        ui64 id = idArg & BP_BUF_ID_MASK;
        if (id == 0) {
            return;
        }
        Y_ASSERT(id > 0 && id < (ui64)(FirstFreeBlock * BLOCK_SIZE));
        FreeList.push_back(id);
        Y_ASSERT(--Blocks[id >> BLOCK_SIZE_LN].BlkRefCounts[id & (BLOCK_SIZE - 1)] == 0);
        if (idArg & BP_AH_USED_FLAG) {
            Blocks[id >> BLOCK_SIZE_LN].AHHolder[id & (BLOCK_SIZE - 1)] = nullptr;
        }
    }
    // Payload address for a raw wr_id (flag bits tolerated).
    char* GetBufData(ui64 idArg) {
        ui64 id = idArg & BP_BUF_ID_MASK;
        return Blocks[id >> BLOCK_SIZE_LN].GetBufData(id);
    }
    // Sends over a reliable-connection queue pair. Small payloads go inline with
    // wr_id 0 (no buffer consumed); otherwise a pool buffer is used and its id
    // is returned so it can be freed on send completion.
    int PostSend(TPtrArg<TRCQueuePair> qp, const void* data, size_t len) {
        if (len > SMALL_PKT_SIZE) {
            Y_ABORT_UNLESS(0, "buffer overrun");
        }
        if (len <= MAX_INLINE_DATA_SIZE) {
            qp->PostSend(nullptr, 0, data, len);
            return 0;
        } else {
            int id = AllocBuf();
            TSingleBlock& blk = Blocks[id >> BLOCK_SIZE_LN];
            char* buf = blk.GetBufData(id);
            memcpy(buf, data, len);
            qp->PostSend(blk.Mem, id, buf, len);
            return id;
        }
    }
    // Sends over an unreliable-datagram queue pair. A buffer is always allocated
    // (even for the inline path) so that the address handle can be pinned in
    // AHHolder until FreeBuf() sees BP_AH_USED_FLAG in the completed wr_id.
    // The 40-byte reservation appears to account for the UD GRH — confirm against ib_low.h.
    void PostSend(TPtrArg<TUDQueuePair> qp, TPtrArg<TAddressHandle> ah, int remoteQPN, int remoteQKey,
                  const void* data, size_t len) {
        if (len > SMALL_PKT_SIZE - 40) {
            Y_ABORT_UNLESS(0, "buffer overrun");
        }
        ui64 id = AllocBuf();
        TSingleBlock& blk = Blocks[id >> BLOCK_SIZE_LN];
        int ptr = id & (BLOCK_SIZE - 1);
        blk.AHHolder[ptr] = ah.Get();
        id |= BP_AH_USED_FLAG;
        if (len <= MAX_INLINE_DATA_SIZE) {
            qp->PostSend(ah, remoteQPN, remoteQKey, nullptr, id, data, len);
        } else {
            char* buf = blk.GetBufData(id);
            memcpy(buf, data, len);
            qp->PostSend(ah, remoteQPN, remoteQKey, blk.Mem, id, buf, len);
        }
    }
    // Records that one posted receive was consumed; the actual re-post happens
    // in PostRecv() so multiple completions can be batched.
    void RequestPostRecv() {
        ++PostRecvDeficit;
    }
    // Re-posts one receive work request per recorded deficit, each with a fresh buffer.
    void PostRecv() {
        for (int i = 0; i < PostRecvDeficit; ++i) {
            int id = AllocBuf();
            TSingleBlock& blk = Blocks[id >> BLOCK_SIZE_LN];
            char* buf = blk.GetBufData(id);
            SRQ->PostReceive(blk.Mem, id, buf, SMALL_PKT_SIZE);
        }
        PostRecvDeficit = 0;
    }
};
  132. class TIBRecvPacketProcess: public TNonCopyable {
  133. TIBBufferPool& BP;
  134. ui64 Id;
  135. char* Data;
  136. public:
  137. TIBRecvPacketProcess(TIBBufferPool& bp, const ibv_wc& wc)
  138. : BP(bp)
  139. , Id(wc.wr_id)
  140. {
  141. Y_ASSERT(wc.opcode & IBV_WC_RECV);
  142. BP.RequestPostRecv();
  143. Data = BP.GetBufData(Id);
  144. }
  145. // intended for postponed packet processing
  146. // with this call RequestPostRecv() should be called outside (and PostRecv() too in order to avoid rnr situation)
  147. TIBRecvPacketProcess(TIBBufferPool& bp, const ui64 wr_id)
  148. : BP(bp)
  149. , Id(wr_id)
  150. {
  151. Data = BP.GetBufData(Id);
  152. }
  153. ~TIBRecvPacketProcess() {
  154. BP.FreeBuf(Id);
  155. BP.PostRecv();
  156. }
  157. char* GetData() const {
  158. return Data;
  159. }
  160. char* GetUDData() const {
  161. return Data + 40;
  162. }
  163. ibv_grh* GetGRH() const {
  164. return (ibv_grh*)Data;
  165. }
  166. };
  167. }