ib_test.cpp 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. #include "stdafx.h"
  2. #include "ib_test.h"
  3. #include "ib_buffers.h"
  4. #include "udp_socket.h"
  5. #include "udp_address.h"
  6. #include <util/system/hp_timer.h>
  7. namespace NNetliba {
  8. struct TWelcomeSocketAddr {
  9. int LID;
  10. int QPN;
  11. };
  12. struct TRCQueuePairHandshake {
  13. int QPN, PSN;
  14. };
  15. class TIPSocket {
  16. TNetSocket s;
  17. static constexpr int HDR_SIZE = UDP_LOW_LEVEL_HEADER_SIZE;
  18. public:
  19. void Init(int port) {
  20. if (!InitLocalIPList()) {
  21. Y_ASSERT(0 && "Can not determine self IP address");
  22. return;
  23. }
  24. s.Open(port);
  25. }
  26. bool IsValid() {
  27. return s.IsValid();
  28. }
  29. void Respond(const TWelcomeSocketAddr& info) {
  30. char buf[10000];
  31. int sz = sizeof(buf);
  32. sockaddr_in6 fromAddress;
  33. bool rv = s.RecvFrom(buf, &sz, &fromAddress);
  34. if (rv && strcmp(buf + HDR_SIZE, "Hello_IB") == 0) {
  35. printf("send welcome info\n");
  36. memcpy(buf + HDR_SIZE, &info, sizeof(info));
  37. TNetSocket::ESendError err = s.SendTo(buf, sizeof(info) + HDR_SIZE, fromAddress, FF_ALLOW_FRAG);
  38. if (err != TNetSocket::SEND_OK) {
  39. printf("SendTo() fail %d\n", err);
  40. }
  41. }
  42. }
  43. void Request(const char* hostName, int port, TWelcomeSocketAddr* res) {
  44. TUdpAddress addr = CreateAddress(hostName, port);
  45. printf("addr = %s\n", GetAddressAsString(addr).c_str());
  46. sockaddr_in6 sockAddr;
  47. GetWinsockAddr(&sockAddr, addr);
  48. for (;;) {
  49. char buf[10000];
  50. int sz = sizeof(buf);
  51. sockaddr_in6 fromAddress;
  52. bool rv = s.RecvFrom(buf, &sz, &fromAddress);
  53. if (rv) {
  54. if (sz == sizeof(TWelcomeSocketAddr) + HDR_SIZE) {
  55. *res = *(TWelcomeSocketAddr*)(buf + HDR_SIZE);
  56. break;
  57. }
  58. printf("Get unexpected %d bytes from somewhere?\n", sz);
  59. }
  60. strcpy(buf + HDR_SIZE, "Hello_IB");
  61. TNetSocket::ESendError err = s.SendTo(buf, strlen(buf) + 1 + HDR_SIZE, sockAddr, FF_ALLOW_FRAG);
  62. if (err != TNetSocket::SEND_OK) {
  63. printf("SendTo() fail %d\n", err);
  64. }
  65. Sleep(TDuration::MilliSeconds(100));
  66. }
  67. }
  68. };
  69. // can hang if opposite side exits, but it's only basic test
  70. static void WaitForRecv(TIBBufferPool* bp, TPtrArg<TComplectionQueue> cq, ibv_wc* wc) {
  71. for (;;) {
  72. if (cq->Poll(wc, 1) == 1) {
  73. if (wc->opcode & IBV_WC_RECV) {
  74. break;
  75. }
  76. bp->FreeBuf(wc->wr_id);
  77. }
  78. }
  79. }
  80. void RunIBTest(bool isClient, const char* serverName) {
  81. TIntrusivePtr<TIBPort> port = GetIBDevice();
  82. if (port.Get() == nullptr) {
  83. printf("No IB device found\n");
  84. return;
  85. }
  86. const int IP_PORT = 13666;
  87. const int WELCOME_QKEY = 0x1113013;
  88. const int MAX_SRQ_WORK_REQUESTS = 100;
  89. const int MAX_CQ_EVENTS = 1000;
  90. const int QP_SEND_QUEUE_SIZE = 3;
  91. TIntrusivePtr<TComplectionQueue> cq = new TComplectionQueue(port->GetCtx(), MAX_CQ_EVENTS);
  92. TIBBufferPool bp(port->GetCtx(), MAX_SRQ_WORK_REQUESTS);
  93. if (!isClient) {
  94. // server
  95. TIPSocket ipSocket;
  96. ipSocket.Init(IP_PORT);
  97. if (!ipSocket.IsValid()) {
  98. printf("UDP port %d is not available\n", IP_PORT);
  99. return;
  100. }
  101. TIntrusivePtr<TComplectionQueue> cqRC = new TComplectionQueue(port->GetCtx(), MAX_CQ_EVENTS);
  102. TIntrusivePtr<TUDQueuePair> welcomeQP = new TUDQueuePair(port, cq, bp.GetSRQ(), QP_SEND_QUEUE_SIZE);
  103. welcomeQP->Init(WELCOME_QKEY);
  104. TWelcomeSocketAddr info;
  105. info.LID = port->GetLID();
  106. info.QPN = welcomeQP->GetQPN();
  107. TIntrusivePtr<TAddressHandle> ahPeer1;
  108. for (;;) {
  109. ipSocket.Respond(info);
  110. // poll srq
  111. ibv_wc wc;
  112. if (cq->Poll(&wc, 1) == 1 && (wc.opcode & IBV_WC_RECV)) {
  113. printf("Got IB handshake\n");
  114. TRCQueuePairHandshake remoteHandshake;
  115. ibv_ah_attr clientAddr;
  116. {
  117. TIBRecvPacketProcess pkt(bp, wc);
  118. remoteHandshake = *(TRCQueuePairHandshake*)pkt.GetUDData();
  119. port->GetAHAttr(&wc, pkt.GetGRH(), &clientAddr);
  120. }
  121. TIntrusivePtr<TAddressHandle> ahPeer2;
  122. ahPeer2 = new TAddressHandle(port->GetCtx(), &clientAddr);
  123. TIntrusivePtr<TRCQueuePair> rcTest = new TRCQueuePair(port->GetCtx(), cqRC, bp.GetSRQ(), QP_SEND_QUEUE_SIZE);
  124. rcTest->Init(clientAddr, remoteHandshake.QPN, remoteHandshake.PSN);
  125. TRCQueuePairHandshake handshake;
  126. handshake.PSN = rcTest->GetPSN();
  127. handshake.QPN = rcTest->GetQPN();
  128. bp.PostSend(welcomeQP, ahPeer2, wc.src_qp, WELCOME_QKEY, &handshake, sizeof(handshake));
  129. WaitForRecv(&bp, cqRC, &wc);
  130. {
  131. TIBRecvPacketProcess pkt(bp, wc);
  132. printf("Got RC ping: %s\n", pkt.GetData());
  133. const char* ret = "Puk";
  134. bp.PostSend(rcTest, ret, strlen(ret) + 1);
  135. }
  136. for (int i = 0; i < 5; ++i) {
  137. WaitForRecv(&bp, cqRC, &wc);
  138. TIBRecvPacketProcess pkt(bp, wc);
  139. printf("Got RC ping: %s\n", pkt.GetData());
  140. const char* ret = "Fine";
  141. bp.PostSend(rcTest, ret, strlen(ret) + 1);
  142. }
  143. }
  144. }
  145. } else {
  146. // client
  147. ibv_wc wc;
  148. TIPSocket ipSocket;
  149. ipSocket.Init(0);
  150. if (!ipSocket.IsValid()) {
  151. printf("Failed to create UDP socket\n");
  152. return;
  153. }
  154. printf("Connecting to %s\n", serverName);
  155. TWelcomeSocketAddr info;
  156. ipSocket.Request(serverName, IP_PORT, &info);
  157. printf("Got welcome info, lid %d, qpn %d\n", info.LID, info.QPN);
  158. TIntrusivePtr<TUDQueuePair> welcomeQP = new TUDQueuePair(port, cq, bp.GetSRQ(), QP_SEND_QUEUE_SIZE);
  159. welcomeQP->Init(WELCOME_QKEY);
  160. TIntrusivePtr<TRCQueuePair> rcTest = new TRCQueuePair(port->GetCtx(), cq, bp.GetSRQ(), QP_SEND_QUEUE_SIZE);
  161. TRCQueuePairHandshake handshake;
  162. handshake.PSN = rcTest->GetPSN();
  163. handshake.QPN = rcTest->GetQPN();
  164. TIntrusivePtr<TAddressHandle> serverAH = new TAddressHandle(port, info.LID, 0);
  165. bp.PostSend(welcomeQP, serverAH, info.QPN, WELCOME_QKEY, &handshake, sizeof(handshake));
  166. WaitForRecv(&bp, cq, &wc);
  167. ibv_ah_attr serverAddr;
  168. TRCQueuePairHandshake remoteHandshake;
  169. {
  170. TIBRecvPacketProcess pkt(bp, wc);
  171. printf("Got handshake response\n");
  172. remoteHandshake = *(TRCQueuePairHandshake*)pkt.GetUDData();
  173. port->GetAHAttr(&wc, pkt.GetGRH(), &serverAddr);
  174. }
  175. rcTest->Init(serverAddr, remoteHandshake.QPN, remoteHandshake.PSN);
  176. char hiAndy[] = "Hi, Andy";
  177. bp.PostSend(rcTest, hiAndy, sizeof(hiAndy));
  178. WaitForRecv(&bp, cq, &wc);
  179. {
  180. TIBRecvPacketProcess pkt(bp, wc);
  181. printf("Got RC pong: %s\n", pkt.GetData());
  182. }
  183. for (int i = 0; i < 5; ++i) {
  184. char howAreYou[] = "How are you?";
  185. bp.PostSend(rcTest, howAreYou, sizeof(howAreYou));
  186. WaitForRecv(&bp, cq, &wc);
  187. {
  188. TIBRecvPacketProcess pkt(bp, wc);
  189. printf("Got RC pong: %s\n", pkt.GetData());
  190. }
  191. }
  192. }
  193. }
  194. }