#include "stdafx.h"
#include "ib_cs.h"
#include "ib_buffers.h"
#include "ib_mem.h"
#include <util/generic/deque.h>
#include <util/digest/murmur.h>
/*
Questions
    does rdma work?
    what is RC latency?
        3us if measured by completion event arrival
        2.3us if bind to socket 0 & use inline send
    memory region - can we use memory from some offset?
        yes
    is send_inplace supported and is it faster?
        yes, supported, 1024 bytes limit, inline is faster (2.4us vs 2.9us)
    is srq a penalty compared to regular rq?
        rdma is faster anyway, so why bother

collective ops
    support asymmetric configurations by additional transfers (overlap of 1 or 2 hosts is allowed)

remove commented stuff all around

next gen
    shared+registered large mem blocks for easy transfer
    no crc calcs
    direct channel exposure
    make packet id ui64? otherwise we could get a duplicate id (highly improbable but possible)
    lock free allocation in ib_mem
*/
namespace NNetliba {
    const int WELCOME_QKEY = 0x13081976;
    const int MAX_SEND_COUNT = (128 - 10) / 4;
    const int QP_SEND_QUEUE_SIZE = (MAX_SEND_COUNT * 2 + 10) + 10;
    const int WELCOME_QP_SEND_SIZE = 10000;
    const int MAX_SRQ_WORK_REQUESTS = 10000;
    const int MAX_CQ_EVENTS = MAX_SRQ_WORK_REQUESTS; //1000;
    const double CHANNEL_CHECK_INTERVAL = 1.;
    const int TRAFFIC_SL = 4; // 4 is mandatory for RoCE to work, it's the only lossless priority(?)
    const int CONNECT_SL = 1;
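    // Sizing note (inferred from the message flow below, not from any HCA spec):
    // MAX_SEND_COUNT looks like a per-peer window of in-flight messages derived from a
    // ~128-entry hardware send queue with ~10 entries of slack and up to 4 work requests
    // per large message (CMD_DATA_INIT, RDMA write, CMD_DATA_COMPLETE, plus control
    // traffic); QP_SEND_QUEUE_SIZE then reserves roughly twice the window with extra slack.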
    class TIBClientServer: public IIBClientServer {
        enum ECmd {
            CMD_HANDSHAKE,
            CMD_HANDSHAKE_ACK,
            CMD_CONFIRM,
            CMD_DATA_TINY,
            CMD_DATA_INIT,
            CMD_BUFFER_READY,
            CMD_DATA_COMPLETE,
            CMD_KEEP_ALIVE,
        };
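        // Protocol flow, as implemented below:
        //   connect:   CMD_HANDSHAKE (over the UD welcome QP) -> CMD_HANDSHAKE_ACK (UD) -> CMD_CONFIRM (RC)
        //   small msg: CMD_DATA_TINY, payload carried inline in a single small packet
        //   large msg: CMD_DATA_INIT -> CMD_BUFFER_READY (receiver replies with addr/rkey)
        //              -> RDMA write -> CMD_DATA_COMPLETE (hash-checked in debug builds)
        //   idle link: CMD_KEEP_ALIVE, sent once per CHANNEL_CHECK_INTERVAL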
#pragma pack(1)
        struct TCmdHandshake {
            char Command;
            int QPN, PSN;
            TGUID SocketId;
            TUdpAddress MyAddress; // address of the handshake sender as viewed from receiver
        };
        struct TCmdHandshakeAck {
            char Command;
            int QPN, PSN;
            int YourQPN;
        };
        struct TCmdConfirm {
            char Command;
        };
        struct TCmdDataTiny {
            struct THeader {
                char Command;
                ui16 Size;
                TGUID PacketGuid;
            } Header;
            typedef char TDataVec[SMALL_PKT_SIZE - sizeof(THeader)];
            TDataVec Data;
            static int GetMaxDataSize() {
                return sizeof(TDataVec);
            }
        };
        struct TCmdDataInit {
            char Command;
            size_t Size;
            TGUID PacketGuid;
        };
        struct TCmdBufferReady {
            char Command;
            TGUID PacketGuid;
            ui64 RemoteAddr;
            ui32 RemoteKey;
        };
        struct TCmdDataComplete {
            char Command;
            TGUID PacketGuid;
            ui64 DataHash;
        };
        struct TCmdKeepAlive {
            char Command;
        };
#pragma pack()
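        // These structs travel on the wire verbatim, hence #pragma pack(1). By construction
        // TCmdDataTiny fills a small packet exactly; a compile-time sketch of that invariant:
        static_assert(sizeof(TCmdDataTiny) == SMALL_PKT_SIZE, "TCmdDataTiny must fill SMALL_PKT_SIZE exactly");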
        struct TCompleteInfo {
            enum {
                CI_DATA_TINY,
                CI_RDMA_COMPLETE,
                CI_DATA_SENT,
                CI_KEEP_ALIVE,
                CI_IGNORE,
            };
            int Type;
            int BufId;
            TIBMsgHandle MsgHandle;
            TCompleteInfo(int t, int bufId, TIBMsgHandle msg)
                : Type(t)
                , BufId(bufId)
                , MsgHandle(msg)
            {
            }
        };
        struct TPendingQueuedSend {
            TGUID PacketGuid;
            TIBMsgHandle MsgHandle;
            TRopeDataPacket* Data;
            TPendingQueuedSend()
                : MsgHandle(0)
                , Data(nullptr) // was left uninitialized; default to a safe value
            {
            }
            TPendingQueuedSend(const TGUID& packetGuid, TIBMsgHandle msgHandle, TRopeDataPacket* data)
                : PacketGuid(packetGuid)
                , MsgHandle(msgHandle)
                , Data(data)
            {
            }
        };
        struct TQueuedSend {
            TGUID PacketGuid;
            TIBMsgHandle MsgHandle;
            TIntrusivePtr<TIBMemBlock> MemBlock;
            ui64 RemoteAddr;
            ui32 RemoteKey;
            TQueuedSend() = default;
            TQueuedSend(const TGUID& packetGuid, TIBMsgHandle msgHandle)
                : PacketGuid(packetGuid)
                , MsgHandle(msgHandle)
                , RemoteAddr(0)
                , RemoteKey(0)
            {
            }
        };
        struct TQueuedRecv {
            TGUID PacketGuid;
            TIntrusivePtr<TIBMemBlock> Data;
            TQueuedRecv() = default;
            TQueuedRecv(const TGUID& packetGuid, TPtrArg<TIBMemBlock> data)
                : PacketGuid(packetGuid)
                , Data(data)
            {
            }
        };
        struct TIBPeer: public IIBPeer {
            TUdpAddress PeerAddress;
            TIntrusivePtr<TRCQueuePair> QP;
            EState State;
            int SendCount;
            NHPTimer::STime LastRecv;
            TDeque<TPendingQueuedSend> PendingSendQueue;
            // these lists have limited size and could be plain circular buffers
            TDeque<TQueuedSend> SendQueue;
            TDeque<TQueuedRecv> RecvQueue;
            TDeque<TCompleteInfo> OutMsgs;
            TIBPeer(const TUdpAddress& peerAddress, TPtrArg<TRCQueuePair> qp)
                : PeerAddress(peerAddress)
                , QP(qp)
                , State(CONNECTING)
                , SendCount(0)
            {
                NHPTimer::GetTime(&LastRecv);
            }
            ~TIBPeer() override {
                //printf("IBPeer destroyed\n");
            }
            EState GetState() override {
                return State;
            }
            TDeque<TQueuedSend>::iterator GetSend(const TGUID& packetGuid) {
                for (TDeque<TQueuedSend>::iterator z = SendQueue.begin(); z != SendQueue.end(); ++z) {
                    if (z->PacketGuid == packetGuid) {
                        return z;
                    }
                }
                Y_ABORT_UNLESS(0, "no send by guid");
                return SendQueue.begin();
            }
            TDeque<TQueuedSend>::iterator GetSend(TIBMsgHandle msgHandle) {
                for (TDeque<TQueuedSend>::iterator z = SendQueue.begin(); z != SendQueue.end(); ++z) {
                    if (z->MsgHandle == msgHandle) {
                        return z;
                    }
                }
                Y_ABORT_UNLESS(0, "no send by handle");
                return SendQueue.begin();
            }
            TDeque<TQueuedRecv>::iterator GetRecv(const TGUID& packetGuid) {
                for (TDeque<TQueuedRecv>::iterator z = RecvQueue.begin(); z != RecvQueue.end(); ++z) {
                    if (z->PacketGuid == packetGuid) {
                        return z;
                    }
                }
                Y_ABORT_UNLESS(0, "no recv by guid");
                return RecvQueue.begin();
            }
            void PostRDMA(TQueuedSend& qs) {
                Y_ASSERT(qs.RemoteAddr != 0 && qs.MemBlock.Get() != nullptr);
                QP->PostRDMAWrite(qs.RemoteAddr, qs.RemoteKey,
                                  qs.MemBlock->GetMemRegion(), 0, qs.MemBlock->GetData(), qs.MemBlock->GetSize());
                OutMsgs.push_back(TCompleteInfo(TCompleteInfo::CI_RDMA_COMPLETE, 0, qs.MsgHandle));
                //printf("Post rdma write, size %d\n", qs.Data->GetSize());
            }
            void PostSend(TIBBufferPool& bp, const void* data, size_t len, int t, TIBMsgHandle msgHandle) {
                int bufId = bp.PostSend(QP, data, len);
                OutMsgs.push_back(TCompleteInfo(t, bufId, msgHandle));
            }
        };
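        // Peers are keyed by their RC QP number in Channels. State transitions:
        // CONNECTING -> OK on CMD_CONFIRM (server side) or CMD_HANDSHAKE_ACK (client side),
        // and any state -> FAILED on a completion error or connect timeout, after which
        // PeerFailed() drains all queues and removes the peer from Channels.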
        TIntrusivePtr<TIBPort> Port;
        TIntrusivePtr<TIBMemPool> MemPool;
        TIntrusivePtr<TIBMemPool::TCopyResultStorage> CopyResults;
        TIntrusivePtr<TComplectionQueue> CQ;
        TIBBufferPool BP;
        TIntrusivePtr<TUDQueuePair> WelcomeQP;
        int WelcomeQPN;
        TIBConnectInfo ConnectInfo;
        TDeque<TIBSendResult> SendResults;
        TDeque<TRequest*> ReceivedList;
        typedef THashMap<int, TIntrusivePtr<TIBPeer>> TPeerChannelHash;
        TPeerChannelHash Channels;
        TIBMsgHandle MsgCounter;
        NHPTimer::STime LastCheckTime;

        ~TIBClientServer() override {
            for (auto& z : ReceivedList) {
                delete z;
            }
        }
        TIBPeer* GetChannelByQPN(int qpn) {
            TPeerChannelHash::iterator z = Channels.find(qpn);
            if (z == Channels.end()) {
                return nullptr;
            }
            return z->second.Get();
        }
        // IIBClientServer
        TRequest* GetRequest() override {
            if (ReceivedList.empty()) {
                return nullptr;
            }
            TRequest* res = ReceivedList.front();
            ReceivedList.pop_front();
            return res;
        }
        bool GetSendResult(TIBSendResult* res) override {
            if (SendResults.empty()) {
                return false;
            }
            *res = SendResults.front();
            SendResults.pop_front();
            return true;
        }
        void StartSend(TPtrArg<TIBPeer> peer, const TGUID& packetGuid, TIBMsgHandle msgHandle, TRopeDataPacket* data) {
            int sz = data->GetSize();
            if (sz <= TCmdDataTiny::GetMaxDataSize()) {
                TCmdDataTiny dataTiny;
                dataTiny.Header.Command = CMD_DATA_TINY;
                dataTiny.Header.Size = (ui16)sz;
                dataTiny.Header.PacketGuid = packetGuid;
                TBlockChainIterator bc(data->GetChain());
                bc.Read(dataTiny.Data, sz);
                peer->PostSend(BP, &dataTiny, sizeof(dataTiny.Header) + sz, TCompleteInfo::CI_DATA_TINY, msgHandle);
                //printf("Send CMD_DATA_TINY\n");
            } else {
                MemPool->CopyData(data, msgHandle, peer, CopyResults);
                peer->SendQueue.push_back(TQueuedSend(packetGuid, msgHandle));
                {
                    TQueuedSend& msg = peer->SendQueue.back();
                    TCmdDataInit dataInit;
                    dataInit.Command = CMD_DATA_INIT;
                    dataInit.PacketGuid = msg.PacketGuid;
                    dataInit.Size = data->GetSize();
                    peer->PostSend(BP, &dataInit, sizeof(dataInit), TCompleteInfo::CI_IGNORE, 0);
                    //printf("Send CMD_DATA_INIT\n");
                }
            }
            ++peer->SendCount;
        }
        void SendCompleted(TPtrArg<TIBPeer> peer, TIBMsgHandle msgHandle) {
            SendResults.push_back(TIBSendResult(msgHandle, true));
            if (--peer->SendCount < MAX_SEND_COUNT) {
                if (!peer->PendingSendQueue.empty()) {
                    TPendingQueuedSend& qs = peer->PendingSendQueue.front();
                    StartSend(peer, qs.PacketGuid, qs.MsgHandle, qs.Data);
                    //printf("Sending pending %d\n", qs.MsgHandle);
                    peer->PendingSendQueue.pop_front();
                }
            }
        }
        void SendFailed(TPtrArg<TIBPeer> peer, TIBMsgHandle msgHandle) {
            //printf("IB SendFailed()\n");
            SendResults.push_back(TIBSendResult(msgHandle, false));
            --peer->SendCount;
        }
        void PeerFailed(TPtrArg<TIBPeer> peer) {
            //printf("PeerFailed(), peer %p, state %d (%d pending, %d queued, %d out, %d sendcount)\n",
            //    peer.Get(), peer->State,
            //    (int)peer->PendingSendQueue.size(),
            //    (int)peer->SendQueue.size(),
            //    (int)peer->OutMsgs.size(),
            //    peer->SendCount);
            peer->State = IIBPeer::FAILED;
            while (!peer->PendingSendQueue.empty()) {
                TPendingQueuedSend& qs = peer->PendingSendQueue.front();
                SendResults.push_back(TIBSendResult(qs.MsgHandle, false));
                peer->PendingSendQueue.pop_front();
            }
            while (!peer->SendQueue.empty()) {
                TQueuedSend& qs = peer->SendQueue.front();
                SendFailed(peer, qs.MsgHandle);
                peer->SendQueue.pop_front();
            }
            while (!peer->OutMsgs.empty()) {
                TCompleteInfo& cc = peer->OutMsgs.front();
                //printf("Don't wait completion for sent packet (QPN %d), bufId %d\n", peer->QP->GetQPN(), cc.BufId);
                if (cc.Type == TCompleteInfo::CI_DATA_TINY) {
                    SendFailed(peer, cc.MsgHandle);
                }
                BP.FreeBuf(cc.BufId);
                peer->OutMsgs.pop_front();
            }
            {
                Y_ASSERT(peer->SendCount == 0);
                //printf("Remove peer %p from hash (QPN %d)\n", peer.Get(), peer->QP->GetQPN());
                TPeerChannelHash::iterator z = Channels.find(peer->QP->GetQPN());
                if (z == Channels.end()) {
                    Y_ABORT_UNLESS(0, "peer failed for unregistered peer");
                }
                Channels.erase(z);
            }
        }
        TIBMsgHandle Send(TPtrArg<IIBPeer> peerArg, TRopeDataPacket* data, const TGUID& packetGuid) override {
            TIBPeer* peer = static_cast<TIBPeer*>(peerArg.Get()); // trust me, I'm a professional
            if (peer == nullptr || peer->State != IIBPeer::OK) {
                return -1;
            }
            Y_ASSERT(Channels.find(peer->QP->GetQPN())->second == peer);
            TIBMsgHandle msgHandle = ++MsgCounter;
            if (peer->SendCount >= MAX_SEND_COUNT) {
                peer->PendingSendQueue.push_back(TPendingQueuedSend(packetGuid, msgHandle, data));
            } else {
                //printf("Sending direct %d\n", msgHandle);
                StartSend(peer, packetGuid, msgHandle, data);
            }
            return msgHandle;
        }
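        // Caller-side sketch (variable names illustrative): a negative handle means the peer
        // is not in OK state and the message was not accepted; otherwise the same handle
        // comes back later through GetSendResult() with a success flag.
        //   TIBMsgHandle h = ibServer->Send(peer, data, guid);
        //   if (h < 0) { /* fall back to another transport */ }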
        void ParsePacket(ibv_wc* wc, NHPTimer::STime tCurrent) {
            if (wc->status != IBV_WC_SUCCESS) {
                TIBPeer* peer = GetChannelByQPN(wc->qp_num);
                if (peer) {
                    //printf("failed recv packet (status %d)\n", wc->status);
                    PeerFailed(peer);
                } else {
                    //printf("Ignoring recv error for closed/non existing QPN %d\n", wc->qp_num);
                }
                return;
            }
            TIBRecvPacketProcess pkt(BP, *wc);
            TIBPeer* peer = GetChannelByQPN(wc->qp_num);
            if (peer) {
                Y_ASSERT(peer->State != IIBPeer::FAILED);
                peer->LastRecv = tCurrent;
                char cmdId = *(const char*)pkt.GetData();
                switch (cmdId) {
                    case CMD_CONFIRM:
                        //printf("got confirm\n");
                        Y_ASSERT(peer->State == IIBPeer::CONNECTING);
                        peer->State = IIBPeer::OK;
                        break;
                    case CMD_DATA_TINY:
                        //printf("Recv CMD_DATA_TINY\n");
                        {
                            const TCmdDataTiny& dataTiny = *(TCmdDataTiny*)pkt.GetData();
                            TRequest* req = new TRequest;
                            req->Address = peer->PeerAddress;
                            req->Guid = dataTiny.Header.PacketGuid;
                            req->Data = new TRopeDataPacket;
                            req->Data->Write(dataTiny.Data, dataTiny.Header.Size);
                            ReceivedList.push_back(req);
                        }
                        break;
                    case CMD_DATA_INIT:
                        //printf("Recv CMD_DATA_INIT\n");
                        {
                            const TCmdDataInit& data = *(TCmdDataInit*)pkt.GetData();
                            TIntrusivePtr<TIBMemBlock> blk = MemPool->Alloc(data.Size);
                            peer->RecvQueue.push_back(TQueuedRecv(data.PacketGuid, blk));
                            TCmdBufferReady ready;
                            ready.Command = CMD_BUFFER_READY;
                            ready.PacketGuid = data.PacketGuid;
                            ready.RemoteAddr = reinterpret_cast<ui64>(blk->GetData()) / sizeof(char);
                            ready.RemoteKey = blk->GetMemRegion()->GetRKey();
                            peer->PostSend(BP, &ready, sizeof(ready), TCompleteInfo::CI_IGNORE, 0);
                            //printf("Send CMD_BUFFER_READY\n");
                        }
                        break;
                    case CMD_BUFFER_READY:
                        //printf("Recv CMD_BUFFER_READY\n");
                        {
                            const TCmdBufferReady& ready = *(TCmdBufferReady*)pkt.GetData();
                            TDeque<TQueuedSend>::iterator z = peer->GetSend(ready.PacketGuid);
                            TQueuedSend& qs = *z;
                            qs.RemoteAddr = ready.RemoteAddr;
                            qs.RemoteKey = ready.RemoteKey;
                            if (qs.MemBlock.Get()) {
                                peer->PostRDMA(qs);
                            }
                        }
                        break;
                    case CMD_DATA_COMPLETE:
                        //printf("Recv CMD_DATA_COMPLETE\n");
                        {
                            const TCmdDataComplete& cmd = *(TCmdDataComplete*)pkt.GetData();
                            TDeque<TQueuedRecv>::iterator z = peer->GetRecv(cmd.PacketGuid);
                            TQueuedRecv& qr = *z;
#ifdef _DEBUG
                            Y_ABORT_UNLESS(MurmurHash<ui64>(qr.Data->GetData(), qr.Data->GetSize()) == cmd.DataHash || cmd.DataHash == 0, "RDMA data hash mismatch");
#endif
                            TRequest* req = new TRequest;
                            req->Address = peer->PeerAddress;
                            req->Guid = qr.PacketGuid;
                            req->Data = new TRopeDataPacket;
                            req->Data->AddBlock(qr.Data.Get(), qr.Data->GetData(), qr.Data->GetSize());
                            ReceivedList.push_back(req);
                            peer->RecvQueue.erase(z);
                        }
                        break;
                    case CMD_KEEP_ALIVE:
                        break;
                    default:
                        Y_ASSERT(0);
                        break;
                }
            } else {
                // we can legitimately get here: the peer may have been closed already
                //printf("Ignoring packet for closed/non existing QPN %d\n", wc->qp_num);
            }
        }
        void OnComplete(ibv_wc* wc, NHPTimer::STime tCurrent) {
            TIBPeer* peer = GetChannelByQPN(wc->qp_num);
            if (peer) {
                if (!peer->OutMsgs.empty()) {
                    peer->LastRecv = tCurrent;
                    if (wc->status != IBV_WC_SUCCESS) {
                        //printf("completed with status %d\n", wc->status);
                        PeerFailed(peer);
                    } else {
                        const TCompleteInfo& cc = peer->OutMsgs.front();
                        switch (cc.Type) {
                            case TCompleteInfo::CI_DATA_TINY:
                                //printf("Completed data_tiny\n");
                                SendCompleted(peer, cc.MsgHandle);
                                break;
                            case TCompleteInfo::CI_RDMA_COMPLETE:
                                //printf("Completed rdma_complete\n");
                                {
                                    TDeque<TQueuedSend>::iterator z = peer->GetSend(cc.MsgHandle);
                                    TQueuedSend& qs = *z;
                                    TCmdDataComplete complete;
                                    complete.Command = CMD_DATA_COMPLETE;
                                    complete.PacketGuid = qs.PacketGuid;
#ifdef _DEBUG
                                    complete.DataHash = MurmurHash<ui64>(qs.MemBlock->GetData(), qs.MemBlock->GetSize());
#else
                                    complete.DataHash = 0;
#endif
                                    peer->PostSend(BP, &complete, sizeof(complete), TCompleteInfo::CI_DATA_SENT, qs.MsgHandle);
                                    //printf("Send CMD_DATA_COMPLETE\n");
                                }
                                break;
                            case TCompleteInfo::CI_DATA_SENT:
                                //printf("Completed data_sent\n");
                                {
                                    TDeque<TQueuedSend>::iterator z = peer->GetSend(cc.MsgHandle);
                                    TIBMsgHandle msgHandle = z->MsgHandle;
                                    peer->SendQueue.erase(z);
                                    SendCompleted(peer, msgHandle);
                                }
                                break;
                            case TCompleteInfo::CI_KEEP_ALIVE:
                                break;
                            case TCompleteInfo::CI_IGNORE:
                                //printf("Completed ignored\n");
                                break;
                            default:
                                Y_ASSERT(0);
                                break;
                        }
                        peer->OutMsgs.pop_front();
                        BP.FreeBuf(wc->wr_id);
                    }
                } else {
                    Y_ABORT_UNLESS(0, "got completion without outstanding messages");
                }
            } else {
                //printf("Got completion for non existing qpn %d, bufId %d (status %d)\n", wc->qp_num, (int)wc->wr_id, (int)wc->status);
                if (wc->status == IBV_WC_SUCCESS) {
                    Y_ABORT_UNLESS(0, "only errors should go unmatched");
                }
                // no need to free the buffer since it is freed in PeerFailed()
            }
        }
        void ParseWelcomePacket(ibv_wc* wc) {
            TIBRecvPacketProcess pkt(BP, *wc);
            char cmdId = *(const char*)pkt.GetUDData();
            switch (cmdId) {
                case CMD_HANDSHAKE: {
                    //printf("got handshake\n");
                    const TCmdHandshake& handshake = *(TCmdHandshake*)pkt.GetUDData();
                    if (handshake.SocketId != ConnectInfo.SocketId) {
                        // connection attempt from a wrong IB subnet
                        break;
                    }
                    TIntrusivePtr<TRCQueuePair> rcQP;
                    rcQP = new TRCQueuePair(Port->GetCtx(), CQ, BP.GetSRQ(), QP_SEND_QUEUE_SIZE);
                    int qpn = rcQP->GetQPN();
                    Y_ASSERT(Channels.find(qpn) == Channels.end());
                    TIntrusivePtr<TIBPeer>& peer = Channels[qpn];
                    peer = new TIBPeer(handshake.MyAddress, rcQP);
                    ibv_ah_attr peerAddr;
                    TIntrusivePtr<TAddressHandle> ahPeer;
                    Port->GetAHAttr(wc, pkt.GetGRH(), &peerAddr);
                    ahPeer = new TAddressHandle(Port->GetCtx(), &peerAddr);
                    peerAddr.sl = TRAFFIC_SL; // RC traffic uses TRAFFIC_SL; the UD ack above keeps the GRH-derived SL
                    rcQP->Init(peerAddr, handshake.QPN, handshake.PSN);
                    TCmdHandshakeAck handshakeAck;
                    handshakeAck.Command = CMD_HANDSHAKE_ACK;
                    handshakeAck.PSN = rcQP->GetPSN();
                    handshakeAck.QPN = rcQP->GetQPN();
                    handshakeAck.YourQPN = handshake.QPN;
                    // if the ack gets lost we'll create a new peer channel,
                    // and this one will be erased in Step() by the timeout counted from LastRecv
                    BP.PostSend(WelcomeQP, ahPeer, wc->src_qp, WELCOME_QKEY, &handshakeAck, sizeof(handshakeAck));
                    //printf("send handshake_ack\n");
                } break;
                case CMD_HANDSHAKE_ACK: {
                    //printf("got handshake_ack\n");
                    const TCmdHandshakeAck& handshakeAck = *(TCmdHandshakeAck*)pkt.GetUDData();
                    TIBPeer* peer = GetChannelByQPN(handshakeAck.YourQPN);
                    if (peer) {
                        ibv_ah_attr peerAddr;
                        Port->GetAHAttr(wc, pkt.GetGRH(), &peerAddr);
                        peerAddr.sl = TRAFFIC_SL;
                        peer->QP->Init(peerAddr, handshakeAck.QPN, handshakeAck.PSN);
                        peer->State = IIBPeer::OK;
                        TCmdConfirm confirm;
                        confirm.Command = CMD_CONFIRM;
                        peer->PostSend(BP, &confirm, sizeof(confirm), TCompleteInfo::CI_IGNORE, 0);
                        //printf("send confirm\n");
                    } else {
                        // the respective QPN was deleted or never existed;
                        // silently ignore - the peer channel on the remote side
                        // will never reach the confirmed state and will be deleted
                    }
                } break;
                default:
                    Y_ASSERT(0);
                    break;
            }
        }
        bool Step(NHPTimer::STime tCurrent) override {
            bool rv = false;
            // we only have to process completions, everything is driven by completion of something
            ibv_wc wcArr[10];
            for (;;) {
                int wcCount = CQ->Poll(wcArr, Y_ARRAY_SIZE(wcArr));
                if (wcCount == 0) {
                    break;
                }
                rv = true;
                for (int z = 0; z < wcCount; ++z) {
                    ibv_wc& wc = wcArr[z];
                    if (wc.opcode & IBV_WC_RECV) {
                        // received msg
                        if ((int)wc.qp_num == WelcomeQPN) {
                            if (wc.status != IBV_WC_SUCCESS) {
                                Y_ABORT_UNLESS(0, "ud recv op completed with error %d\n", (int)wc.status);
                            }
                            // note: was "wc.opcode == IBV_WC_RECV | IBV_WC_SEND", which parses as
                            // "(wc.opcode == IBV_WC_RECV) | IBV_WC_SEND" due to operator precedence
                            Y_ASSERT(wc.opcode == (IBV_WC_RECV | IBV_WC_SEND));
                            ParseWelcomePacket(&wc);
                        } else {
                            ParsePacket(&wc, tCurrent);
                        }
                    } else {
                        // send completion
                        if ((int)wc.qp_num == WelcomeQPN) {
                            // ok
                            BP.FreeBuf(wc.wr_id);
                        } else {
                            OnComplete(&wc, tCurrent);
                        }
                    }
                }
            }
            {
                TIntrusivePtr<TIBMemBlock> memBlock;
                i64 msgHandle;
                TIntrusivePtr<TIBPeer> peer;
                while (CopyResults->GetCopyResult(&memBlock, &msgHandle, &peer)) {
                    if (peer->GetState() != IIBPeer::OK) {
                        continue;
                    }
                    TDeque<TQueuedSend>::iterator z = peer->GetSend(msgHandle);
                    if (z == peer->SendQueue.end()) {
                        Y_ABORT_UNLESS(0, "peer %p, copy completed, msg %d not found?\n", peer.Get(), (int)msgHandle);
                        continue;
                    }
                    TQueuedSend& qs = *z;
                    qs.MemBlock = memBlock;
                    if (qs.RemoteAddr != 0) {
                        peer->PostRDMA(qs);
                    }
                    rv = true;
                }
            }
            {
                NHPTimer::STime t1 = LastCheckTime;
                if (NHPTimer::GetTimePassed(&t1) > CHANNEL_CHECK_INTERVAL) {
                    for (TPeerChannelHash::iterator z = Channels.begin(); z != Channels.end();) {
                        TIntrusivePtr<TIBPeer> peer = z->second;
                        ++z; // the peer can be removed from Channels below
                        Y_ASSERT(peer->State != IIBPeer::FAILED);
                        NHPTimer::STime t2 = peer->LastRecv;
                        double timeSinceLastRecv = NHPTimer::GetTimePassed(&t2);
                        if (timeSinceLastRecv > CHANNEL_CHECK_INTERVAL) {
                            if (peer->State == IIBPeer::CONNECTING) {
                                Y_ASSERT(peer->OutMsgs.empty() && peer->SendCount == 0);
                                // if the handshake does not seem to work out - close the connection
                                //printf("IB connecting timed out\n");
                                PeerFailed(peer);
                            } else {
                                // if we have an outstanding msg we expect IB to report any connectivity problems
                                if (peer->OutMsgs.empty()) {
                                    //printf("Sending keep alive\n");
                                    TCmdKeepAlive keep;
                                    keep.Command = CMD_KEEP_ALIVE;
                                    peer->PostSend(BP, &keep, sizeof(keep), TCompleteInfo::CI_KEEP_ALIVE, 0);
                                }
                            }
                        }
                    }
                    LastCheckTime = t1;
                }
            }
            return rv;
        }
        IIBPeer* ConnectPeer(const TIBConnectInfo& info, const TUdpAddress& peerAddr, const TUdpAddress& myAddr) override {
            for (auto& channel : Channels) {
                TIntrusivePtr<TIBPeer> peer = channel.second;
                if (peer->PeerAddress == peerAddr) {
                    return peer.Get();
                }
            }
            TIntrusivePtr<TRCQueuePair> rcQP;
            rcQP = new TRCQueuePair(Port->GetCtx(), CQ, BP.GetSRQ(), QP_SEND_QUEUE_SIZE);
            int qpn = rcQP->GetQPN();
            Y_ASSERT(Channels.find(qpn) == Channels.end());
            TIntrusivePtr<TIBPeer>& peer = Channels[qpn];
            peer = new TIBPeer(peerAddr, rcQP);
            TCmdHandshake handshake;
            handshake.Command = CMD_HANDSHAKE;
            handshake.PSN = rcQP->GetPSN();
            handshake.QPN = rcQP->GetQPN();
            handshake.SocketId = info.SocketId;
            handshake.MyAddress = myAddr;
            TIntrusivePtr<TAddressHandle> serverAH;
            if (info.LID != 0) {
                serverAH = new TAddressHandle(Port, info.LID, CONNECT_SL);
            } else {
                //ibv_gid addr;
                //addr.global.subnet_prefix = info.Subnet;
                //addr.global.interface_id = info.Interface;
                //serverAH = new TAddressHandle(Port, addr, CONNECT_SL);
                TUdpAddress local = myAddr;
                local.Port = 0;
                TUdpAddress remote = peerAddr;
                remote.Port = 0;
                //printf("local Addr %s\n", GetAddressAsString(local).c_str());
                //printf("remote Addr %s\n", GetAddressAsString(remote).c_str());
                // CRAP - this somehow has to prevent connecting machines from different RoCE islands
                serverAH = new TAddressHandle(Port, remote, local, CONNECT_SL);
                if (!serverAH->IsValid()) {
                    return nullptr;
                }
            }
            BP.PostSend(WelcomeQP, serverAH, info.QPN, WELCOME_QKEY, &handshake, sizeof(handshake));
            //printf("send handshake\n");
            return peer.Get();
        }
        const TIBConnectInfo& GetConnectInfo() override {
            return ConnectInfo;
        }
    public:
        TIBClientServer(TPtrArg<TIBPort> port)
            : Port(port)
            , MemPool(GetIBMemPool())
            , CQ(new TComplectionQueue(port->GetCtx(), MAX_CQ_EVENTS))
            , BP(port->GetCtx(), MAX_SRQ_WORK_REQUESTS)
            , WelcomeQP(new TUDQueuePair(port, CQ, BP.GetSRQ(), WELCOME_QP_SEND_SIZE))
            , WelcomeQPN(WelcomeQP->GetQPN())
            , MsgCounter(1)
        {
            CopyResults = new TIBMemPool::TCopyResultStorage;
            CreateGuid(&ConnectInfo.SocketId);
            ibv_gid addr;
            port->GetGID(&addr);
            ConnectInfo.Interface = addr.global.interface_id;
            ConnectInfo.Subnet = addr.global.subnet_prefix;
            //printf("connect addr subnet %lx, iface %lx\n", addr.global.subnet_prefix, addr.global.interface_id);
            ConnectInfo.LID = port->GetLID();
            ConnectInfo.QPN = WelcomeQPN;
            WelcomeQP->Init(WELCOME_QKEY);
            NHPTimer::GetTime(&LastCheckTime);
        }
    };
    IIBClientServer* CreateIBClientServer() {
        TIntrusivePtr<TIBPort> port = GetIBDevice();
        if (port.Get() == nullptr) {
            return nullptr;
        }
        return new TIBClientServer(port);
    }
}
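
// Usage sketch (illustrative; the driver-loop variable names are hypothetical):
//   IIBClientServer* ib = CreateIBClientServer(); // nullptr when no IB device is present
//   IIBPeer* peer = ib->ConnectPeer(remoteConnectInfo, remoteAddr, myAddr); // connect info exchanged out of band
//   TIBMsgHandle h = ib->Send(peer, data, guid);
//   for (;;) {
//       NHPTimer::STime now;
//       NHPTimer::GetTime(&now);
//       ib->Step(now); // drives completions, RDMA posts and keep-alives
//       TIBSendResult sr;
//       while (ib->GetSendResult(&sr)) { /* (handle, success) pairs */ }
//       while (TRequest* req = ib->GetRequest()) { /* incoming message; caller owns req */ }
//   }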