ib_test.cpp 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232
  1. #include "stdafx.h"
  2. #include "ib_test.h"
  3. #include "ib_buffers.h"
  4. #include "udp_socket.h"
  5. #include "udp_address.h"
  6. #include <util/system/hp_timer.h>
  7. namespace NNetliba {
  // Address of the server's "welcome" UD queue pair. The server fills it from
  // its IB port and welcome queue pair and ships it to the client as raw bytes
  // over UDP, so the type must stay trivially copyable.
  struct TWelcomeSocketAddr {
      // Value of TIBPort::GetLID() on the server; the client uses it to build
      // the address handle for the first IB send.
      int LID = 0;
      // Value of TUDQueuePair::GetQPN() of the server's welcome queue pair.
      int QPN = 0;
  };
  // Parameters each side sends to its peer so that the peer can initialise
  // its RC queue pair. The struct is exchanged as raw bytes, so it must stay
  // trivially copyable and keep the QPN-then-PSN member order.
  struct TRCQueuePairHandshake {
      // Value of TRCQueuePair::GetQPN() of the sender's RC queue pair.
      int QPN = 0;
      // Value of TRCQueuePair::GetPSN() of the sender's RC queue pair.
      int PSN = 0;
  };
  15. class TIPSocket {
  16. TNetSocket s;
  17. enum {
  18. HDR_SIZE = UDP_LOW_LEVEL_HEADER_SIZE
  19. };
  20. public:
  21. void Init(int port) {
  22. if (!InitLocalIPList()) {
  23. Y_ASSERT(0 && "Can not determine self IP address");
  24. return;
  25. }
  26. s.Open(port);
  27. }
  28. bool IsValid() {
  29. return s.IsValid();
  30. }
  31. void Respond(const TWelcomeSocketAddr& info) {
  32. char buf[10000];
  33. int sz = sizeof(buf);
  34. sockaddr_in6 fromAddress;
  35. bool rv = s.RecvFrom(buf, &sz, &fromAddress);
  36. if (rv && strcmp(buf + HDR_SIZE, "Hello_IB") == 0) {
  37. printf("send welcome info\n");
  38. memcpy(buf + HDR_SIZE, &info, sizeof(info));
  39. TNetSocket::ESendError err = s.SendTo(buf, sizeof(info) + HDR_SIZE, fromAddress, FF_ALLOW_FRAG);
  40. if (err != TNetSocket::SEND_OK) {
  41. printf("SendTo() fail %d\n", err);
  42. }
  43. }
  44. }
  45. void Request(const char* hostName, int port, TWelcomeSocketAddr* res) {
  46. TUdpAddress addr = CreateAddress(hostName, port);
  47. printf("addr = %s\n", GetAddressAsString(addr).c_str());
  48. sockaddr_in6 sockAddr;
  49. GetWinsockAddr(&sockAddr, addr);
  50. for (;;) {
  51. char buf[10000];
  52. int sz = sizeof(buf);
  53. sockaddr_in6 fromAddress;
  54. bool rv = s.RecvFrom(buf, &sz, &fromAddress);
  55. if (rv) {
  56. if (sz == sizeof(TWelcomeSocketAddr) + HDR_SIZE) {
  57. *res = *(TWelcomeSocketAddr*)(buf + HDR_SIZE);
  58. break;
  59. }
  60. printf("Get unexpected %d bytes from somewhere?\n", sz);
  61. }
  62. strcpy(buf + HDR_SIZE, "Hello_IB");
  63. TNetSocket::ESendError err = s.SendTo(buf, strlen(buf) + 1 + HDR_SIZE, sockAddr, FF_ALLOW_FRAG);
  64. if (err != TNetSocket::SEND_OK) {
  65. printf("SendTo() fail %d\n", err);
  66. }
  67. Sleep(TDuration::MilliSeconds(100));
  68. }
  69. }
  70. };
  71. // can hang if opposite side exits, but it's only basic test
  72. static void WaitForRecv(TIBBufferPool* bp, TPtrArg<TComplectionQueue> cq, ibv_wc* wc) {
  73. for (;;) {
  74. if (cq->Poll(wc, 1) == 1) {
  75. if (wc->opcode & IBV_WC_RECV) {
  76. break;
  77. }
  78. bp->FreeBuf(wc->wr_id);
  79. }
  80. }
  81. }
  82. void RunIBTest(bool isClient, const char* serverName) {
  83. TIntrusivePtr<TIBPort> port = GetIBDevice();
  84. if (port.Get() == nullptr) {
  85. printf("No IB device found\n");
  86. return;
  87. }
  88. const int IP_PORT = 13666;
  89. const int WELCOME_QKEY = 0x1113013;
  90. const int MAX_SRQ_WORK_REQUESTS = 100;
  91. const int MAX_CQ_EVENTS = 1000;
  92. const int QP_SEND_QUEUE_SIZE = 3;
  93. TIntrusivePtr<TComplectionQueue> cq = new TComplectionQueue(port->GetCtx(), MAX_CQ_EVENTS);
  94. TIBBufferPool bp(port->GetCtx(), MAX_SRQ_WORK_REQUESTS);
  95. if (!isClient) {
  96. // server
  97. TIPSocket ipSocket;
  98. ipSocket.Init(IP_PORT);
  99. if (!ipSocket.IsValid()) {
  100. printf("UDP port %d is not available\n", IP_PORT);
  101. return;
  102. }
  103. TIntrusivePtr<TComplectionQueue> cqRC = new TComplectionQueue(port->GetCtx(), MAX_CQ_EVENTS);
  104. TIntrusivePtr<TUDQueuePair> welcomeQP = new TUDQueuePair(port, cq, bp.GetSRQ(), QP_SEND_QUEUE_SIZE);
  105. welcomeQP->Init(WELCOME_QKEY);
  106. TWelcomeSocketAddr info;
  107. info.LID = port->GetLID();
  108. info.QPN = welcomeQP->GetQPN();
  109. TIntrusivePtr<TAddressHandle> ahPeer1;
  110. for (;;) {
  111. ipSocket.Respond(info);
  112. // poll srq
  113. ibv_wc wc;
  114. if (cq->Poll(&wc, 1) == 1 && (wc.opcode & IBV_WC_RECV)) {
  115. printf("Got IB handshake\n");
  116. TRCQueuePairHandshake remoteHandshake;
  117. ibv_ah_attr clientAddr;
  118. {
  119. TIBRecvPacketProcess pkt(bp, wc);
  120. remoteHandshake = *(TRCQueuePairHandshake*)pkt.GetUDData();
  121. port->GetAHAttr(&wc, pkt.GetGRH(), &clientAddr);
  122. }
  123. TIntrusivePtr<TAddressHandle> ahPeer2;
  124. ahPeer2 = new TAddressHandle(port->GetCtx(), &clientAddr);
  125. TIntrusivePtr<TRCQueuePair> rcTest = new TRCQueuePair(port->GetCtx(), cqRC, bp.GetSRQ(), QP_SEND_QUEUE_SIZE);
  126. rcTest->Init(clientAddr, remoteHandshake.QPN, remoteHandshake.PSN);
  127. TRCQueuePairHandshake handshake;
  128. handshake.PSN = rcTest->GetPSN();
  129. handshake.QPN = rcTest->GetQPN();
  130. bp.PostSend(welcomeQP, ahPeer2, wc.src_qp, WELCOME_QKEY, &handshake, sizeof(handshake));
  131. WaitForRecv(&bp, cqRC, &wc);
  132. {
  133. TIBRecvPacketProcess pkt(bp, wc);
  134. printf("Got RC ping: %s\n", pkt.GetData());
  135. const char* ret = "Puk";
  136. bp.PostSend(rcTest, ret, strlen(ret) + 1);
  137. }
  138. for (int i = 0; i < 5; ++i) {
  139. WaitForRecv(&bp, cqRC, &wc);
  140. TIBRecvPacketProcess pkt(bp, wc);
  141. printf("Got RC ping: %s\n", pkt.GetData());
  142. const char* ret = "Fine";
  143. bp.PostSend(rcTest, ret, strlen(ret) + 1);
  144. }
  145. }
  146. }
  147. } else {
  148. // client
  149. ibv_wc wc;
  150. TIPSocket ipSocket;
  151. ipSocket.Init(0);
  152. if (!ipSocket.IsValid()) {
  153. printf("Failed to create UDP socket\n");
  154. return;
  155. }
  156. printf("Connecting to %s\n", serverName);
  157. TWelcomeSocketAddr info;
  158. ipSocket.Request(serverName, IP_PORT, &info);
  159. printf("Got welcome info, lid %d, qpn %d\n", info.LID, info.QPN);
  160. TIntrusivePtr<TUDQueuePair> welcomeQP = new TUDQueuePair(port, cq, bp.GetSRQ(), QP_SEND_QUEUE_SIZE);
  161. welcomeQP->Init(WELCOME_QKEY);
  162. TIntrusivePtr<TRCQueuePair> rcTest = new TRCQueuePair(port->GetCtx(), cq, bp.GetSRQ(), QP_SEND_QUEUE_SIZE);
  163. TRCQueuePairHandshake handshake;
  164. handshake.PSN = rcTest->GetPSN();
  165. handshake.QPN = rcTest->GetQPN();
  166. TIntrusivePtr<TAddressHandle> serverAH = new TAddressHandle(port, info.LID, 0);
  167. bp.PostSend(welcomeQP, serverAH, info.QPN, WELCOME_QKEY, &handshake, sizeof(handshake));
  168. WaitForRecv(&bp, cq, &wc);
  169. ibv_ah_attr serverAddr;
  170. TRCQueuePairHandshake remoteHandshake;
  171. {
  172. TIBRecvPacketProcess pkt(bp, wc);
  173. printf("Got handshake response\n");
  174. remoteHandshake = *(TRCQueuePairHandshake*)pkt.GetUDData();
  175. port->GetAHAttr(&wc, pkt.GetGRH(), &serverAddr);
  176. }
  177. rcTest->Init(serverAddr, remoteHandshake.QPN, remoteHandshake.PSN);
  178. char hiAndy[] = "Hi, Andy";
  179. bp.PostSend(rcTest, hiAndy, sizeof(hiAndy));
  180. WaitForRecv(&bp, cq, &wc);
  181. {
  182. TIBRecvPacketProcess pkt(bp, wc);
  183. printf("Got RC pong: %s\n", pkt.GetData());
  184. }
  185. for (int i = 0; i < 5; ++i) {
  186. char howAreYou[] = "How are you?";
  187. bp.PostSend(rcTest, howAreYou, sizeof(howAreYou));
  188. WaitForRecv(&bp, cq, &wc);
  189. {
  190. TIBRecvPacketProcess pkt(bp, wc);
  191. printf("Got RC pong: %s\n", pkt.GetData());
  192. }
  193. }
  194. }
  195. }
  196. }