ib_low.h

#pragma once

#include "udp_address.h"

#if defined(_linux_) && !defined(__ANDROID__)
#include <contrib/libs/ibdrv/include/infiniband/verbs.h>
#include <contrib/libs/ibdrv/include/rdma/rdma_cma.h>
#endif

namespace NNetliba {
#define CHECK_Z(x)                                                      \
    {                                                                   \
        int rv = (x);                                                   \
        if (rv != 0) {                                                  \
            fprintf(stderr, "check_z failed, errno = %d\n", errno);     \
            Y_ABORT_UNLESS(0, "check_z");                               \
        }                                                               \
    }

    //////////////////////////////////////////////////////////////////////////
    const int MAX_SGE = 1;
    const size_t MAX_INLINE_DATA_SIZE = 16;
    const int MAX_OUTSTANDING_RDMA = 10;

#if defined(_linux_) && !defined(__ANDROID__)
    class TIBContext: public TThrRefBase, TNonCopyable {
        ibv_context* Context = nullptr;
        ibv_pd* ProtDomain = nullptr;
        TMutex Lock;

        ~TIBContext() override {
            if (ProtDomain) {
                CHECK_Z(ibv_dealloc_pd(ProtDomain));
            }
            if (Context) {
                CHECK_Z(ibv_close_device(Context));
            }
        }

    public:
        TIBContext(ibv_device* device) {
            Context = ibv_open_device(device);
            if (Context) {
                ProtDomain = ibv_alloc_pd(Context);
            }
            // Context/ProtDomain stay null on failure, so IsValid() is well defined
        }
  38. bool IsValid() const {
  39. return Context != nullptr && ProtDomain != nullptr;
  40. }
  41. class TLock {
  42. TIntrusivePtr<TIBContext> Ptr;
  43. TGuard<TMutex> Guard;
  44. public:
  45. TLock(TPtrArg<TIBContext> ctx)
  46. : Ptr(ctx)
  47. , Guard(ctx->Lock)
  48. {
  49. }
  50. ibv_context* GetContext() {
  51. return Ptr->Context;
  52. }
  53. ibv_pd* GetProtDomain() {
  54. return Ptr->ProtDomain;
  55. }
  56. };
  57. };
    class TIBPort: public TThrRefBase, TNonCopyable {
        int Port;
        int LID;
        TIntrusivePtr<TIBContext> IBCtx;

        static constexpr int MAX_GID = 16;
        ibv_gid MyGidArr[MAX_GID];

    public:
        TIBPort(TPtrArg<TIBContext> ctx, int port)
            : IBCtx(ctx)
        {
            ibv_port_attr portAttrs;
            TIBContext::TLock ibContext(IBCtx);
            CHECK_Z(ibv_query_port(ibContext.GetContext(), port, &portAttrs));
            Port = port;
            LID = portAttrs.lid;
            for (int i = 0; i < MAX_GID; ++i) {
                ibv_gid& dst = MyGidArr[i];
                Zero(dst);
                ibv_query_gid(ibContext.GetContext(), Port, i, &dst);
            }
        }
        int GetPort() const {
            return Port;
        }
        int GetLID() const {
            return LID;
        }
        TIBContext* GetCtx() {
            return IBCtx.Get();
        }
        void GetGID(ibv_gid* res) const {
            *res = MyGidArr[0];
        }
        int GetGIDIndex(const ibv_gid& arg) const {
            for (int i = 0; i < MAX_GID; ++i) {
                const ibv_gid& chk = MyGidArr[i];
                if (memcmp(&chk, &arg, sizeof(chk)) == 0) {
                    return i;
                }
            }
            return 0;
        }
        void GetAHAttr(ibv_wc* wc, ibv_grh* grh, ibv_ah_attr* res) {
            TIBContext::TLock ibContext(IBCtx);
            CHECK_Z(ibv_init_ah_from_wc(ibContext.GetContext(), Port, wc, grh, res));
        }
    };
    class TComplectionQueue: public TThrRefBase, TNonCopyable {
        ibv_cq* CQ;
        TIntrusivePtr<TIBContext> IBCtx;

        ~TComplectionQueue() override {
            if (CQ) {
                CHECK_Z(ibv_destroy_cq(CQ));
            }
        }

    public:
        TComplectionQueue(TPtrArg<TIBContext> ctx, int maxCQEcount)
            : IBCtx(ctx)
        {
            TIBContext::TLock ibContext(IBCtx);
            /* ibv_cq_init_attr_ex attr;
            Zero(attr);
            attr.cqe = maxCQEcount;
            attr.cq_create_flags = 0;
            ibv_cq_ex* vcq = ibv_create_cq_ex(ibContext.GetContext(), &attr);
            if (vcq) {
                CQ = (ibv_cq*)vcq; // doubtful trick but that's life
            } else {*/
            // no completion channel
            // no completion vector
            CQ = ibv_create_cq(ibContext.GetContext(), maxCQEcount, nullptr, nullptr, 0);
            //}
        }
        ibv_cq* GetCQ() {
            return CQ;
        }
        int Poll(ibv_wc* res, int bufSize) {
            Y_ASSERT(bufSize >= 1);
            //struct ibv_wc
            //{
            //    ui64 wr_id;
            //    enum ibv_wc_status status;
            //    enum ibv_wc_opcode opcode;
            //    ui32 vendor_err;
            //    ui32 byte_len;
            //    ui32 imm_data; /* network byte order */
            //    ui32 qp_num;
            //    ui32 src_qp;
            //    enum ibv_wc_flags wc_flags;
            //    ui16 pkey_index;
            //    ui16 slid;
            //    ui8 sl;
            //    ui8 dlid_path_bits;
            //};
            int rv = ibv_poll_cq(CQ, bufSize, res);
            if (rv < 0) {
                Y_ABORT_UNLESS(0, "ibv_poll_cq failed");
            }
            if (rv > 0) {
                //printf("Completed wr\n");
                //printf("  wr_id = %" PRIx64 "\n", wc.wr_id);
                //printf("  status = %d\n", wc.status);
                //printf("  opcode = %d\n", wc.opcode);
                //printf("  byte_len = %d\n", wc.byte_len);
                //printf("  imm_data = %d\n", wc.imm_data);
                //printf("  qp_num = %d\n", wc.qp_num);
                //printf("  src_qp = %d\n", wc.src_qp);
                //printf("  wc_flags = %x\n", wc.wc_flags);
                //printf("  slid = %d\n", wc.slid);
            }
            //rv = number_of_toggled_wc;
            return rv;
        }
    };
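    // Usage sketch (illustrative, not part of the library): draining a completion
    // queue with Poll(). The batch size and HandleCompletion() are hypothetical;
    // real callers dispatch on wc.wr_id / wc.opcode / wc.status.
    //
    //   void DrainCQ(TPtrArg<TComplectionQueue> cq) {
    //       ibv_wc wcBuf[16];
    //       for (;;) {
    //           int n = cq->Poll(wcBuf, 16);
    //           if (n == 0) {
    //               break; // nothing has completed right now
    //           }
    //           for (int i = 0; i < n; ++i) {
    //               const ibv_wc& wc = wcBuf[i];
    //               if (wc.status != IBV_WC_SUCCESS) {
    //                   // handle the error; wr_id identifies the failed request
    //               }
    //               HandleCompletion(wc);
    //           }
    //       }
    //   }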
    //struct ibv_mr
    //{
    //    struct ibv_context* context;
    //    struct ibv_pd* pd;
    //    void* addr;
    //    size_t length;
    //    ui32 handle;
    //    ui32 lkey;
    //    ui32 rkey;
    //};
    class TMemoryRegion: public TThrRefBase, TNonCopyable {
        ibv_mr* MR;
        TIntrusivePtr<TIBContext> IBCtx;

        ~TMemoryRegion() override {
            if (MR) {
                CHECK_Z(ibv_dereg_mr(MR));
            }
        }

    public:
        TMemoryRegion(TPtrArg<TIBContext> ctx, size_t len)
            : IBCtx(ctx)
        {
            TIBContext::TLock ibContext(IBCtx);
            int access = IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ; // TODO: IBV_ACCESS_ALLOCATE_MR
            MR = ibv_reg_mr(ibContext.GetProtDomain(), 0, len, access);
            Y_ASSERT(MR);
        }
        ui32 GetLKey() const {
            static_assert(sizeof(ui32) == sizeof(MR->lkey), "expect sizeof(ui32) == sizeof(MR->lkey)");
            return MR->lkey;
        }
        ui32 GetRKey() const {
            static_assert(sizeof(ui32) == sizeof(MR->rkey), "expect sizeof(ui32) == sizeof(MR->rkey)");
            return MR->rkey; // remote key (not lkey): peers use this key for RDMA access
        }
        char* GetData() {
            return MR ? (char*)MR->addr : nullptr;
        }
        bool IsCovered(const void* data, size_t len) const {
            size_t dataAddr = reinterpret_cast<size_t>(data) / sizeof(char);
            size_t bufAddr = reinterpret_cast<size_t>(MR->addr) / sizeof(char);
            return (dataAddr >= bufAddr) && (dataAddr + len <= bufAddr + MR->length);
        }
    };
    class TSharedReceiveQueue: public TThrRefBase, TNonCopyable {
        ibv_srq* SRQ;
        TIntrusivePtr<TIBContext> IBCtx;

        ~TSharedReceiveQueue() override {
            if (SRQ) {
                ibv_destroy_srq(SRQ);
            }
        }

    public:
        TSharedReceiveQueue(TPtrArg<TIBContext> ctx, int maxWR)
            : IBCtx(ctx)
        {
            ibv_srq_init_attr attr;
            Zero(attr);
            attr.srq_context = this;
            attr.attr.max_sge = MAX_SGE;
            attr.attr.max_wr = maxWR;

            TIBContext::TLock ibContext(IBCtx);
            SRQ = ibv_create_srq(ibContext.GetProtDomain(), &attr);
            Y_ASSERT(SRQ);
        }
        ibv_srq* GetSRQ() {
            return SRQ;
        }
        void PostReceive(TPtrArg<TMemoryRegion> mem, ui64 id, const void* buf, size_t len) {
            Y_ASSERT(mem->IsCovered(buf, len));
            ibv_recv_wr wr, *bad;
            ibv_sge sg;
            sg.addr = reinterpret_cast<ui64>(buf) / sizeof(char);
            sg.length = len;
            sg.lkey = mem->GetLKey();
            Zero(wr);
            wr.wr_id = id;
            wr.sg_list = &sg;
            wr.num_sge = 1;
            CHECK_Z(ibv_post_srq_recv(SRQ, &wr, &bad));
        }
    };
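    // Usage sketch (illustrative, not part of the library): registering one large
    // buffer and pre-posting fixed-size receive slots to the shared receive queue.
    // ibCtx is a TIntrusivePtr<TIBContext> obtained elsewhere; RECV_COUNT and
    // RECV_SIZE are example constants.
    //
    //   const int RECV_COUNT = 128;
    //   const size_t RECV_SIZE = 4096;
    //   TIntrusivePtr<TMemoryRegion> mem = new TMemoryRegion(ibCtx, RECV_COUNT * RECV_SIZE);
    //   TIntrusivePtr<TSharedReceiveQueue> srq = new TSharedReceiveQueue(ibCtx, RECV_COUNT);
    //   for (int i = 0; i < RECV_COUNT; ++i) {
    //       // wr_id == slot index, so a completion can be mapped back to its buffer
    //       srq->PostReceive(mem, i, mem->GetData() + i * RECV_SIZE, RECV_SIZE);
    //   }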
    inline void MakeAH(ibv_ah_attr* res, TPtrArg<TIBPort> port, int lid, int serviceLevel) {
        Zero(*res);
        res->dlid = lid;
        res->port_num = port->GetPort();
        res->sl = serviceLevel;
    }

    void MakeAH(ibv_ah_attr* res, TPtrArg<TIBPort> port, const TUdpAddress& remoteAddr, const TUdpAddress& localAddr, int serviceLevel);

    class TAddressHandle: public TThrRefBase, TNonCopyable {
        ibv_ah* AH;
        TIntrusivePtr<TIBContext> IBCtx;

        ~TAddressHandle() override {
            if (AH) {
                CHECK_Z(ibv_destroy_ah(AH));
            }
            AH = nullptr;
            IBCtx = nullptr;
        }

    public:
        TAddressHandle(TPtrArg<TIBContext> ctx, ibv_ah_attr* attr)
            : IBCtx(ctx)
        {
            TIBContext::TLock ibContext(IBCtx);
            AH = ibv_create_ah(ibContext.GetProtDomain(), attr);
            Y_ASSERT(AH != nullptr);
        }
        TAddressHandle(TPtrArg<TIBPort> port, int lid, int serviceLevel)
            : IBCtx(port->GetCtx())
        {
            ibv_ah_attr attr;
            MakeAH(&attr, port, lid, serviceLevel);
            TIBContext::TLock ibContext(IBCtx);
            AH = ibv_create_ah(ibContext.GetProtDomain(), &attr);
            Y_ASSERT(AH != nullptr);
        }
        TAddressHandle(TPtrArg<TIBPort> port, const TUdpAddress& remoteAddr, const TUdpAddress& localAddr, int serviceLevel)
            : IBCtx(port->GetCtx())
        {
            ibv_ah_attr attr;
            MakeAH(&attr, port, remoteAddr, localAddr, serviceLevel);
            TIBContext::TLock ibContext(IBCtx);
            AH = ibv_create_ah(ibContext.GetProtDomain(), &attr);
            Y_ASSERT(AH != nullptr);
        }
        ibv_ah* GetAH() {
            return AH;
        }
        bool IsValid() const {
            return AH != nullptr;
        }
    };
    // GRH + wc -> address_handle_attr
    //int ibv_init_ah_from_wc(struct ibv_context* context, ui8 port_num,
    //                        struct ibv_wc* wc, struct ibv_grh* grh,
    //                        struct ibv_ah_attr* ah_attr);
    //ibv_create_ah_from_wc(struct ibv_pd* pd, struct ibv_wc* wc,
    //                      struct ibv_grh* grh, ui8 port_num);
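    // Usage sketch (illustrative, not part of the library): building an address
    // handle to answer a UD sender, using GetAHAttr() with the GRH taken from a
    // received packet. For UD receives the GRH, when present, occupies the first
    // 40 bytes of the receive buffer; recvBuf here is hypothetical.
    //
    //   TIntrusivePtr<TAddressHandle> MakeReplyAH(TPtrArg<TIBPort> port, ibv_wc* wc, char* recvBuf) {
    //       ibv_ah_attr attr;
    //       port->GetAHAttr(wc, (ibv_grh*)recvBuf, &attr);
    //       return new TAddressHandle(port->GetCtx(), &attr);
    //   }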
    class TQueuePair: public TThrRefBase, TNonCopyable {
    protected:
        ibv_qp* QP;
        int MyPSN; // start packet sequence number
        TIntrusivePtr<TIBContext> IBCtx;
        TIntrusivePtr<TComplectionQueue> CQ;
        TIntrusivePtr<TSharedReceiveQueue> SRQ;

        TQueuePair(TPtrArg<TIBContext> ctx, TPtrArg<TComplectionQueue> cq, TPtrArg<TSharedReceiveQueue> srq,
                   int sendQueueSize,
                   ibv_qp_type qp_type)
            : IBCtx(ctx)
            , CQ(cq)
            , SRQ(srq)
        {
            MyPSN = GetCycleCount() & 0xffffff; // 24-bit value, should be random and differ between runs

            ibv_qp_init_attr attr;
            Zero(attr);
            attr.qp_context = this; // not really useful
            attr.send_cq = cq->GetCQ();
            attr.recv_cq = cq->GetCQ();
            attr.srq = srq->GetSRQ();
            attr.cap.max_send_wr = sendQueueSize;
            attr.cap.max_recv_wr = 0; // we are using srq, no need for qp's rq
            attr.cap.max_send_sge = MAX_SGE;
            attr.cap.max_recv_sge = MAX_SGE;
            attr.cap.max_inline_data = MAX_INLINE_DATA_SIZE;
            attr.qp_type = qp_type;
            attr.sq_sig_all = 1; // inline sends need not be signaled, but if they are not signaled the send work queue eventually overflows

            TIBContext::TLock ibContext(IBCtx);
            QP = ibv_create_qp(ibContext.GetProtDomain(), &attr);
            Y_ASSERT(QP);

            //struct ibv_qp {
            //    struct ibv_context* context;
            //    void* qp_context;
            //    struct ibv_pd* pd;
            //    struct ibv_cq* send_cq;
            //    struct ibv_cq* recv_cq;
            //    struct ibv_srq* srq;
            //    ui32 handle;
            //    ui32 qp_num;
            //    enum ibv_qp_state state;
            //    enum ibv_qp_type qp_type;
            //    pthread_mutex_t mutex;
            //    pthread_cond_t cond;
            //    ui32 events_completed;
            //};
            //qp_context - the value qp_context that was provided to ibv_create_qp()
            //qp_num     - the number of this Queue Pair
            //state      - the last known state of this Queue Pair; the actual state may differ (the RDMA device may have transitioned it to another state)
            //qp_type    - the Transport Service Type of this Queue Pair
        }
        ~TQueuePair() override {
            if (QP) {
                CHECK_Z(ibv_destroy_qp(QP));
            }
        }
        void FillSendAttrs(ibv_send_wr* wr, ibv_sge* sg,
                           ui64 localAddr, ui32 lKey, ui64 id, size_t len) {
            sg->addr = localAddr;
            sg->length = len;
            sg->lkey = lKey;
            Zero(*wr);
            wr->wr_id = id;
            wr->sg_list = sg;
            wr->num_sge = 1;
            if (len <= MAX_INLINE_DATA_SIZE) {
                wr->send_flags = IBV_SEND_INLINE;
            }
        }
        void FillSendAttrs(ibv_send_wr* wr, ibv_sge* sg,
                           TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
            ui64 localAddr = reinterpret_cast<ui64>(data) / sizeof(char);
            ui32 lKey = 0;
            if (mem) {
                Y_ASSERT(mem->IsCovered(data, len));
                lKey = mem->GetLKey();
            } else {
                Y_ASSERT(len <= MAX_INLINE_DATA_SIZE);
            }
            FillSendAttrs(wr, sg, localAddr, lKey, id, len);
        }

    public:
        int GetQPN() const {
            if (QP)
                return QP->qp_num;
            return 0;
        }
        int GetPSN() const {
            return MyPSN;
        }
        // we are using srq
        //void PostReceive(const TMemoryRegion& mem)
        //{
        //    ibv_recv_wr wr, *bad;
        //    ibv_sge sg;
        //    sg.addr = mem.Addr;
        //    sg.length = mem.Length;
        //    sg.lkey = mem.lkey;
        //    Zero(wr);
        //    wr.wr_id = 13;
        //    wr.sg_list = sg;
        //    wr.num_sge = 1;
        //    CHECK_Z(ibv_post_recv(QP, &wr, &bad));
        //}
    };
    class TRCQueuePair: public TQueuePair {
    public:
        TRCQueuePair(TPtrArg<TIBContext> ctx, TPtrArg<TComplectionQueue> cq, TPtrArg<TSharedReceiveQueue> srq, int sendQueueSize)
            : TQueuePair(ctx, cq, srq, sendQueueSize, IBV_QPT_RC)
        {
        }
        // SRQ should have receive posted
        void Init(const ibv_ah_attr& peerAddr, int peerQPN, int peerPSN) {
            Y_ASSERT(QP->qp_type == IBV_QPT_RC);
            ibv_qp_attr attr;
            //{
            //    enum ibv_qp_state qp_state;
            //    enum ibv_qp_state cur_qp_state;
            //    enum ibv_mtu path_mtu;
            //    enum ibv_mig_state path_mig_state;
            //    ui32 qkey;
            //    ui32 rq_psn;
            //    ui32 sq_psn;
            //    ui32 dest_qp_num;
            //    int qp_access_flags;
            //    struct ibv_qp_cap cap;
            //    struct ibv_ah_attr ah_attr;
            //    struct ibv_ah_attr alt_ah_attr;
            //    ui16 pkey_index;
            //    ui16 alt_pkey_index;
            //    ui8 en_sqd_async_notify;
            //    ui8 sq_draining;
            //    ui8 max_rd_atomic;
            //    ui8 max_dest_rd_atomic;
            //    ui8 min_rnr_timer;
            //    ui8 port_num;
            //    ui8 timeout;
            //    ui8 retry_cnt;
            //    ui8 rnr_retry;
            //    ui8 alt_port_num;
            //    ui8 alt_timeout;
            //};

            // RESET -> INIT
            Zero(attr);
            attr.qp_state = IBV_QPS_INIT;
            attr.pkey_index = 0;
            attr.port_num = peerAddr.port_num;
            // for connected QP
            attr.qp_access_flags = IBV_ACCESS_REMOTE_WRITE | IBV_ACCESS_REMOTE_READ | IBV_ACCESS_REMOTE_ATOMIC;
            CHECK_Z(ibv_modify_qp(QP, &attr,
                                  IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_ACCESS_FLAGS));

            // INIT -> ReadyToReceive
            //PostReceive(mem);
            attr.qp_state = IBV_QPS_RTR;
            attr.path_mtu = IBV_MTU_512; // allows more fine grained VL arbitration
            // for connected QP
            attr.ah_attr = peerAddr;
            attr.dest_qp_num = peerQPN;
            attr.rq_psn = peerPSN;
            attr.max_dest_rd_atomic = MAX_OUTSTANDING_RDMA; // number of outstanding RDMA requests
            attr.min_rnr_timer = 12; // recommended
            CHECK_Z(ibv_modify_qp(QP, &attr,
                                  IBV_QP_STATE | IBV_QP_PATH_MTU |
                                  IBV_QP_AV | IBV_QP_DEST_QPN | IBV_QP_RQ_PSN | IBV_QP_MAX_DEST_RD_ATOMIC | IBV_QP_MIN_RNR_TIMER));

            // ReadyToReceive -> ReadyToTransmit
            attr.qp_state = IBV_QPS_RTS;
            // for connected QP
            attr.timeout = 14; // was raised to 18 for a while; 14 is the recommended value
            //attr.retry_cnt = 0; // for debug purposes
            //attr.rnr_retry = 0; // for debug purposes
            attr.retry_cnt = 7; // release configuration
            attr.rnr_retry = 7; // release configuration (retry forever)
            attr.sq_psn = MyPSN;
            attr.max_rd_atomic = MAX_OUTSTANDING_RDMA; // number of outstanding RDMA requests
            CHECK_Z(ibv_modify_qp(QP, &attr,
                                  IBV_QP_STATE |
                                  IBV_QP_TIMEOUT | IBV_QP_RETRY_CNT | IBV_QP_RNR_RETRY | IBV_QP_SQ_PSN | IBV_QP_MAX_QP_RD_ATOMIC));
        }
        void PostSend(TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
            ibv_send_wr wr, *bad;
            ibv_sge sg;
            FillSendAttrs(&wr, &sg, mem, id, data, len);
            wr.opcode = IBV_WR_SEND;
            //IBV_WR_RDMA_WRITE
            //IBV_WR_RDMA_WRITE_WITH_IMM
            //IBV_WR_SEND
            //IBV_WR_SEND_WITH_IMM
            //IBV_WR_RDMA_READ
            //wr.imm_data = xz;
            CHECK_Z(ibv_post_send(QP, &wr, &bad));
        }
        void PostRDMAWrite(ui64 remoteAddr, ui32 remoteKey,
                           TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
            ibv_send_wr wr, *bad;
            ibv_sge sg;
            FillSendAttrs(&wr, &sg, mem, id, data, len);
            wr.opcode = IBV_WR_RDMA_WRITE;
            wr.wr.rdma.remote_addr = remoteAddr;
            wr.wr.rdma.rkey = remoteKey;
            CHECK_Z(ibv_post_send(QP, &wr, &bad));
        }
        void PostRDMAWrite(ui64 remoteAddr, ui32 remoteKey,
                           ui64 localAddr, ui32 localKey, ui64 id, size_t len) {
            ibv_send_wr wr, *bad;
            ibv_sge sg;
            FillSendAttrs(&wr, &sg, localAddr, localKey, id, len);
            wr.opcode = IBV_WR_RDMA_WRITE;
            wr.wr.rdma.remote_addr = remoteAddr;
            wr.wr.rdma.rkey = remoteKey;
            CHECK_Z(ibv_post_send(QP, &wr, &bad));
        }
        void PostRDMAWriteImm(ui64 remoteAddr, ui32 remoteKey, ui32 immData,
                              TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
            ibv_send_wr wr, *bad;
            ibv_sge sg;
            FillSendAttrs(&wr, &sg, mem, id, data, len);
            wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
            wr.imm_data = immData;
            wr.wr.rdma.remote_addr = remoteAddr;
            wr.wr.rdma.rkey = remoteKey;
            CHECK_Z(ibv_post_send(QP, &wr, &bad));
        }
    };
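    // Usage sketch (illustrative, not part of the library): bringing up one side of
    // an RC connection. Both peers create a queue pair, exchange (LID, QPN, PSN)
    // out of band (ExchangeWithPeer() and TPeerInfo below are hypothetical), then
    // call Init() with the peer's parameters; after that PostSend()/PostRDMAWrite()
    // may be used. port and mem are assumed to be set up elsewhere.
    //
    //   TIntrusivePtr<TComplectionQueue> cq = new TComplectionQueue(port->GetCtx(), 1000);
    //   TIntrusivePtr<TSharedReceiveQueue> srq = new TSharedReceiveQueue(port->GetCtx(), 128);
    //   // ... pre-post receives to srq before connecting ...
    //   TIntrusivePtr<TRCQueuePair> qp = new TRCQueuePair(port->GetCtx(), cq, srq, 100);
    //
    //   TPeerInfo remote = ExchangeWithPeer(port->GetLID(), qp->GetQPN(), qp->GetPSN());
    //   ibv_ah_attr peerAddr;
    //   MakeAH(&peerAddr, port, remote.LID, /*serviceLevel=*/0);
    //   qp->Init(peerAddr, remote.QPN, remote.PSN);
    //
    //   qp->PostSend(mem, /*wr_id=*/1, mem->GetData(), 64);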
    class TUDQueuePair: public TQueuePair {
        TIntrusivePtr<TIBPort> Port;

    public:
        TUDQueuePair(TPtrArg<TIBPort> port, TPtrArg<TComplectionQueue> cq, TPtrArg<TSharedReceiveQueue> srq, int sendQueueSize)
            : TQueuePair(port->GetCtx(), cq, srq, sendQueueSize, IBV_QPT_UD)
            , Port(port)
        {
        }
        // SRQ should have receive posted
        void Init(int qkey) {
            Y_ASSERT(QP->qp_type == IBV_QPT_UD);
            ibv_qp_attr attr;
            // RESET -> INIT
            Zero(attr);
            attr.qp_state = IBV_QPS_INIT;
            attr.pkey_index = 0;
            attr.port_num = Port->GetPort();
            // for unconnected qp
            attr.qkey = qkey;
            CHECK_Z(ibv_modify_qp(QP, &attr,
                                  IBV_QP_STATE | IBV_QP_PKEY_INDEX | IBV_QP_PORT | IBV_QP_QKEY));

            // INIT -> ReadyToReceive
            //PostReceive(mem);
            attr.qp_state = IBV_QPS_RTR;
            CHECK_Z(ibv_modify_qp(QP, &attr, IBV_QP_STATE));

            // ReadyToReceive -> ReadyToTransmit
            attr.qp_state = IBV_QPS_RTS;
            attr.sq_psn = 0;
            CHECK_Z(ibv_modify_qp(QP, &attr, IBV_QP_STATE | IBV_QP_SQ_PSN));
        }
        void PostSend(TPtrArg<TAddressHandle> ah, int remoteQPN, int remoteQKey,
                      TPtrArg<TMemoryRegion> mem, ui64 id, const void* data, size_t len) {
            ibv_send_wr wr, *bad;
            ibv_sge sg;
            FillSendAttrs(&wr, &sg, mem, id, data, len);
            wr.opcode = IBV_WR_SEND;
            wr.wr.ud.ah = ah->GetAH();
            wr.wr.ud.remote_qpn = remoteQPN;
            wr.wr.ud.remote_qkey = remoteQKey;
            //IBV_WR_SEND_WITH_IMM
            //wr.imm_data = xz;
            CHECK_Z(ibv_post_send(QP, &wr, &bad));
        }
    };
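    // Usage sketch (illustrative, not part of the library): an unconnected (UD) send.
    // The peer's LID, QPN and QKEY are assumed to be known out of band; WELL_KNOWN_QKEY,
    // remoteLID, remoteQPN, port, cq, srq and mem are hypothetical names for values set
    // up elsewhere. UD payloads must fit into a single MTU.
    //
    //   TIntrusivePtr<TUDQueuePair> udQP = new TUDQueuePair(port, cq, srq, 100);
    //   udQP->Init(WELL_KNOWN_QKEY);
    //   TIntrusivePtr<TAddressHandle> ah = new TAddressHandle(port, remoteLID, /*serviceLevel=*/0);
    //   udQP->PostSend(ah, remoteQPN, WELL_KNOWN_QKEY, mem, /*wr_id=*/2, mem->GetData(), 256);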
    TIntrusivePtr<TIBPort> GetIBDevice();

#else
    //////////////////////////////////////////////////////////////////////////
    // stub for OS without IB support
    //////////////////////////////////////////////////////////////////////////
    enum ibv_wc_opcode {
        IBV_WC_SEND,
        IBV_WC_RDMA_WRITE,
        IBV_WC_RDMA_READ,
        IBV_WC_COMP_SWAP,
        IBV_WC_FETCH_ADD,
        IBV_WC_BIND_MW,
        IBV_WC_RECV = 1 << 7,
        IBV_WC_RECV_RDMA_WITH_IMM
    };

    enum ibv_wc_status {
        IBV_WC_SUCCESS,
        // lots of errors follow
    };

    //struct ibv_device;
    //struct ibv_pd;
    union ibv_gid {
        ui8 raw[16];
        struct {
            ui64 subnet_prefix;
            ui64 interface_id;
        } global;
    };

    struct ibv_wc {
        ui64 wr_id;
        enum ibv_wc_status status;
        enum ibv_wc_opcode opcode;
        ui32 imm_data; /* in network byte order */
        ui32 qp_num;
        ui32 src_qp;
    };
    struct ibv_grh {};
    struct ibv_ah_attr {
        ui8 sl;
    };
    //struct ibv_cq;

    class TIBContext: public TThrRefBase, TNonCopyable {
    public:
        bool IsValid() const {
            return false;
        }
        //ibv_context* GetContext() { return 0; }
        //ibv_pd* GetProtDomain() { return 0; }
    };

    class TIBPort: public TThrRefBase, TNonCopyable {
    public:
        TIBPort(TPtrArg<TIBContext>, int) {
        }
        int GetPort() const {
            return 1;
        }
        int GetLID() const {
            return 1;
        }
        TIBContext* GetCtx() {
            return 0;
        }
        void GetGID(ibv_gid* res) {
            Zero(*res);
        }
        void GetAHAttr(ibv_wc*, ibv_grh*, ibv_ah_attr*) {
        }
    };

    class TComplectionQueue: public TThrRefBase, TNonCopyable {
    public:
        TComplectionQueue(TPtrArg<TIBContext>, int) {
        }
        //ibv_cq* GetCQ() { return 0; }
        int Poll(ibv_wc*, int) {
            return 0;
        }
    };

    class TMemoryRegion: public TThrRefBase, TNonCopyable {
    public:
        TMemoryRegion(TPtrArg<TIBContext>, size_t) {
        }
        ui32 GetLKey() const {
            return 0;
        }
        ui32 GetRKey() const {
            return 0;
        }
        char* GetData() {
            return 0;
        }
        bool IsCovered(const void*, size_t) const {
            return false;
        }
    };

    class TSharedReceiveQueue: public TThrRefBase, TNonCopyable {
    public:
        TSharedReceiveQueue(TPtrArg<TIBContext>, int) {
        }
        //ibv_srq* GetSRQ() { return SRQ; }
        void PostReceive(TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
        }
    };

    inline void MakeAH(ibv_ah_attr*, TPtrArg<TIBPort>, int, int) {
    }

    class TAddressHandle: public TThrRefBase, TNonCopyable {
    public:
        TAddressHandle(TPtrArg<TIBContext>, ibv_ah_attr*) {
        }
        TAddressHandle(TPtrArg<TIBPort>, int, int) {
        }
        TAddressHandle(TPtrArg<TIBPort>, const TUdpAddress&, const TUdpAddress&, int) {
        }
        //ibv_ah* GetAH() { return AH; }
        bool IsValid() {
            return true;
        }
    };

    class TQueuePair: public TThrRefBase, TNonCopyable {
    public:
        int GetQPN() const {
            return 0;
        }
        int GetPSN() const {
            return 0;
        }
    };

    class TRCQueuePair: public TQueuePair {
    public:
        TRCQueuePair(TPtrArg<TIBContext>, TPtrArg<TComplectionQueue>, TPtrArg<TSharedReceiveQueue>, int) {
        }
        // SRQ should have receive posted
        void Init(const ibv_ah_attr&, int, int) {
        }
        void PostSend(TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
        }
        void PostRDMAWrite(ui64, ui32, TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
        }
        void PostRDMAWrite(ui64, ui32, ui64, ui32, ui64, size_t) {
        }
        void PostRDMAWriteImm(ui64, ui32, ui32, TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
        }
    };

    class TUDQueuePair: public TQueuePair {
        TIntrusivePtr<TIBPort> Port;

    public:
        TUDQueuePair(TPtrArg<TIBPort>, TPtrArg<TComplectionQueue>, TPtrArg<TSharedReceiveQueue>, int) {
        }
        // SRQ should have receive posted
        void Init(int) {
        }
        void PostSend(TPtrArg<TAddressHandle>, int, int, TPtrArg<TMemoryRegion>, ui64, const void*, size_t) {
        }
    };

    inline TIntrusivePtr<TIBPort> GetIBDevice() {
        return 0;
    }
#endif
}