
#pragma once

#include "udp_address.h"

#if defined(_linux_)
#include <contrib/libs/ibdrv/include/infiniband/verbs.h>
#include <contrib/libs/ibdrv/include/rdma/rdma_cma.h>
#endif

namespace NNetliba {
#define CHECK_Z(x)                                                      \
    {                                                                   \
        int rv = (x);                                                   \
        if (rv != 0) {                                                  \
            fprintf(stderr, "check_z failed, errno = %d\n", errno);     \
            Y_ABORT_UNLESS(0, "check_z");                               \
        }                                                               \
    }
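    // Usage sketch (illustrative, not part of the original interface): CHECK_Z wraps
    // libibverbs calls that return 0 on success and non-zero on failure, e.g.
    //     CHECK_Z(ibv_query_port(context, portNum, &portAttrs));
    // A non-zero return prints errno to stderr and aborts the process.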
    //////////////////////////////////////////////////////////////////////////
    const int MAX_SGE = 1;
    const size_t MAX_INLINE_DATA_SIZE = 16;
    const int MAX_OUTSTANDING_RDMA = 10;

#if defined(_linux_)
    class TIBContext: public TThrRefBase, TNonCopyable {
        ibv_context* Context;
        ibv_pd* ProtDomain;
        TMutex Lock;

        ~TIBContext() override {
            if (Context) {
                CHECK_Z(ibv_dealloc_pd(ProtDomain));
                CHECK_Z(ibv_close_device(Context));
            }
        }
    public:
        TIBContext(ibv_device* device) {
            Context = ibv_open_device(device);
            // keep ProtDomain well-defined even if the device could not be opened,
            // so that IsValid() never reads an uninitialized pointer
            ProtDomain = nullptr;
            if (Context) {
                ProtDomain = ibv_alloc_pd(Context);
            }
        }
        bool IsValid() const {
            return Context != nullptr && ProtDomain != nullptr;
        }

        class TLock {
            TIntrusivePtr<TIBContext> Ptr;
            TGuard<TMutex> Guard;

        public:
            TLock(TPtrArg<TIBContext> ctx)
                : Ptr(ctx)
                , Guard(ctx->Lock)
            {
            }
            ibv_context* GetContext() {
                return Ptr->Context;
            }
            ibv_pd* GetProtDomain() {
                return Ptr->ProtDomain;
            }
        };
    };
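    // Usage sketch (illustrative, not part of the original interface): opening the first
    // device in the system and taking the context lock around raw verbs calls.
    //     ibv_device** deviceList = ibv_get_device_list(nullptr);
    //     if (deviceList && deviceList[0]) {
    //         TIntrusivePtr<TIBContext> ibCtx = new TIBContext(deviceList[0]);
    //         if (ibCtx->IsValid()) {
    //             TIBContext::TLock ibContext(ibCtx);
    //             // ibContext.GetContext() / ibContext.GetProtDomain() are valid here
    //         }
    //     }
    //     ibv_free_device_list(deviceList);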
    class TIBPort: public TThrRefBase, TNonCopyable {
        int Port;
        int LID;
        TIntrusivePtr<TIBContext> IBCtx;

        enum {
            MAX_GID = 16
        };
        ibv_gid MyGidArr[MAX_GID];

    public:
        TIBPort(TPtrArg<TIBContext> ctx, int port)
            : IBCtx(ctx)
        {
            ibv_port_attr portAttrs;
            TIBContext::TLock ibContext(IBCtx);
            CHECK_Z(ibv_query_port(ibContext.GetContext(), port, &portAttrs));
            Port = port;
            LID = portAttrs.lid;
            for (int i = 0; i < MAX_GID; ++i) {
                ibv_gid& dst = MyGidArr[i];
                Zero(dst);
                ibv_query_gid(ibContext.GetContext(), Port, i, &dst);
            }
        }
        int GetPort() const {
            return Port;
        }
        int GetLID() const {
            return LID;
        }
        TIBContext* GetCtx() {
            return IBCtx.Get();
        }
        void GetGID(ibv_gid* res) const {
            *res = MyGidArr[0];
        }
        int GetGIDIndex(const ibv_gid& arg) const {
            for (int i = 0; i < MAX_GID; ++i) {
                const ibv_gid& chk = MyGidArr[i];
                if (memcmp(&chk, &arg, sizeof(chk)) == 0) {
                    return i;
                }
            }
            return 0;
        }
        void GetAHAttr(ibv_wc* wc, ibv_grh* grh, ibv_ah_attr* res) {
            TIBContext::TLock ibContext(IBCtx);
            CHECK_Z(ibv_init_ah_from_wc(ibContext.GetContext(), Port, wc, grh, res));
        }
    };
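    // Usage sketch (illustrative, continuing the sketch above): wrapping port 1 of the
    // context and reading its LID and primary GID.
    //     TIntrusivePtr<TIBPort> ibPort = new TIBPort(ibCtx, 1);
    //     int lid = ibPort->GetLID();
    //     ibv_gid gid;
    //     ibPort->GetGID(&gid);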
    class TComplectionQueue: public TThrRefBase, TNonCopyable {
        ibv_cq* CQ;
        TIntrusivePtr<TIBContext> IBCtx;

        ~TComplectionQueue() override {
            if (CQ) {
                CHECK_Z(ibv_destroy_cq(CQ));
            }
        }

    public:
        TComplectionQueue(TPtrArg<TIBContext> ctx, int maxCQEcount)
            : IBCtx(ctx)
        {
            TIBContext::TLock ibContext(IBCtx);
            /* ibv_cq_init_attr_ex attr;
               Zero(attr);
               attr.cqe = maxCQEcount;
               attr.cq_create_flags = 0;
               ibv_cq_ex* vcq = ibv_create_cq_ex(ibContext.GetContext(), &attr);
               if (vcq) {
                   CQ = (ibv_cq*)vcq; // doubtful trick but that's life
               } else {*/
            // no completion channel
            // no completion vector
            CQ = ibv_create_cq(ibContext.GetContext(), maxCQEcount, nullptr, nullptr, 0);
            // }
        }
        ibv_cq* GetCQ() {
            return CQ;
        }
        int Poll(ibv_wc* res, int bufSize) {
            Y_ASSERT(bufSize >= 1);
            //struct ibv_wc {
            //    ui64 wr_id;
            //    enum ibv_wc_status status;
            //    enum ibv_wc_opcode opcode;
            //    ui32 vendor_err;
            //    ui32 byte_len;
            //    ui32 imm_data; /* network byte order */
            //    ui32 qp_num;
            //    ui32 src_qp;
            //    enum ibv_wc_flags wc_flags;
            //    ui16 pkey_index;
            //    ui16 slid;
            //    ui8 sl;
            //    ui8 dlid_path_bits;
            //};
            int rv = ibv_poll_cq(CQ, bufSize, res);
            if (rv < 0) {
                Y_ABORT_UNLESS(0, "ibv_poll_cq failed");
            }
            if (rv > 0) {
                //printf("Completed wr\n");
                //printf("  wr_id = %" PRIx64 "\n", wc.wr_id);
                //printf("  status = %d\n", wc.status);
                //printf("  opcode = %d\n", wc.opcode);
                //printf("  byte_len = %d\n", wc.byte_len);
                //printf("  imm_data = %d\n", wc.imm_data);
                //printf("  qp_num = %d\n", wc.qp_num);
                //printf("  src_qp = %d\n", wc.src_qp);
                //printf("  wc_flags = %x\n", wc.wc_flags);
                //printf("  slid = %d\n", wc.slid);
            }
            // rv = number of completed work requests written to res
            return rv;
        }
    };
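    // Usage sketch (illustrative, continuing the sketch above): draining completions.
    //     TIntrusivePtr<TComplectionQueue> cq = new TComplectionQueue(ibCtx, 100);
    //     ibv_wc wcArr[10];
    //     int cnt = cq->Poll(wcArr, 10);
    //     for (int i = 0; i < cnt; ++i) {
    //         if (wcArr[i].status != IBV_WC_SUCCESS) {
    //             // handle the failed work request identified by wcArr[i].wr_id
    //         }
    //     }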
    //struct ibv_mr {
    //    struct ibv_context* context;
    //    struct ibv_pd* pd;
    //    void* addr;
    //    size_t length;
    //    ui32 handle;
    //    ui32 lkey;
    //    ui32 rkey;
    //};
    class TMemoryRegion: public TThrRefBase, TNonCopyable {
        ibv_mr* MR;
        TIntrusivePtr<TIBContext> IBCtx;

        ~TMemoryRegion() override {
            if (MR) {
                CHECK_Z(ibv_dereg_mr(MR));
            }
        }

    public:
        TMemoryRegion(TPtrArg<TIBContext> ctx, size_t len)
            : IBCtx(ctx)
        {
            TIBContext::TLock ibContext(IBCtx);
            int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ; // TODO: IBV_ACCESS_ALLOCATE_MR
            MR = ibv_reg_mr(ibContext.GetProtDomain(), 0, len, access);
            Y_ASSERT(MR);
        }
        ui32 GetLKey() const {
            static_assert(sizeof(ui32) == sizeof(MR->lkey), "expect sizeof(ui32) == sizeof(MR->lkey)");
            return MR->lkey;
        }
        ui32 GetRKey() const {
            static_assert(sizeof(ui32) == sizeof(MR->rkey), "expect sizeof(ui32) == sizeof(MR->rkey)");
            return MR->rkey;
        }
        char* GetData() {
            return MR ? (char*)MR->addr : nullptr;
        }
        bool IsCovered(const void* data, size_t len) const {
            size_t dataAddr = reinterpret_cast<size_t>(data) / sizeof(char);
            size_t bufAddr = reinterpret_cast<size_t>(MR->addr) / sizeof(char);
            return (dataAddr >= bufAddr) && (dataAddr + len <= bufAddr + MR->length);
        }
    };
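    // Usage sketch (illustrative, continuing the sketch above): registering a 1 MB region
    // and checking that a sub-range is covered before posting it.
    //     TIntrusivePtr<TMemoryRegion> mem = new TMemoryRegion(ibCtx, 1 << 20);
    //     char* buf = mem->GetData();
    //     Y_ASSERT(mem->IsCovered(buf + 100, 4096));
    //     ui32 lkey = mem->GetLKey(); // passed in scatter/gather entries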
    class TSharedReceiveQueue: public TThrRefBase, TNonCopyable {
        ibv_srq* SRQ;
        TIntrusivePtr<TIBContext> IBCtx;

        ~TSharedReceiveQueue() override {
            if (SRQ) {
                ibv_destroy_srq(SRQ);
            }
        }

    public:
        TSharedReceiveQueue(TPtrArg<TIBContext> ctx, int maxWR)
            : IBCtx(ctx)
        {
            ibv_srq_init_attr attr;
            Zero(attr);
            attr.srq_context = this;
            attr.attr.max_sge = MAX_SGE;
            attr.attr.max_wr = maxWR;

            TIBContext::TLock ibContext(IBCtx);
            SRQ = ibv_create_srq(ibContext.GetProtDomain(), &attr);
            Y_ASSERT(SRQ);
        }
        ibv_srq* GetSRQ() {
            return SRQ;
        }
        void PostReceive(TPtrArg<TMemoryRegion> mem, ui64 id, const void* buf, size_t len) {
            Y_ASSERT(mem->IsCovered(buf, len));
            ibv_recv_wr wr, *bad;
            ibv_sge sg;
            sg.addr = reinterpret_cast<ui64>(buf) / sizeof(char);
            sg.length = len;
            sg.lkey = mem->GetLKey();
            Zero(wr);
            wr.wr_id = id;
            wr.sg_list = &sg;
            wr.num_sge = 1;
            CHECK_Z(ibv_post_srq_recv(SRQ, &wr, &bad));
        }
    };
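    // Usage sketch (illustrative, continuing the sketch above): keeping the shared receive
    // queue stocked with buffers carved out of the registered region.
    //     TIntrusivePtr<TSharedReceiveQueue> srq = new TSharedReceiveQueue(ibCtx, 100);
    //     const size_t bufSize = 4096;
    //     for (ui64 id = 0; id < 100; ++id) {
    //         srq->PostReceive(mem, id, mem->GetData() + id * bufSize, bufSize);
    //     }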
    inline void MakeAH(ibv_ah_attr* res, TPtrArg<TIBPort> port, int lid, int serviceLevel) {
        Zero(*res);
        res->dlid = lid;
        res->port_num = port->GetPort();
        res->sl = serviceLevel;
    }

    void MakeAH(ibv_ah_attr* res, TPtrArg<TIBPort> port, const TUdpAddress& remoteAddr, const TUdpAddress& localAddr, int serviceLevel);

    class TAddressHandle: public TThrRefBase, TNonCopyable {
        ibv_ah* AH;
        TIntrusivePtr<TIBContext> IBCtx;

        ~TAddressHandle() override {
            if (AH) {
                CHECK_Z(ibv_destroy_ah(AH));
            }
            AH = nullptr;
            IBCtx = nullptr;
        }

    public:
        TAddressHandle(TPtrArg<TIBContext> ctx, ibv_ah_attr* attr)
            : IBCtx(ctx)
        {
            TIBContext::TLock ibContext(IBCtx);
            AH = ibv_create_ah(ibContext.GetProtDomain(), attr);
            Y_ASSERT(AH != nullptr);
        }
        TAddressHandle(TPtrArg<TIBPort> port, int lid, int serviceLevel)
            : IBCtx(port->GetCtx())
        {
            ibv_ah_attr attr;
            MakeAH(&attr, port, lid, serviceLevel);
            TIBContext::TLock ibContext(IBCtx);
            AH = ibv_create_ah(ibContext.GetProtDomain(), &attr);
            Y_ASSERT(AH != nullptr);
        }
        TAddressHandle(TPtrArg<TIBPort> port, const TUdpAddress& remoteAddr, const TUdpAddress& localAddr, int serviceLevel)
            : IBCtx(port->GetCtx())
        {
            ibv_ah_attr attr;
            MakeAH(&attr, port, remoteAddr, localAddr, serviceLevel);
            TIBContext::TLock ibContext(IBCtx);
            AH = ibv_create_ah(ibContext.GetProtDomain(), &attr);
            Y_ASSERT(AH != nullptr);
        }
        ibv_ah* GetAH() {
            return AH;
        }
        bool IsValid() const {
            return AH != nullptr;
        }
    };
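    // Usage sketch (illustrative, continuing the sketch above): building an address handle
    // for a peer known by LID (exchanged out of band; peerLID is a placeholder), service level 0.
    //     TIntrusivePtr<TAddressHandle> peerAH = new TAddressHandle(ibPort, peerLID, 0);
    //     Y_ASSERT(peerAH->IsValid());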
    // GRH + wc -> address_handle_attr
    //int ibv_init_ah_from_wc(struct ibv_context* context, ui8 port_num,
    //                        struct ibv_wc* wc, struct ibv_grh* grh,
    //                        struct ibv_ah_attr* ah_attr)
    //ibv_create_ah_from_wc(struct ibv_pd* pd, struct ibv_wc* wc, struct ibv_grh* grh,
    //                      ui8 port_num)
    class TQueuePair: public TThrRefBase, TNonCopyable {
    protected:
        ibv_qp* QP;
        int MyPSN; // start packet sequence number
        TIntrusivePtr<TIBContext> IBCtx;
        TIntrusivePtr<TComplectionQueue> CQ;
        TIntrusivePtr<TSharedReceiveQueue> SRQ;

        TQueuePair(TPtrArg<TIBContext> ctx, TPtrArg<TComplectionQueue> cq, TPtrArg<TSharedReceiveQueue> srq,
                   int sendQueueSize,
                   ibv_qp_type qp_type)
            : IBCtx(ctx)
            , CQ(cq)
            , SRQ(srq)
        {
            MyPSN = GetCycleCount() & 0xffffff; // 24-bit PSN; should be random and differ between runs

            ibv_qp_init_attr attr;
            Zero(attr);
            attr.qp_context = this; // not really useful
            attr.send_cq = cq->GetCQ();
            attr.recv_cq = cq->GetCQ();
            attr.srq = srq->GetSRQ();
            attr.cap.max_send_wr = sendQueueSize;
            attr.cap.max_recv_wr = 0; // we are using srq, no need for qp's rq
            attr.cap.max_send_sge = MAX_SGE;
            attr.cap.max_recv_sge = MAX_SGE;
            attr.cap.max_inline_data = MAX_INLINE_DATA_SIZE;
            attr.qp_type = qp_type;
            attr.sq_sig_all = 1; // inline sends do not have to be signaled, but if they are left unsignaled the send work queue overflows

            TIBContext::TLock ibContext(IBCtx);
            QP = ibv_create_qp(ibContext.GetProtDomain(), &attr);
            Y_ASSERT(QP);
            //struct ibv_qp {
            //    struct ibv_context* context;
            //    void* qp_context;
            //    struct ibv_pd* pd;
            //    struct ibv_cq* send_cq;
            //    struct ibv_cq* recv_cq;
            //    struct ibv_srq* srq;
            //    ui32 handle;
            //    ui32 qp_num;
            //    enum ibv_qp_state state;
            //    enum ibv_qp_type qp_type;
            //    pthread_mutex_t mutex;
            //    pthread_cond_t cond;
            //    ui32 events_completed;
            //};
            //qp_context - the value qp_context that was provided to ibv_create_qp()
            //qp_num     - the number of this Queue Pair
            //state      - the last known state of this Queue Pair; the actual state may differ (e.g. if the RDMA device has transitioned it to another state)
            //qp_type    - the Transport Service Type of this Queue Pair
        }
        ~TQueuePair() override {
            if (QP) {
                CHECK_Z(ibv_destroy_qp(QP));
            }
        }
        void FillSendAttrs(ibv_send_wr* wr, ibv_sge* sg,
                           ui64 localAddr, ui32 lKey, ui64 id, size_t len) {
            sg->addr = localAddr;
            sg->length = len;
            sg->lkey = lKey;
            Zero(*wr);
            wr->wr_id = id;
            wr->sg_list = sg;
            wr->num_sge = 1;
            if (len <= MAX_INLINE_DATA_SIZE) {
                wr->send_flags = IBV_SEND_INLINE;
            }
        }
        void FillSendAttrs(ibv_send_wr* wr, ibv_sge* sg,
                           TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
            ui64 localAddr = reinterpret_cast<ui64>(data) / sizeof(char);
            ui32 lKey = 0;
            if (mem) {
                Y_ASSERT(mem->IsCovered(data, len));
                lKey = mem->GetLKey();
            } else {
                Y_ASSERT(len <= MAX_INLINE_DATA_SIZE);
            }
            FillSendAttrs(wr, sg, localAddr, lKey, id, len);
        }

    public:
        int GetQPN() const {
            if (QP)
                return QP->qp_num;
            return 0;
        }
        int GetPSN() const {
            return MyPSN;
        }
        // we are using srq
        //void PostReceive(const TMemoryRegion& mem) {
        //    ibv_recv_wr wr, *bad;
        //    ibv_sge sg;
        //    sg.addr = mem.Addr;
        //    sg.length = mem.Length;
        //    sg.lkey = mem.lkey;
        //    Zero(wr);
        //    wr.wr_id = 13;
        //    wr.sg_list = sg;
        //    wr.num_sge = 1;
        //    CHECK_Z(ibv_post_recv(QP, &wr, &bad));
        //}
    };
    class TRCQueuePair: public TQueuePair {
    public:
        TRCQueuePair(TPtrArg<TIBContext> ctx, TPtrArg<TComplectionQueue> cq, TPtrArg<TSharedReceiveQueue> srq, int sendQueueSize)
            : TQueuePair(ctx, cq, srq, sendQueueSize, IBV_QPT_RC)
        {
        }
        // SRQ should have receive posted
        void Init(const ibv_ah_attr& peerAddr, int peerQPN, int peerPSN) {
            Y_ASSERT(QP->qp_type == IBV_QPT_RC);
            ibv_qp_attr attr;
            //struct ibv_qp_attr {
            //    enum ibv_qp_state qp_state;
            //    enum ibv_qp_state cur_qp_state;
            //    enum ibv_mtu path_mtu;
            //    enum ibv_mig_state path_mig_state;
            //    ui32 qkey;
            //    ui32 rq_psn;
            //    ui32 sq_psn;
            //    ui32 dest_qp_num;
            //    int qp_access_flags;
            //    struct ibv_qp_cap cap;
            //    struct ibv_ah_attr ah_attr;
            //    struct ibv_ah_attr alt_ah_attr;
            //    ui16 pkey_index;
            //    ui16 alt_pkey_index;
            //    ui8 en_sqd_async_notify;
            //    ui8 sq_draining;
            //    ui8 max_rd_atomic;
            //    ui8 max_dest_rd_atomic;
            //    ui8 min_rnr_timer;
            //    ui8 port_num;
            //    ui8 timeout;
            //    ui8 retry_cnt;
            //    ui8 rnr_retry;
            //    ui8 alt_port_num;
            //    ui8 alt_timeout;
            //};
            // RESET -> INIT
            Zero(attr);
            attr.qp_state = IBV_QPS_INIT;
            attr.pkey_index = 0;
            attr.port_num = peerAddr.port_num;
            // for connected QP
            attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_ATOMIC;
            CHECK_Z(ibv_modify_qp(QP, &attr,
                                  IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS));

            // INIT -> ReadyToReceive
            //PostReceive(mem);
            attr.qp_state = IBV_QPS_RTR;
            attr.path_mtu = IBV_MTU_512; // allows finer-grained VL arbitration
            // for connected QP
            attr.ah_attr = peerAddr;
            attr.dest_qp_num = peerQPN;
            attr.rq_psn = peerPSN;
            attr.max_dest_rd_atomic = MAX_OUTSTANDING_RDMA; // number of outstanding RDMA requests
            attr.min_rnr_timer = 12; // recommended
            CHECK_Z(ibv_modify_qp(QP, &attr,
                                  IBV_QP_STATE | IBV_QP_PATH_MTU |
                                      IBV_QP_AV | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER));

            // ReadyToReceive -> ReadyToTransmit
            attr.qp_state = IBV_QPS_RTS;
            // for connected QP
            attr.timeout = 14; // was temporarily raised to 18; 14 is the recommended value
            //attr.retry_cnt = 0; // for debug purposes
            //attr.rnr_retry = 0; // for debug purposes
            attr.retry_cnt = 7; // release configuration
            attr.rnr_retry = 7; // release configuration (retry forever)
            attr.sq_psn = MyPSN;
            attr.max_rd_atomic = MAX_OUTSTANDING_RDMA; // number of outstanding RDMA requests
            CHECK_Z(ibv_modify_qp(QP, &attr,
                                  IBV_QP_STATE |
                                      IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC));
        }
        void PostSend(TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
            ibv_send_wr wr, *bad;
            ibv_sge sg;
            FillSendAttrs(&wr, &sg, mem, id, data, len);
            wr.opcode = IBV_WR_SEND;
            //IBV_WR_RDMA_WRITE
            //IBV_WR_RDMA_WRITE_WITH_IMM
            //IBV_WR_SEND
            //IBV_WR_SEND_WITH_IMM
            //IBV_WR_RDMA_READ
            //wr.imm_data = xz;
            CHECK_Z(ibv_post_send(QP, &wr, &bad));
        }
        void PostRDMAWrite(ui64 remoteAddr, ui32 remoteKey,
                           TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
            ibv_send_wr wr, *bad;
            ibv_sge sg;
            FillSendAttrs(&wr, &sg, mem, id, data, len);
            wr.opcode = IBV_WR_RDMA_WRITE;
            wr.wr.rdma.remote_addr = remoteAddr;
            wr.wr.rdma.rkey = remoteKey;
            CHECK_Z(ibv_post_send(QP, &wr, &bad));
        }
        void PostRDMAWrite(ui64 remoteAddr, ui32 remoteKey,
                           ui64 localAddr, ui32 localKey, ui64 id, size_t len) {
            ibv_send_wr wr, *bad;
            ibv_sge sg;
            FillSendAttrs(&wr, &sg, localAddr, localKey, id, len);
            wr.opcode = IBV_WR_RDMA_WRITE;
            wr.wr.rdma.remote_addr = remoteAddr;
            wr.wr.rdma.rkey = remoteKey;
            CHECK_Z(ibv_post_send(QP, &wr, &bad));
        }
        void PostRDMAWriteImm(ui64 remoteAddr, ui32 remoteKey, ui32 immData,
                              TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
            ibv_send_wr wr, *bad;
            ibv_sge sg;
            FillSendAttrs(&wr, &sg, mem, id, data, len);
            wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
            wr.imm_data = immData;
            wr.wr.rdma.remote_addr = remoteAddr;
            wr.wr.rdma.rkey = remoteKey;
            CHECK_Z(ibv_post_send(QP, &wr, &bad));
        }
    };
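    // Usage sketch (illustrative, continuing the sketches above): bringing up a
    // reliable-connected pair. QPN, PSN and LID must be exchanged with the peer out of
    // band before Init(); peerLID/peerQPN/peerPSN below are placeholders for those
    // values, and receives must already be posted to the SRQ.
    //     TIntrusivePtr<TRCQueuePair> rcQP = new TRCQueuePair(ibCtx, cq, srq, 10);
    //     // send rcQP->GetQPN(), rcQP->GetPSN() and the local LID to the peer first
    //     ibv_ah_attr peerAddr;
    //     MakeAH(&peerAddr, ibPort, peerLID, 0);
    //     rcQP->Init(peerAddr, peerQPN, peerPSN);
    //     rcQP->PostSend(mem, /*wr_id*/ 1, mem->GetData(), 4096);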
    class TUDQueuePair: public TQueuePair {
        TIntrusivePtr<TIBPort> Port;

    public:
        TUDQueuePair(TPtrArg<TIBPort> port, TPtrArg<TComplectionQueue> cq, TPtrArg<TSharedReceiveQueue> srq, int sendQueueSize)
            : TQueuePair(port->GetCtx(), cq, srq, sendQueueSize, IBV_QPT_UD)
            , Port(port)
        {
        }
        // SRQ should have receive posted
        void Init(int qkey) {
            Y_ASSERT(QP->qp_type == IBV_QPT_UD);
            ibv_qp_attr attr;
            // RESET -> INIT
            Zero(attr);
            attr.qp_state = IBV_QPS_INIT;
            attr.pkey_index = 0;
            attr.port_num = Port->GetPort();
            // for unconnected qp
            attr.qkey = qkey;
            CHECK_Z(ibv_modify_qp(QP, &attr,
                                  IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_QKEY));

            // INIT -> ReadyToReceive
            //PostReceive(mem);
            attr.qp_state = IBV_QPS_RTR;
            CHECK_Z(ibv_modify_qp(QP, &attr, IBV_QP_STATE));

            // ReadyToReceive -> ReadyToTransmit
            attr.qp_state = IBV_QPS_RTS;
            attr.sq_psn = 0;
            CHECK_Z(ibv_modify_qp(QP, &attr, IBV_QP_STATE | IBV_QP_SQ_PSN));
        }
        void PostSend(TPtrArg<TAddressHandle> ah, int remoteQPN, int remoteQKey,
                      TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
            ibv_send_wr wr, *bad;
            ibv_sge sg;
            FillSendAttrs(&wr, &sg, mem, id, data, len);
            wr.opcode = IBV_WR_SEND;
            wr.wr.ud.ah = ah->GetAH();
            wr.wr.ud.remote_qpn = remoteQPN;
            wr.wr.ud.remote_qkey = remoteQKey;
            //IBV_WR_SEND_WITH_IMM
            //wr.imm_data = xz;
            CHECK_Z(ibv_post_send(QP, &wr, &bad));
        }
    };
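    // Usage sketch (illustrative, continuing the sketches above): unreliable-datagram send
    // through an address handle. Both sides must use the same qkey; peerQPN is a placeholder
    // for the peer's QP number obtained out of band.
    //     const int EXAMPLE_QKEY = 0x11223344; // hypothetical value for this sketch
    //     TIntrusivePtr<TUDQueuePair> udQP = new TUDQueuePair(ibPort, cq, srq, 10);
    //     udQP->Init(EXAMPLE_QKEY);
    //     udQP->PostSend(peerAH, peerQPN, EXAMPLE_QKEY, mem, /*wr_id*/ 2, mem->GetData(), 256);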
    TIntrusivePtr<TIBPort> GetIBDevice();

#else
    //////////////////////////////////////////////////////////////////////////
    // stub for OS without IB support
    //////////////////////////////////////////////////////////////////////////
    enum ibv_wc_opcode {
        IBV_WC_SEND,
        IBV_WC_RDMA_WRITE,
        IBV_WC_RDMA_READ,
        IBV_WC_COMP_SWAP,
        IBV_WC_FETCH_ADD,
        IBV_WC_BIND_MW,
        IBV_WC_RECV = 1 << 7,
        IBV_WC_RECV_RDMA_WITH_IMM
    };

    enum ibv_wc_status {
        IBV_WC_SUCCESS,
        // lots of errors follow
    };

    //struct ibv_device;
    //struct ibv_pd;
    union ibv_gid {
        ui8 raw[16];
        struct {
            ui64 subnet_prefix;
            ui64 interface_id;
        } global;
    };

    struct ibv_wc {
        ui64 wr_id;
        enum ibv_wc_status status;
        enum ibv_wc_opcode opcode;
        ui32 imm_data; /* in network byte order */
        ui32 qp_num;
        ui32 src_qp;
    };
    struct ibv_grh {};
    struct ibv_ah_attr {
        ui8 sl;
    };
    //struct ibv_cq;

    class TIBContext: public TThrRefBase, TNonCopyable {
    public:
        bool IsValid() const {
            return false;
        }
        //ibv_context* GetContext() { return 0; }
        //ibv_pd* GetProtDomain() { return 0; }
    };

    class TIBPort: public TThrRefBase, TNonCopyable {
    public:
        TIBPort(TPtrArg<TIBContext>, int) {
        }
        int GetPort() const {
            return 1;
        }
        int GetLID() const {
            return 1;
        }
        TIBContext* GetCtx() {
            return 0;
        }
        void GetGID(ibv_gid* res) {
            Zero(*res);
        }
        void GetAHAttr(ibv_wc*, ibv_grh*, ibv_ah_attr*) {
        }
    };

    class TComplectionQueue: public TThrRefBase, TNonCopyable {
    public:
        TComplectionQueue(TPtrArg<TIBContext>, int) {
        }
        //ibv_cq* GetCQ() { return 0; }
        int Poll(ibv_wc*, int) {
            return 0;
        }
    };

    class TMemoryRegion: public TThrRefBase, TNonCopyable {
    public:
        TMemoryRegion(TPtrArg<TIBContext>, size_t) {
        }
        ui32 GetLKey() const {
            return 0;
        }
        ui32 GetRKey() const {
            return 0;
        }
        char* GetData() {
            return 0;
        }
        bool IsCovered(const void*, size_t) const {
            return false;
        }
    };

    class TSharedReceiveQueue: public TThrRefBase, TNonCopyable {
    public:
        TSharedReceiveQueue(TPtrArg<TIBContext>, int) {
        }
        //ibv_srq* GetSRQ() { return SRQ; }
        void PostReceive(TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
        }
    };

    inline void MakeAH(ibv_ah_attr*, TPtrArg<TIBPort>, int, int) {
    }

    class TAddressHandle: public TThrRefBase, TNonCopyable {
    public:
        TAddressHandle(TPtrArg<TIBContext>, ibv_ah_attr*) {
        }
        TAddressHandle(TPtrArg<TIBPort>, int, int) {
        }
        TAddressHandle(TPtrArg<TIBPort>, const TUdpAddress&, const TUdpAddress&, int) {
        }
        //ibv_ah* GetAH() { return AH; }
        bool IsValid() {
            return true;
        }
    };

    class TQueuePair: public TThrRefBase, TNonCopyable {
    public:
        int GetQPN() const {
            return 0;
        }
        int GetPSN() const {
            return 0;
        }
    };

    class TRCQueuePair: public TQueuePair {
    public:
        TRCQueuePair(TPtrArg<TIBContext>, TPtrArg<TComplectionQueue>, TPtrArg<TSharedReceiveQueue>, int) {
        }
        // SRQ should have receive posted
        void Init(const ibv_ah_attr&, int, int) {
        }
        void PostSend(TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
        }
        void PostRDMAWrite(ui64, ui32, TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
        }
        void PostRDMAWrite(ui64, ui32, ui64, ui32, ui64, size_t) {
        }
        void PostRDMAWriteImm(ui64, ui32, ui32, TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
        }
    };

    class TUDQueuePair: public TQueuePair {
        TIntrusivePtr<TIBPort> Port;

    public:
        TUDQueuePair(TPtrArg<TIBPort>, TPtrArg<TComplectionQueue>, TPtrArg<TSharedReceiveQueue>, int) {
        }
        // SRQ should have receive posted
        void Init(int) {
        }
        void PostSend(TPtrArg<TAddressHandle>, int, int, TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
        }
    };

    inline TIntrusivePtr<TIBPort> GetIBDevice() {
        return 0;
    }
#endif
}