socket.cpp 37 KB


#include "stdafx.h"
#include <util/datetime/cputimer.h>
#include <util/draft/holder_vector.h>
#include <util/generic/utility.h>
#include <util/generic/vector.h>
#include <util/network/init.h>
#include <util/network/poller.h>
#include <library/cpp/deprecated/atomic/atomic.h>
#include <util/system/byteorder.h>
#include <util/system/defaults.h>
#include <util/system/error.h>
#include <util/system/event.h>
#include <util/system/thread.h>
#include <util/system/yassert.h>
#include <util/system/rwlock.h>
#include <util/system/env.h>
#include "socket.h"
#include "packet_queue.h"
#include "udp_recv_packet.h"
#include <array>
#include <atomic>
#include <cerrno>
#include <cstring>
#include <stdlib.h>
///////////////////////////////////////////////////////////////////////////////
#ifndef _win_
#include <netinet/in.h>
#endif
#ifdef _linux_
#include <dlfcn.h> // dlsym
#endif
  29. template <class T>
  30. static T GetAddressOf(const char* name) {
  31. #ifdef _linux_
  32. if (!GetEnv("DISABLE_MMSG")) {
  33. return (T)dlsym(RTLD_DEFAULT, name);
  34. }
  35. #endif
  36. Y_UNUSED(name);
  37. return nullptr;
  38. }
  39. ///////////////////////////////////////////////////////////////////////////////
  40. namespace NNetlibaSocket {
  41. ///////////////////////////////////////////////////////////////////////////////
  42. struct timespec; // we use it only as NULL pointer
  43. typedef int (*TSendMMsgFunc)(SOCKET, TMMsgHdr*, unsigned int, unsigned int);
  44. typedef int (*TRecvMMsgFunc)(SOCKET, TMMsgHdr*, unsigned int, unsigned int, timespec*);
  45. static const TSendMMsgFunc SendMMsgFunc = GetAddressOf<TSendMMsgFunc>("sendmmsg");
  46. static const TRecvMMsgFunc RecvMMsgFunc = GetAddressOf<TRecvMMsgFunc>("recvmmsg");
  47. ///////////////////////////////////////////////////////////////////////////////
  48. bool ReadTos(const TMsgHdr& msgHdr, ui8* tos) {
  49. #ifdef _win_
  50. Y_UNUSED(msgHdr);
  51. Y_UNUSED(tos);
  52. return false;
  53. #else
  54. cmsghdr* cmsg = CMSG_FIRSTHDR(&msgHdr);
  55. if (!cmsg)
  56. return false;
  57. //Y_ASSERT(cmsg->cmsg_level == IPPROTO_IPV6);
  58. //Y_ASSERT(cmsg->cmsg_type == IPV6_TCLASS);
  59. if (cmsg->cmsg_len != CMSG_LEN(sizeof(int)))
  60. return false;
  61. *tos = *(ui8*)CMSG_DATA(cmsg);
  62. return true;
  63. #endif
  64. }
  65. bool ExtractDestinationAddress(TMsgHdr& msgHdr, sockaddr_in6* addrBuf) {
  66. Zero(*addrBuf);
  67. #ifdef _win_
  68. Y_UNUSED(msgHdr);
  69. Y_UNUSED(addrBuf);
  70. return false;
  71. #else
  72. cmsghdr* cmsg;
  73. for (cmsg = CMSG_FIRSTHDR(&msgHdr); cmsg != nullptr; cmsg = CMSG_NXTHDR(&msgHdr, cmsg)) {
  74. if ((cmsg->cmsg_level == IPPROTO_IPV6) && (cmsg->cmsg_type == IPV6_PKTINFO)) {
  75. in6_pktinfo* i = (in6_pktinfo*)CMSG_DATA(cmsg);
  76. addrBuf->sin6_addr = i->ipi6_addr;
  77. addrBuf->sin6_family = AF_INET6;
  78. return true;
  79. }
  80. }
  81. return false;
  82. #endif
  83. }
  84. // all send and recv methods are thread safe!
  85. class TAbstractSocket: public ISocket {
  86. private:
  87. SOCKET S;
  88. mutable TSocketPoller Poller;
  89. sockaddr_in6 SelfAddress;
  90. int SendSysSocketSize;
  91. int SendSysSocketSizePrev;
  92. int CreateSocket(int netPort);
  93. int DetectSelfAddress();
  94. protected:
  95. int SetSockOpt(int level, int option_name, const void* option_value, socklen_t option_len);
  96. int OpenImpl(int port);
  97. void CloseImpl();
  98. void WaitImpl(float timeoutSec) const;
  99. void CancelWaitImpl(const sockaddr_in6* address = nullptr); // NULL means "self"
  100. ssize_t RecvMsgImpl(TMsgHdr* hdr, int flags);
  101. TUdpRecvPacket* RecvImpl(TUdpHostRecvBufAlloc* buf, sockaddr_in6* srcAddr, sockaddr_in6* dstAddr);
  102. int RecvMMsgImpl(TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags, timespec* timeout);
  103. bool IsFragmentationForbiden();
  104. void ForbidFragmentation();
  105. void EnableFragmentation();
  106. //Shared state for setsockopt. Forbid simultaneous transfer while sender asking for specific options (i.e. DONOT_FRAG)
  107. TRWMutex Mutex;
  108. TAtomic RecvLag = 0;
  109. public:
  110. TAbstractSocket();
  111. ~TAbstractSocket() override;
  112. #ifdef _unix_
  113. void Reset(const TAbstractSocket& rhv);
  114. #endif
  115. bool IsValid() const override;
  116. const sockaddr_in6& GetSelfAddress() const override;
  117. int GetNetworkOrderPort() const override;
  118. int GetPort() const override;
  119. int GetSockOpt(int level, int option_name, void* option_value, socklen_t* option_len) override;
  120. // send all packets to this and only this address by default
  121. int Connect(const struct sockaddr* address, socklen_t address_len) override;
  122. void CancelWaitHost(const sockaddr_in6 addr) override;
  123. bool IsSendMMsgSupported() const override;
  124. int SendMMsg(TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags) override;
  125. ssize_t SendMsg(const TMsgHdr* hdr, int flags, const EFragFlag frag) override;
  126. bool IncreaseSendBuff() override;
  127. int GetSendSysSocketSize() override;
  128. void SetRecvLagTime(NHPTimer::STime time) override;
  129. };
  130. TAbstractSocket::TAbstractSocket()
  131. : S(INVALID_SOCKET)
  132. , SendSysSocketSize(0)
  133. , SendSysSocketSizePrev(0)
  134. {
  135. Zero(SelfAddress);
  136. }
  137. TAbstractSocket::~TAbstractSocket() {
  138. CloseImpl();
  139. }
  140. #ifdef _unix_
  141. void TAbstractSocket::Reset(const TAbstractSocket& rhv) {
  142. Close();
  143. S = dup(rhv.S);
  144. SelfAddress = rhv.SelfAddress;
  145. }
  146. #endif
  147. int TAbstractSocket::CreateSocket(int netPort) {
  148. if (IsValid()) {
  149. Y_ASSERT(0);
  150. return 0;
  151. }
  152. S = socket(AF_INET6, SOCK_DGRAM, IPPROTO_UDP);
  153. if (S == INVALID_SOCKET) {
  154. return -1;
  155. }
  156. {
  157. int flag = 0;
  158. Y_ABORT_UNLESS(SetSockOpt(IPPROTO_IPV6, IPV6_V6ONLY, (const char*)&flag, sizeof(flag)) == 0, "IPV6_V6ONLY failed");
  159. }
  160. {
  161. int flag = 1;
  162. Y_ABORT_UNLESS(SetSockOpt(SOL_SOCKET, SO_REUSEADDR, (const char*)&flag, sizeof(flag)) == 0, "SO_REUSEADDR failed");
  163. }
  164. #if defined(_win_)
  165. unsigned long dummy = 1;
  166. ioctlsocket(S, FIONBIO, &dummy);
  167. #else
  168. Y_ABORT_UNLESS(fcntl(S, F_SETFL, O_NONBLOCK) == 0, "fnctl failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError());
  169. Y_ABORT_UNLESS(fcntl(S, F_SETFD, FD_CLOEXEC) == 0, "fnctl failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError());
  170. {
  171. int flag = 1;
  172. #ifndef IPV6_RECVPKTINFO /* Darwin platforms require this */
  173. Y_ABORT_UNLESS(SetSockOpt(IPPROTO_IPV6, IPV6_PKTINFO, (const char*)&flag, sizeof(flag)) == 0, "IPV6_PKTINFO failed");
  174. #else
  175. Y_ABORT_UNLESS(SetSockOpt(IPPROTO_IPV6, IPV6_RECVPKTINFO, (const char*)&flag, sizeof(flag)) == 0, "IPV6_RECVPKTINFO failed");
  176. #endif
  177. }
  178. #endif
  179. Poller.WaitRead(S, nullptr);
  180. {
  181. // bind socket
  182. sockaddr_in6 name;
  183. Zero(name);
  184. name.sin6_family = AF_INET6;
  185. name.sin6_addr = in6addr_any;
  186. name.sin6_port = netPort;
  187. if (bind(S, (sockaddr*)&name, sizeof(name)) != 0) {
  188. fprintf(stderr, "netliba_socket could not bind to port %d: %s (errno = %d)\n", InetToHost((ui16)netPort), LastSystemErrorText(), LastSystemError());
  189. CloseImpl(); // we call this CloseImpl after Poller initialization
  190. return -1;
  191. }
  192. }
  193. //Default behavior is allowing fragmentation (according to netliba v6 behavior)
  194. //If we want to sent packet with DF flag we have to use SendMsg()
  195. EnableFragmentation();
  196. {
  197. socklen_t sz = sizeof(SendSysSocketSize);
  198. if (GetSockOpt(SOL_SOCKET, SO_SNDBUF, &SendSysSocketSize, &sz)) {
  199. fprintf(stderr, "Can`t get SO_SNDBUF");
  200. }
  201. }
  202. return 0;
  203. }
  204. bool TAbstractSocket::IsValid() const {
  205. return S != INVALID_SOCKET;
  206. }
  207. int TAbstractSocket::DetectSelfAddress() {
  208. socklen_t nameLen = sizeof(SelfAddress);
  209. if (getsockname(S, (sockaddr*)&SelfAddress, &nameLen) != 0) { // actually we use only sin6_port
  210. return -1;
  211. }
  212. Y_ASSERT(SelfAddress.sin6_family == AF_INET6);
  213. SelfAddress.sin6_addr = in6addr_loopback;
  214. return 0;
  215. }
  216. const sockaddr_in6& TAbstractSocket::GetSelfAddress() const {
  217. return SelfAddress;
  218. }
  219. int TAbstractSocket::GetNetworkOrderPort() const {
  220. return SelfAddress.sin6_port;
  221. }
  222. int TAbstractSocket::GetPort() const {
  223. return InetToHost((ui16)SelfAddress.sin6_port);
  224. }
  225. int TAbstractSocket::SetSockOpt(int level, int option_name, const void* option_value, socklen_t option_len) {
  226. const int rv = setsockopt(S, level, option_name, (const char*)option_value, option_len);
  227. Y_DEBUG_ABORT_UNLESS(rv == 0, "SetSockOpt failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError());
  228. return rv;
  229. }
  230. int TAbstractSocket::GetSockOpt(int level, int option_name, void* option_value, socklen_t* option_len) {
  231. const int rv = getsockopt(S, level, option_name, (char*)option_value, option_len);
  232. Y_DEBUG_ABORT_UNLESS(rv == 0, "GetSockOpt failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError());
  233. return rv;
  234. }
  235. bool TAbstractSocket::IsFragmentationForbiden() {
  236. #if defined(_win_)
  237. DWORD flag = 0;
  238. socklen_t sz = sizeof(flag);
  239. Y_ABORT_UNLESS(GetSockOpt(IPPROTO_IP, IP_DONTFRAGMENT, (char*)&flag, &sz) == 0, "");
  240. return flag;
  241. #elif defined(_linux_)
  242. int flag = 0;
  243. socklen_t sz = sizeof(flag);
  244. Y_ABORT_UNLESS(GetSockOpt(IPPROTO_IPV6, IPV6_MTU_DISCOVER, (char*)&flag, &sz) == 0, "");
  245. return flag == IPV6_PMTUDISC_DO;
  246. #elif !defined(_darwin_)
  247. int flag = 0;
  248. socklen_t sz = sizeof(flag);
  249. Y_ABORT_UNLESS(GetSockOpt(IPPROTO_IPV6, IPV6_DONTFRAG, (char*)&flag, &sz) == 0, "");
  250. return flag;
  251. #endif
  252. return false;
  253. }
  254. void TAbstractSocket::ForbidFragmentation() {
  255. // do not fragment ping packets
  256. #if defined(_win_)
  257. DWORD flag = 1;
  258. SetSockOpt(IPPROTO_IP, IP_DONTFRAGMENT, (const char*)&flag, sizeof(flag));
  259. #elif defined(_linux_)
  260. int flag = IP_PMTUDISC_DO;
  261. SetSockOpt(IPPROTO_IP, IP_MTU_DISCOVER, (const char*)&flag, sizeof(flag));
  262. flag = IPV6_PMTUDISC_DO;
  263. SetSockOpt(IPPROTO_IPV6, IPV6_MTU_DISCOVER, (const char*)&flag, sizeof(flag));
  264. #elif !defined(_darwin_)
  265. int flag = 1;
  266. //SetSockOpt(IPPROTO_IP, IP_DONTFRAG, (const char*)&flag, sizeof(flag));
  267. SetSockOpt(IPPROTO_IPV6, IPV6_DONTFRAG, (const char*)&flag, sizeof(flag));
  268. #endif
  269. }
  270. void TAbstractSocket::EnableFragmentation() {
  271. #if defined(_win_)
  272. DWORD flag = 0;
  273. SetSockOpt(IPPROTO_IP, IP_DONTFRAGMENT, (const char*)&flag, sizeof(flag));
  274. #elif defined(_linux_)
  275. int flag = IP_PMTUDISC_WANT;
  276. SetSockOpt(IPPROTO_IP, IP_MTU_DISCOVER, (const char*)&flag, sizeof(flag));
  277. flag = IPV6_PMTUDISC_WANT;
  278. SetSockOpt(IPPROTO_IPV6, IPV6_MTU_DISCOVER, (const char*)&flag, sizeof(flag));
  279. #elif !defined(_darwin_)
  280. int flag = 0;
  281. //SetSockOpt(IPPROTO_IP, IP_DONTFRAG, (const char*)&flag, sizeof(flag));
  282. SetSockOpt(IPPROTO_IPV6, IPV6_DONTFRAG, (const char*)&flag, sizeof(flag));
  283. #endif
  284. }
  285. int TAbstractSocket::Connect(const sockaddr* address, socklen_t address_len) {
  286. Y_ASSERT(IsValid());
  287. return connect(S, address, address_len);
  288. }
  289. void TAbstractSocket::CancelWaitHost(const sockaddr_in6 addr) {
  290. CancelWaitImpl(&addr);
  291. }
  292. bool TAbstractSocket::IsSendMMsgSupported() const {
  293. return SendMMsgFunc != nullptr;
  294. }
  295. int TAbstractSocket::SendMMsg(TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags) {
  296. Y_ASSERT(IsValid());
  297. Y_ABORT_UNLESS(SendMMsgFunc, "sendmmsg is not supported!");
  298. TReadGuard rg(Mutex);
  299. static bool checked = 0;
  300. Y_ABORT_UNLESS(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior");
  301. return SendMMsgFunc(S, msgvec, vlen, flags);
  302. }
  303. ssize_t TAbstractSocket::SendMsg(const TMsgHdr* hdr, int flags, const EFragFlag frag) {
  304. Y_ASSERT(IsValid());
  305. #ifdef _win32_
  306. static bool checked = 0;
  307. Y_ABORT_UNLESS(hdr->msg_iov->iov_len == 1, "Scatter/gather is currenly not supported on Windows");
  308. if (hdr->Tos || frag == FF_DONT_FRAG) {
  309. TWriteGuard wg(Mutex);
  310. if (frag == FF_DONT_FRAG) {
  311. ForbidFragmentation();
  312. } else {
  313. Y_ABORT_UNLESS(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior");
  314. }
  315. int originalTos;
  316. if (hdr->Tos) {
  317. socklen_t sz = sizeof(originalTos);
  318. Y_ABORT_UNLESS(GetSockOpt(IPPROTO_IP, IP_TOS, (char*)&originalTos, &sz) == 0, "");
  319. Y_ABORT_UNLESS(SetSockOpt(IPPROTO_IP, IP_TOS, (char*)&hdr->Tos, sizeof(hdr->Tos)) == 0, "");
  320. }
  321. const ssize_t rv = sendto(S, hdr->msg_iov->iov_base, hdr->msg_iov->iov_len, flags, (sockaddr*)hdr->msg_name, hdr->msg_namelen);
  322. if (hdr->Tos) {
  323. Y_ABORT_UNLESS(SetSockOpt(IPPROTO_IP, IP_TOS, (char*)&originalTos, sizeof(originalTos)) == 0, "");
  324. }
  325. if (frag == FF_DONT_FRAG) {
  326. EnableFragmentation();
  327. }
  328. return rv;
  329. }
  330. TReadGuard rg(Mutex);
  331. Y_ABORT_UNLESS(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior");
  332. return sendto(S, hdr->msg_iov->iov_base, hdr->msg_iov->iov_len, flags, (sockaddr*)hdr->msg_name, hdr->msg_namelen);
  333. #else
  334. if (frag == FF_DONT_FRAG) {
  335. TWriteGuard wg(Mutex);
  336. ForbidFragmentation();
  337. const ssize_t rv = sendmsg(S, hdr, flags);
  338. EnableFragmentation();
  339. return rv;
  340. }
  341. TReadGuard rg(Mutex);
  342. #ifndef _darwin_
  343. static bool checked = 0;
  344. Y_ABORT_UNLESS(checked || (checked = !IsFragmentationForbiden()), "Send methods of this class expect default EnableFragmentation behavior");
  345. #endif
  346. return sendmsg(S, hdr, flags);
  347. #endif
  348. }
  349. bool TAbstractSocket::IncreaseSendBuff() {
  350. int buffSize;
  351. socklen_t sz = sizeof(buffSize);
  352. if (GetSockOpt(SOL_SOCKET, SO_SNDBUF, &buffSize, &sz)) {
  353. return false;
  354. }
  355. // worst case: 200000 pps * 8k * 0.01sec = 16Mb so 32Mb hard limit is reasonable value
  356. if (buffSize < 0 || buffSize > (1 << 25)) {
  357. fprintf(stderr, "GetSockOpt returns wrong or too big value for SO_SNDBUF: %d\n", buffSize);
  358. return false;
  359. }
  360. //linux returns the doubled value. man 7 socket:
  361. //
  362. // SO_SNDBUF
  363. // Sets or gets the maximum socket send buffer in bytes. The ker-
  364. // nel doubles this value (to allow space for bookkeeping overhead)
  365. // when it is set using setsockopt(), and this doubled value is
  366. // returned by getsockopt(). The default value is set by the
  367. // wmem_default sysctl and the maximum allowed value is set by the
  368. // wmem_max sysctl. The minimum (doubled) value for this option is
  369. // 2048.
  370. //
  371. #ifndef _linux_
  372. buffSize += buffSize;
  373. #endif
  374. // false if previous value was less than current value.
  375. // It means setsockopt was not successful. (for example: system limits)
  376. // we will try to set it again but return false
  377. const bool rv = !(buffSize <= SendSysSocketSizePrev);
  378. if (SetSockOpt(SOL_SOCKET, SO_SNDBUF, &buffSize, sz) == 0) {
  379. SendSysSocketSize = buffSize;
  380. SendSysSocketSizePrev = buffSize;
  381. return rv;
  382. }
  383. return false;
  384. }
  385. int TAbstractSocket::GetSendSysSocketSize() {
  386. return SendSysSocketSize;
  387. }
  388. void TAbstractSocket::SetRecvLagTime(NHPTimer::STime time) {
  389. AtomicSet(RecvLag, time);
  390. }
  391. int TAbstractSocket::OpenImpl(int port) {
  392. Y_ASSERT(!IsValid());
  393. const int netPort = port ? htons((u_short)port) : 0;
  394. #ifdef _freebsd_
  395. // alternative OS
  396. if (netPort == 0) {
  397. static ui64 pp = GetCycleCount();
  398. for (int attempt = 0; attempt < 100; ++attempt) {
  399. const int tryPort = htons((pp & 0x3fff) + 0xc000);
  400. ++pp;
  401. if (CreateSocket(tryPort) != 0) {
  402. Y_ASSERT(!IsValid());
  403. continue;
  404. }
  405. if (DetectSelfAddress() != 0 || tryPort != SelfAddress.sin6_port) {
  406. // FreeBSD suck!
  407. CloseImpl();
  408. Y_ASSERT(!IsValid());
  409. continue;
  410. }
  411. break;
  412. }
  413. if (!IsValid()) {
  414. return -1;
  415. }
  416. } else {
  417. if (CreateSocket(netPort) != 0) {
  418. Y_ASSERT(!IsValid());
  419. return -1;
  420. }
  421. }
  422. #else
  423. // regular OS
  424. if (CreateSocket(netPort) != 0) {
  425. Y_ASSERT(!IsValid());
  426. return -1;
  427. }
  428. #endif
  429. if (IsValid() && DetectSelfAddress() != 0) {
  430. CloseImpl();
  431. Y_ASSERT(!IsValid());
  432. return -1;
  433. }
  434. Y_ASSERT(IsValid());
  435. return 0;
  436. }
  437. void TAbstractSocket::CloseImpl() {
  438. if (IsValid()) {
  439. Poller.Unwait(S);
  440. Y_ABORT_UNLESS(closesocket(S) == 0, "closesocket failed: %s (errno = %d)", LastSystemErrorText(), LastSystemError());
  441. }
  442. S = INVALID_SOCKET;
  443. }
  444. void TAbstractSocket::WaitImpl(float timeoutSec) const {
  445. Y_ABORT_UNLESS(IsValid(), "something went wrong");
  446. Poller.WaitT(TDuration::Seconds(timeoutSec));
  447. }
  448. void TAbstractSocket::CancelWaitImpl(const sockaddr_in6* address) {
  449. Y_ASSERT(IsValid());
  450. // darwin ignores packets with msg_iovlen == 0, also windows implementation uses sendto of first iovec.
  451. TIoVec v = CreateIoVec(nullptr, 0);
  452. TMsgHdr hdr = CreateSendMsgHdr((address ? *address : SelfAddress), v, nullptr);
  453. // send self fake packet
  454. TAbstractSocket::SendMsg(&hdr, 0, FF_ALLOW_FRAG);
  455. }
  456. ssize_t TAbstractSocket::RecvMsgImpl(TMsgHdr* hdr, int flags) {
  457. Y_ASSERT(IsValid());
  458. #ifdef _win32_
  459. Y_ABORT_UNLESS(hdr->msg_iov->iov_len == 1, "Scatter/gather is currenly not supported on Windows");
  460. return recvfrom(S, hdr->msg_iov->iov_base, hdr->msg_iov->iov_len, flags, (sockaddr*)hdr->msg_name, &hdr->msg_namelen);
  461. #else
  462. return recvmsg(S, hdr, flags);
  463. #endif
  464. }
  465. TUdpRecvPacket* TAbstractSocket::RecvImpl(TUdpHostRecvBufAlloc* buf, sockaddr_in6* srcAddr, sockaddr_in6* dstAddr) {
  466. Y_ASSERT(IsValid());
  467. const TIoVec iov = CreateIoVec(buf->GetDataPtr(), buf->GetBufSize());
  468. char controllBuffer[CTRL_BUFFER_SIZE]; //used to get dst address from socket
  469. TMsgHdr hdr = CreateRecvMsgHdr(srcAddr, iov, controllBuffer);
  470. const ssize_t rv = TAbstractSocket::RecvMsgImpl(&hdr, 0);
  471. if (rv < 0) {
  472. Y_ASSERT(LastSystemError() == EAGAIN || LastSystemError() == EWOULDBLOCK);
  473. return nullptr;
  474. }
  475. if (dstAddr && !ExtractDestinationAddress(hdr, dstAddr)) {
  476. //fprintf(stderr, "can`t get destination ip\n");
  477. }
  478. // we extract packet and allocate new buffer only if packet arrived
  479. TUdpRecvPacket* result = buf->ExtractPacket();
  480. result->DataStart = 0;
  481. result->DataSize = (int)rv;
  482. return result;
  483. }
  484. // thread-safe
  485. int TAbstractSocket::RecvMMsgImpl(TMMsgHdr* msgvec, unsigned int vlen, unsigned int flags, timespec* timeout) {
  486. Y_ASSERT(IsValid());
  487. Y_ABORT_UNLESS(RecvMMsgFunc, "recvmmsg is not supported!");
  488. return RecvMMsgFunc(S, msgvec, vlen, flags, timeout);
  489. }
  490. ///////////////////////////////////////////////////////////////////////////////
  491. class TSocket: public TAbstractSocket {
  492. public:
  493. int Open(int port) override;
  494. void Close() override;
  495. void Wait(float timeoutSec, int netlibaVersion) const override;
  496. void CancelWait(int netlibaVersion) override;
  497. bool IsRecvMsgSupported() const override;
  498. ssize_t RecvMsg(TMsgHdr* hdr, int flags) override;
  499. TUdpRecvPacket* Recv(sockaddr_in6* srcAddr, sockaddr_in6* dstAddr, int netlibaVersion) override;
  500. private:
  501. TUdpHostRecvBufAlloc RecvBuf;
  502. };
  503. int TSocket::Open(int port) {
  504. return OpenImpl(port);
  505. }
  506. void TSocket::Close() {
  507. CloseImpl();
  508. }
  509. void TSocket::Wait(float timeoutSec, int netlibaVersion) const {
  510. Y_UNUSED(netlibaVersion);
  511. WaitImpl(timeoutSec);
  512. }
  513. void TSocket::CancelWait(int netlibaVersion) {
  514. Y_UNUSED(netlibaVersion);
  515. CancelWaitImpl();
  516. }
  517. bool TSocket::IsRecvMsgSupported() const {
  518. return true;
  519. }
  520. ssize_t TSocket::RecvMsg(TMsgHdr* hdr, int flags) {
  521. return RecvMsgImpl(hdr, flags);
  522. }
  523. TUdpRecvPacket* TSocket::Recv(sockaddr_in6* srcAddr, sockaddr_in6* dstAddr, int netlibaVersion) {
  524. Y_UNUSED(netlibaVersion);
  525. return RecvImpl(&RecvBuf, srcAddr, dstAddr);
  526. }
  527. ///////////////////////////////////////////////////////////////////////////////
  528. class TTryToRecvMMsgSocket: public TAbstractSocket {
  529. private:
  530. THolderVector<TUdpHostRecvBufAlloc> RecvPackets;
  531. TVector<sockaddr_in6> RecvPacketsSrcAddresses;
  532. TVector<TIoVec> RecvPacketsIoVecs;
  533. size_t RecvPacketsBegin; // first non returned to user
  534. size_t RecvPacketsHeadersEnd; // next after last one with data
  535. TVector<TMMsgHdr> RecvPacketsHeaders;
  536. TVector<std::array<char, CTRL_BUFFER_SIZE>> RecvPacketsCtrlBuffers;
  537. int FillRecvBuffers();
  538. public:
  539. static bool IsRecvMMsgSupported();
  540. // Tests showed best performance on queue size 128 (+7%).
  541. // If memory is limited you can use 12 - it gives +4%.
  542. // Do not use lower values - for example recvmmsg with 1 element is 3% slower that recvmsg!
  543. // (tested with junk/f0b0s/neTBasicSocket_queue_test).
  544. TTryToRecvMMsgSocket(const size_t recvQueueSize = 128);
  545. ~TTryToRecvMMsgSocket() override;
  546. int Open(int port) override;
  547. void Close() override;
  548. void Wait(float timeoutSec, int netlibaVersion) const override;
  549. void CancelWait(int netlibaVersion) override;
  550. bool IsRecvMsgSupported() const override {
  551. return false;
  552. }
  553. ssize_t RecvMsg(TMsgHdr* hdr, int flags) override {
  554. Y_UNUSED(hdr);
  555. Y_UNUSED(flags);
  556. Y_ABORT_UNLESS(false, "Use TBasicSocket for RecvMsg call! TRecvMMsgSocket implementation must use memcpy which is suboptimal and thus forbidden!");
  557. }
  558. TUdpRecvPacket* Recv(sockaddr_in6* addr, sockaddr_in6* dstAddr, int netlibaVersion) override;
  559. };
  560. TTryToRecvMMsgSocket::TTryToRecvMMsgSocket(const size_t recvQueueSize)
  561. : RecvPacketsBegin(0)
  562. , RecvPacketsHeadersEnd(0)
  563. {
  564. // recvmmsg is not supported - will act like TSocket,
  565. // we can't just VERIFY - TTryToRecvMMsgSocket is used as base class for TDualStackSocket.
  566. if (!IsRecvMMsgSupported()) {
  567. RecvPackets.reserve(1);
  568. RecvPackets.PushBack(new TUdpHostRecvBufAlloc);
  569. return;
  570. }
  571. RecvPackets.reserve(recvQueueSize);
  572. for (size_t i = 0; i != recvQueueSize; ++i) {
  573. RecvPackets.PushBack(new TUdpHostRecvBufAlloc);
  574. }
  575. RecvPacketsSrcAddresses.resize(recvQueueSize);
  576. RecvPacketsIoVecs.resize(recvQueueSize);
  577. RecvPacketsHeaders.resize(recvQueueSize);
  578. RecvPacketsCtrlBuffers.resize(recvQueueSize);
  579. for (size_t i = 0; i != recvQueueSize; ++i) {
  580. TMMsgHdr& mhdr = RecvPacketsHeaders[i];
  581. Zero(mhdr);
  582. RecvPacketsIoVecs[i] = CreateIoVec(RecvPackets[i]->GetDataPtr(), RecvPackets[i]->GetBufSize());
  583. char* buf = RecvPacketsCtrlBuffers[i].data();
  584. memset(buf, 0, CTRL_BUFFER_SIZE);
  585. mhdr.msg_hdr = CreateRecvMsgHdr(&RecvPacketsSrcAddresses[i], RecvPacketsIoVecs[i], buf);
  586. }
  587. }
  588. TTryToRecvMMsgSocket::~TTryToRecvMMsgSocket() {
  589. Close();
  590. }
  591. int TTryToRecvMMsgSocket::Open(int port) {
  592. return OpenImpl(port);
  593. }
  594. void TTryToRecvMMsgSocket::Close() {
  595. CloseImpl();
  596. }
  597. void TTryToRecvMMsgSocket::Wait(float timeoutSec, int netlibaVersion) const {
  598. Y_UNUSED(netlibaVersion);
  599. Y_ASSERT(RecvPacketsBegin == RecvPacketsHeadersEnd || IsRecvMMsgSupported());
  600. if (RecvPacketsBegin == RecvPacketsHeadersEnd) {
  601. WaitImpl(timeoutSec);
  602. }
  603. }
  604. void TTryToRecvMMsgSocket::CancelWait(int netlibaVersion) {
  605. Y_UNUSED(netlibaVersion);
  606. CancelWaitImpl();
  607. }
  608. bool TTryToRecvMMsgSocket::IsRecvMMsgSupported() {
  609. return RecvMMsgFunc != nullptr;
  610. }
  611. int TTryToRecvMMsgSocket::FillRecvBuffers() {
  612. Y_ASSERT(IsRecvMMsgSupported());
  613. Y_ASSERT(RecvPacketsBegin <= RecvPacketsHeadersEnd);
  614. if (RecvPacketsBegin < RecvPacketsHeadersEnd) {
  615. return RecvPacketsHeadersEnd - RecvPacketsBegin;
  616. }
  617. // no packets left from last recvmmsg call
  618. for (size_t i = 0; i != RecvPacketsHeadersEnd; ++i) { // reinit only used by last recvmmsg call headers
  619. RecvPacketsIoVecs[i] = CreateIoVec(RecvPackets[i]->GetDataPtr(), RecvPackets[i]->GetBufSize());
  620. }
  621. RecvPacketsBegin = RecvPacketsHeadersEnd = 0;
  622. const int r = RecvMMsgImpl(&RecvPacketsHeaders[0], (unsigned int)RecvPacketsHeaders.size(), 0, nullptr);
  623. if (r >= 0) {
  624. RecvPacketsHeadersEnd = r;
  625. } else {
  626. Y_ASSERT(LastSystemError() == EAGAIN || LastSystemError() == EWOULDBLOCK);
  627. }
  628. return r;
  629. }
  630. // not thread-safe
  631. TUdpRecvPacket* TTryToRecvMMsgSocket::Recv(sockaddr_in6* fromAddress, sockaddr_in6* dstAddr, int) {
  632. // act like TSocket
  633. if (!IsRecvMMsgSupported()) {
  634. return RecvImpl(RecvPackets[0], fromAddress, dstAddr);
  635. }
  636. if (FillRecvBuffers() <= 0) {
  637. return nullptr;
  638. }
  639. TUdpRecvPacket* result = RecvPackets[RecvPacketsBegin]->ExtractPacket();
  640. TMMsgHdr& mmsgHdr = RecvPacketsHeaders[RecvPacketsBegin];
  641. result->DataSize = (ssize_t)mmsgHdr.msg_len;
  642. if (dstAddr && !ExtractDestinationAddress(mmsgHdr.msg_hdr, dstAddr)) {
  643. // fprintf(stderr, "can`t get destination ip\n");
  644. }
  645. *fromAddress = RecvPacketsSrcAddresses[RecvPacketsBegin];
  646. //we must clean ctrlbuffer to be able to use it later
  647. #ifndef _win_
  648. memset(mmsgHdr.msg_hdr.msg_control, 0, CTRL_BUFFER_SIZE);
  649. mmsgHdr.msg_hdr.msg_controllen = CTRL_BUFFER_SIZE;
  650. #endif
  651. RecvPacketsBegin++;
  652. return result;
  653. }
  654. ///////////////////////////////////////////////////////////////////////////////
  655. /* TODO: too slow, needs to be optimized
  656. template<size_t TTNumRecvThreads>
  657. class TMTRecvSocket: public TAbstractSocket
  658. {
  659. private:
  660. typedef TLockFreePacketQueue<TTNumRecvThreads> TPacketQueue;
  661. static void* RecvThreadFunc(void* that)
  662. {
  663. static_cast<TMTRecvSocket*>(that)->RecvLoop();
  664. return NULL;
  665. }
  666. void RecvLoop()
  667. {
  668. TBestUnixRecvSocket impl;
  669. impl.Reset(*this);
  670. while (AtomicAdd(NumThreadsToDie, 0) == -1) {
  671. sockaddr_in6 addr;
  672. TUdpRecvPacket* packet = impl.Recv(&addr, NETLIBA_ANY_VERSION);
  673. if (!packet) {
  674. impl.Wait(0.0001, NETLIBA_ANY_VERSION); // so small tiomeout because we can't guarantee that 1 thread won't get all packets
  675. continue;
  676. }
  677. Queue.Push(packet, addr);
  678. }
  679. if (AtomicDecrement(NumThreadsToDie)) {
  680. impl.CancelWait(NETLIBA_ANY_VERSION);
  681. } else {
  682. AllThreadsAreDead.Signal();
  683. }
  684. }
  685. THolderVector<TThread> RecvThreads;
  686. TAtomic NumThreadsToDie;
  687. TSystemEvent AllThreadsAreDead;
  688. TPacketQueue Queue;
  689. public:
  690. TMTRecvSocket()
  691. : NumThreadsToDie(-1) {}
  692. ~TMTRecvSocket()
  693. {
  694. Close();
  695. }
  696. int Open(int port)
  697. {
  698. if (OpenImpl(port) != 0) {
  699. Y_ASSERT(!IsValid());
  700. return -1;
  701. }
  702. NumThreadsToDie = -1;
  703. RecvThreads.reserve(TTNumRecvThreads);
  704. for (size_t i = 0; i != TTNumRecvThreads; ++i) {
  705. RecvThreads.PushBack(new TThread(TThread::TParams(RecvThreadFunc, this).SetName("nl12_recv_skt")));
  706. RecvThreads.back()->Start();
  707. RecvThreads.back()->Detach();
  708. }
  709. return 0;
  710. }
  711. void Close()
  712. {
  713. if (!IsValid()) {
  714. return;
  715. }
  716. AtomicSwap(&NumThreadsToDie, (int)RecvThreads.size());
  717. CancelWaitImpl();
  718. Y_ABORT_UNLESS(AllThreadsAreDead.WaitT(TDuration::Seconds(30)), "TMTRecvSocket destruction failed");
  719. CloseImpl();
  720. }
  721. void Wait(float timeoutSec, int netlibaVersion) const
  722. {
  723. Y_UNUSED(netlibaVersion);
  724. Queue.GetEvent().WaitT(TDuration::Seconds(timeoutSec));
  725. }
  726. void CancelWait(int netlibaVersion)
  727. {
  728. Y_UNUSED(netlibaVersion);
  729. Queue.GetEvent().Signal();
  730. }
  731. TUdpRecvPacket* Recv(sockaddr_in6 *addr, int netlibaVersion)
  732. {
  733. Y_UNUSED(netlibaVersion);
  734. TUdpRecvPacket* result;
  735. if (!Queue.Pop(&result, addr)) {
  736. return NULL;
  737. }
  738. return result;
  739. }
  740. bool IsRecvMsgSupported() const { return false; }
  741. ssize_t RecvMsg(TMsgHdr* hdr, int flags) { Y_ABORT_UNLESS(false, "Use TBasicSocket for RecvMsg call! TMTRecvSocket implementation must use memcpy which is suboptimal and thus forbidden!"); }
  742. };
  743. */
  744. ///////////////////////////////////////////////////////////////////////////////
  745. // Send.*, Recv, Wait and CancelWait are thread-safe.
  746. class TDualStackSocket: public TTryToRecvMMsgSocket {
  747. private:
  748. typedef TTryToRecvMMsgSocket TBase;
  749. typedef TLockFreePacketQueue<1> TPacketQueue;
  750. static void* RecvThreadFunc(void* that);
  751. void RecvLoop();
  752. struct TFilteredPacketQueue {
  753. enum EPushResult {
  754. PR_FULL = 0,
  755. PR_OK = 1,
  756. PR_FILTERED = 2
  757. };
  758. const ui8 F1;
  759. const ui8 F2;
  760. const ui8 CmdPos;
  761. TFilteredPacketQueue(ui8 f1, ui8 f2, ui8 cmdPos)
  762. : F1(f1)
  763. , F2(f2)
  764. , CmdPos(cmdPos)
  765. {
  766. }
  767. bool Pop(TUdpRecvPacket** packet, sockaddr_in6* srcAddr, sockaddr_in6* dstAddr) {
  768. return Queue.Pop(packet, srcAddr, dstAddr);
  769. }
  770. ui8 Push(TUdpRecvPacket* packet, const TPacketMeta& meta) {
  771. if (Queue.IsDataPartFull()) {
  772. const ui8 cmd = packet->Data.get()[CmdPos];
  773. if (cmd == F1 || cmd == F2)
  774. return PR_FILTERED;
  775. }
  776. return Queue.Push(packet, meta); //false - PR_FULL, true - PR_OK
  777. }
  778. TPacketQueue Queue;
  779. };
  780. TFilteredPacketQueue& GetRecvQueue(int netlibaVersion) const;
  781. TSystemEvent& GetQueueEvent(const TFilteredPacketQueue& queue) const;
  782. TThread RecvThread;
  783. TAtomic ShouldDie;
  784. TSystemEvent DieEvent;
  785. mutable TFilteredPacketQueue RecvQueue6;
  786. mutable TFilteredPacketQueue RecvQueue12;
  787. public:
  788. TDualStackSocket();
  789. ~TDualStackSocket() override;
  790. int Open(int port) override;
  791. void Close() override;
  792. void Wait(float timeoutSec, int netlibaVersion) const override;
  793. void CancelWait(int netlibaVersion) override;
  794. bool IsRecvMsgSupported() const override {
  795. return false;
  796. }
  797. ssize_t RecvMsg(TMsgHdr* hdr, int flags) override {
  798. Y_UNUSED(hdr);
  799. Y_UNUSED(flags);
  800. Y_ABORT_UNLESS(false, "Use TBasicSocket for RecvMsg call! TDualStackSocket implementation must use memcpy which is suboptimal and thus forbidden!");
  801. }
  802. TUdpRecvPacket* Recv(sockaddr_in6* addr, sockaddr_in6* dstAddr, int netlibaVersion) override;
  803. };
  804. TDualStackSocket::TDualStackSocket()
  805. : RecvThread(TThread::TParams(RecvThreadFunc, this).SetName("nl12_dual_stack"))
  806. , ShouldDie(0)
  807. , RecvQueue6(NNetliba::DATA, NNetliba::DATA_SMALL, NNetliba::CMD_POS)
  808. , RecvQueue12(NNetliba_v12::DATA, NNetliba_v12::DATA_SMALL, NNetliba_v12::CMD_POS)
  809. {
  810. }
  811. // virtual functions don't work in dtors!
  812. TDualStackSocket::~TDualStackSocket() {
  813. Close();
  814. sockaddr_in6 srcAdd;
  815. sockaddr_in6 dstAddr;
  816. TUdpRecvPacket* ptr = nullptr;
  817. while (GetRecvQueue(NETLIBA_ANY_VERSION).Pop(&ptr, &srcAdd, &dstAddr)) {
  818. delete ptr;
  819. }
  820. while (GetRecvQueue(NETLIBA_V12_VERSION).Pop(&ptr, &srcAdd, &dstAddr)) {
  821. delete ptr;
  822. }
  823. }
  824. int TDualStackSocket::Open(int port) {
  825. if (TBase::Open(port) != 0) {
  826. Y_ASSERT(!IsValid());
  827. return -1;
  828. }
  829. AtomicSet(ShouldDie, 0);
  830. DieEvent.Reset();
  831. RecvThread.Start();
  832. RecvThread.Detach();
  833. return 0;
  834. }
  835. void TDualStackSocket::Close() {
  836. if (!IsValid()) {
  837. return;
  838. }
  839. AtomicSwap(&ShouldDie, 1);
  840. CancelWaitImpl();
  841. Y_ABORT_UNLESS(DieEvent.WaitT(TDuration::Seconds(30)), "TDualStackSocket::Close failed");
  842. TBase::Close();
  843. }
  844. TDualStackSocket::TFilteredPacketQueue& TDualStackSocket::GetRecvQueue(int netlibaVersion) const {
  845. return netlibaVersion == NETLIBA_V12_VERSION ? RecvQueue12 : RecvQueue6;
  846. }
  847. TSystemEvent& TDualStackSocket::GetQueueEvent(const TFilteredPacketQueue& queue) const {
  848. return queue.Queue.GetEvent();
  849. }
  850. void* TDualStackSocket::RecvThreadFunc(void* that) {
  851. SetHighestThreadPriority();
  852. static_cast<TDualStackSocket*>(that)->RecvLoop();
  853. return nullptr;
  854. }
  855. void TDualStackSocket::RecvLoop() {
  856. for (;;) {
  857. TUdpRecvPacket* p = nullptr;
  858. sockaddr_in6 srcAddr;
  859. sockaddr_in6 dstAddr;
  860. while (AtomicAdd(ShouldDie, 0) == 0 && (p = TBase::Recv(&srcAddr, &dstAddr, NETLIBA_ANY_VERSION))) {
  861. Y_ASSERT(p->DataStart == 0);
  862. if (p->DataSize < 12) {
  863. continue;
  864. }
  865. TFilteredPacketQueue& q = GetRecvQueue(p->Data.get()[8]);
  866. const ui8 res = q.Push(p, {srcAddr, dstAddr});
  867. if (res == TFilteredPacketQueue::PR_OK) {
  868. GetQueueEvent(q).Signal();
  869. } else {
  870. // simulate OS behavior on buffer overflow - drop packets.
  871. const NHPTimer::STime time = AtomicGet(RecvLag);
  872. const float sec = NHPTimer::GetSeconds(time);
  873. fprintf(stderr, "TDualStackSocket::RecvLoop netliba v%d queue overflow, recv lag: %f sec, dropping packet, res: %u\n",
  874. &q == &RecvQueue12 ? 12 : 6, sec, res);
  875. delete p;
  876. }
  877. }
  878. if (AtomicAdd(ShouldDie, 0)) {
  879. DieEvent.Signal();
  880. return;
  881. }
  882. TBase::Wait(0.1f, NETLIBA_ANY_VERSION);
  883. }
  884. }
  885. void TDualStackSocket::Wait(float timeoutSec, int netlibaVersion) const {
  886. TFilteredPacketQueue& q = GetRecvQueue(netlibaVersion);
  887. if (q.Queue.IsEmpty()) {
  888. GetQueueEvent(q).Reset();
  889. if (q.Queue.IsEmpty()) {
  890. GetQueueEvent(q).WaitT(TDuration::Seconds(timeoutSec));
  891. }
  892. }
  893. }
  894. void TDualStackSocket::CancelWait(int netlibaVersion) {
  895. GetQueueEvent(GetRecvQueue(netlibaVersion)).Signal();
  896. }
  897. // thread-safe
  898. TUdpRecvPacket* TDualStackSocket::Recv(sockaddr_in6* srcAddr, sockaddr_in6* dstAddr, int netlibaVersion) {
  899. TUdpRecvPacket* result = nullptr;
  900. if (!GetRecvQueue(netlibaVersion).Pop(&result, srcAddr, dstAddr)) {
  901. return nullptr;
  902. }
  903. return result;
  904. }
  905. ///////////////////////////////////////////////////////////////////////////////
  906. TIntrusivePtr<ISocket> CreateSocket() {
  907. return new TSocket();
  908. }
  909. TIntrusivePtr<ISocket> CreateDualStackSocket() {
  910. return new TDualStackSocket();
  911. }
  912. TIntrusivePtr<ISocket> CreateBestRecvSocket() {
  913. // TSocket is faster than TRecvMMsgFunc in case of unsupported recvmmsg
  914. if (!TTryToRecvMMsgSocket::IsRecvMMsgSupported()) {
  915. return new TSocket();
  916. }
  917. return new TTryToRecvMMsgSocket();
  918. }
  919. }