socket.cpp 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255
  1. #include "ip.h"
  2. #include "socket.h"
  3. #include "address.h"
  4. #include "pollerimpl.h"
  5. #include "iovec.h"
  6. #include <util/system/defaults.h>
  7. #include <util/system/byteorder.h>
  8. #if defined(_unix_)
  9. #include <netdb.h>
  10. #include <sys/types.h>
  11. #include <sys/socket.h>
  12. #include <sys/un.h>
  13. #include <sys/ioctl.h>
  14. #include <netinet/in.h>
  15. #include <netinet/tcp.h>
  16. #include <arpa/inet.h>
  17. #endif
  18. #if defined(_freebsd_)
  19. #include <sys/module.h>
  20. #define ACCEPT_FILTER_MOD
  21. #include <sys/socketvar.h>
  22. #endif
  23. #if defined(_win_)
  24. #include <cerrno>
  25. #include <winsock2.h>
  26. #include <ws2tcpip.h>
  27. #include <wspiapi.h>
  28. #include <util/system/compat.h>
  29. #endif
  30. #include <util/generic/ylimits.h>
  31. #include <util/string/cast.h>
  32. #include <util/stream/mem.h>
  33. #include <util/system/datetime.h>
  34. #include <util/system/error.h>
  35. #include <util/memory/tempbuf.h>
  36. #include <util/generic/singleton.h>
  37. #include <util/generic/hash_set.h>
  38. #include <stddef.h>
  39. #include <sys/uio.h>
  40. using namespace NAddr;
  41. #if defined(_win_)
  42. int inet_aton(const char* cp, struct in_addr* inp) {
  43. sockaddr_in addr;
  44. addr.sin_family = AF_INET;
  45. int psz = sizeof(addr);
  46. if (0 == WSAStringToAddress((char*)cp, AF_INET, nullptr, (LPSOCKADDR)&addr, &psz)) {
  47. memcpy(inp, &addr.sin_addr, sizeof(in_addr));
  48. return 1;
  49. }
  50. return 0;
  51. }
  52. #if (_WIN32_WINNT < 0x0600)
  53. const char* inet_ntop(int af, const void* src, char* dst, socklen_t size) {
  54. if (af != AF_INET) {
  55. errno = EINVAL;
  56. return 0;
  57. }
  58. const ui8* ia = (ui8*)src;
  59. if (snprintf(dst, size, "%u.%u.%u.%u", ia[0], ia[1], ia[2], ia[3]) >= (int)size) {
  60. errno = ENOSPC;
  61. return 0;
  62. }
  63. return dst;
  64. }
  65. struct evpair {
  66. int event;
  67. int winevent;
  68. };
  69. static const evpair evpairs_to_win[] = {
  70. {POLLIN, FD_READ | FD_CLOSE | FD_ACCEPT},
  71. {POLLRDNORM, FD_READ | FD_CLOSE | FD_ACCEPT},
  72. {POLLRDBAND, -1},
  73. {POLLPRI, -1},
  74. {POLLOUT, FD_WRITE | FD_CLOSE},
  75. {POLLWRNORM, FD_WRITE | FD_CLOSE},
  76. {POLLWRBAND, -1},
  77. {POLLERR, 0},
  78. {POLLHUP, 0},
  79. {POLLNVAL, 0}};
  80. static const size_t nevpairs_to_win = sizeof(evpairs_to_win) / sizeof(evpairs_to_win[0]);
  81. static const evpair evpairs_to_unix[] = {
  82. {FD_ACCEPT, POLLIN | POLLRDNORM},
  83. {FD_READ, POLLIN | POLLRDNORM},
  84. {FD_WRITE, POLLOUT | POLLWRNORM},
  85. {FD_CLOSE, POLLHUP},
  86. };
  87. static const size_t nevpairs_to_unix = sizeof(evpairs_to_unix) / sizeof(evpairs_to_unix[0]);
  88. static int convert_events(int events, const evpair* evpairs, size_t nevpairs, bool ignoreUnknown) noexcept {
  89. int result = 0;
  90. for (size_t i = 0; i < nevpairs; ++i) {
  91. int event = evpairs[i].event;
  92. if (events & event) {
  93. events ^= event;
  94. long winEvent = evpairs[i].winevent;
  95. if (winEvent == -1)
  96. return -1;
  97. if (winEvent == 0)
  98. continue;
  99. result |= winEvent;
  100. }
  101. }
  102. if (events != 0 && !ignoreUnknown)
  103. return -1;
  104. return result;
  105. }
  106. class TWSAEventHolder {
  107. private:
  108. HANDLE Event;
  109. public:
  110. inline TWSAEventHolder(HANDLE event) noexcept
  111. : Event(event)
  112. {
  113. }
  114. inline ~TWSAEventHolder() {
  115. WSACloseEvent(Event);
  116. }
  117. inline HANDLE Get() noexcept {
  118. return Event;
  119. }
  120. };
  121. int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept {
  122. HANDLE rawEvent = WSACreateEvent();
  123. if (rawEvent == WSA_INVALID_EVENT) {
  124. errno = EIO;
  125. return -1;
  126. }
  127. TWSAEventHolder event(rawEvent);
  128. int checked_sockets = 0;
  129. for (pollfd* fd = fds; fd < fds + nfds; ++fd) {
  130. int win_events = convert_events(fd->events, evpairs_to_win, nevpairs_to_win, false);
  131. if (win_events == -1) {
  132. errno = EINVAL;
  133. return -1;
  134. }
  135. fd->revents = 0;
  136. if (WSAEventSelect(fd->fd, event.Get(), win_events)) {
  137. int error = WSAGetLastError();
  138. if (error == WSAEINVAL || error == WSAENOTSOCK) {
  139. fd->revents = POLLNVAL;
  140. ++checked_sockets;
  141. } else {
  142. errno = EIO;
  143. return -1;
  144. }
  145. }
  146. fd_set readfds;
  147. fd_set writefds;
  148. struct timeval timeout = {0, 0};
  149. FD_ZERO(&readfds);
  150. FD_ZERO(&writefds);
  151. if (fd->events & POLLIN) {
  152. FD_SET(fd->fd, &readfds);
  153. }
  154. if (fd->events & POLLOUT) {
  155. FD_SET(fd->fd, &writefds);
  156. }
  157. int error = select(0, &readfds, &writefds, nullptr, &timeout);
  158. if (error > 0) {
  159. if (FD_ISSET(fd->fd, &readfds)) {
  160. fd->revents |= POLLIN;
  161. }
  162. if (FD_ISSET(fd->fd, &writefds)) {
  163. fd->revents |= POLLOUT;
  164. }
  165. ++checked_sockets;
  166. }
  167. }
  168. if (checked_sockets > 0) {
  169. // returns without wait since we already have sockets in desired conditions
  170. return checked_sockets;
  171. }
  172. HANDLE events[] = {event.Get()};
  173. DWORD wait_result = WSAWaitForMultipleEvents(1, events, TRUE, timeout, FALSE);
  174. if (wait_result == WSA_WAIT_TIMEOUT)
  175. return 0;
  176. else if (wait_result == WSA_WAIT_EVENT_0) {
  177. for (pollfd* fd = fds; fd < fds + nfds; ++fd) {
  178. if (fd->revents == POLLNVAL)
  179. continue;
  180. WSANETWORKEVENTS network_events;
  181. if (WSAEnumNetworkEvents(fd->fd, event.Get(), &network_events)) {
  182. errno = EIO;
  183. return -1;
  184. }
  185. fd->revents = 0;
  186. for (int i = 0; i < FD_MAX_EVENTS; ++i) {
  187. if ((network_events.lNetworkEvents & (1 << i)) != 0 && network_events.iErrorCode[i]) {
  188. fd->revents = POLLERR;
  189. break;
  190. }
  191. }
  192. if (fd->revents == POLLERR)
  193. continue;
  194. if (network_events.lNetworkEvents) {
  195. fd->revents = static_cast<short>(convert_events(network_events.lNetworkEvents, evpairs_to_unix, nevpairs_to_unix, true));
  196. if (fd->revents & POLLHUP) {
  197. fd->revents &= POLLHUP | POLLIN | POLLRDNORM;
  198. }
  199. }
  200. }
  201. int chanded_sockets = 0;
  202. for (pollfd* fd = fds; fd < fds + nfds; ++fd)
  203. if (fd->revents != 0)
  204. ++chanded_sockets;
  205. return chanded_sockets;
  206. } else {
  207. errno = EIO;
  208. return -1;
  209. }
  210. }
  211. #endif
  212. #endif
  213. bool GetRemoteAddr(SOCKET Socket, char* str, socklen_t size) {
  214. if (!size) {
  215. return false;
  216. }
  217. TOpaqueAddr addr;
  218. if (getpeername(Socket, addr.MutableAddr(), addr.LenPtr()) != 0) {
  219. return false;
  220. }
  221. try {
  222. TMemoryOutput out(str, size - 1);
  223. PrintHost(out, addr);
  224. *out.Buf() = 0;
  225. return true;
  226. } catch (...) {
  227. // ¯\_(ツ)_/¯
  228. }
  229. return false;
  230. }
  231. void SetSocketTimeout(SOCKET s, long timeout) {
  232. SetSocketTimeout(s, timeout, 0);
  233. }
  234. void SetSocketTimeout(SOCKET s, long sec, long msec) {
  235. #ifdef SO_SNDTIMEO
  236. #ifdef _darwin_
  237. const timeval timeout = {sec, (__darwin_suseconds_t)msec * 1000};
  238. #elif defined(_emscripten_)
  239. const timeval timeout = {sec, static_cast<suseconds_t>(msec * 1000)};
  240. #elif defined(_unix_)
  241. const timeval timeout = {sec, msec * 1000};
  242. #else
  243. const int timeout = sec * 1000 + msec;
  244. #endif
  245. CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVTIMEO, timeout, "recv timeout");
  246. CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDTIMEO, timeout, "send timeout");
  247. #endif
  248. }
  249. void SetLinger(SOCKET s, bool on, unsigned len) {
  250. #ifdef SO_LINGER
  251. struct linger l = {on, (u_short)len};
  252. CheckedSetSockOpt(s, SOL_SOCKET, SO_LINGER, l, "linger");
  253. #endif
  254. }
  255. void SetZeroLinger(SOCKET s) {
  256. SetLinger(s, 1, 0);
  257. }
  258. void SetKeepAlive(SOCKET s, bool value) {
  259. CheckedSetSockOpt(s, SOL_SOCKET, SO_KEEPALIVE, (int)value, "keepalive");
  260. }
  261. void SetOutputBuffer(SOCKET s, unsigned value) {
  262. CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDBUF, value, "output buffer");
  263. }
  264. void SetInputBuffer(SOCKET s, unsigned value) {
  265. CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVBUF, value, "input buffer");
  266. }
  267. void SetReusePort(SOCKET s, bool value) {
  268. #if defined(_unix_)
  269. CheckedSetSockOpt(s, SOL_SOCKET, SO_REUSEPORT, (int)value, "reuse port");
  270. #else
  271. Y_UNUSED(s);
  272. Y_UNUSED(value);
  273. ythrow TSystemError(ENOSYS) << "SO_REUSEPORT is not available on Windows";
  274. #endif
  275. }
  276. void SetNoDelay(SOCKET s, bool value) {
  277. CheckedSetSockOpt(s, IPPROTO_TCP, TCP_NODELAY, (int)value, "tcp no delay");
  278. }
  279. void SetCloseOnExec(SOCKET s, bool value) {
  280. #if defined(_unix_)
  281. int flags = fcntl(s, F_GETFD);
  282. if (flags == -1) {
  283. ythrow TSystemError() << "fcntl() failed";
  284. }
  285. if (value) {
  286. flags |= FD_CLOEXEC;
  287. } else {
  288. flags &= ~FD_CLOEXEC;
  289. }
  290. if (fcntl(s, F_SETFD, flags) == -1) {
  291. ythrow TSystemError() << "fcntl() failed";
  292. }
  293. #else
  294. Y_UNUSED(s);
  295. Y_UNUSED(value);
  296. #endif
  297. }
  298. size_t GetMaximumSegmentSize(SOCKET s) {
  299. #if defined(TCP_MAXSEG)
  300. int val;
  301. if (GetSockOpt(s, IPPROTO_TCP, TCP_MAXSEG, val) == 0) {
  302. return (size_t)val;
  303. }
  304. #endif
  305. /*
  306. * probably a good guess...
  307. */
  308. return 8192;
  309. }
  310. size_t GetMaximumTransferUnit(SOCKET /*s*/) {
  311. // for someone who'll dare to write it
  312. // Linux: there rummored to be IP_MTU getsockopt() request
  313. // FreeBSD: request to a socket of type PF_ROUTE
  314. // with peer address as a destination argument
  315. return 8192;
  316. }
  317. int GetSocketToS(SOCKET s) {
  318. TOpaqueAddr addr;
  319. if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
  320. ythrow TSystemError() << "getsockname() failed";
  321. }
  322. return GetSocketToS(s, &addr);
  323. }
  324. int GetSocketToS(SOCKET s, const IRemoteAddr* addr) {
  325. int result = 0;
  326. switch (addr->Addr()->sa_family) {
  327. case AF_INET:
  328. CheckedGetSockOpt(s, IPPROTO_IP, IP_TOS, result, "tos");
  329. break;
  330. case AF_INET6:
  331. #ifdef IPV6_TCLASS
  332. CheckedGetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, result, "tos");
  333. #endif
  334. break;
  335. }
  336. return result;
  337. }
  338. void SetSocketToS(SOCKET s, const NAddr::IRemoteAddr* addr, int tos) {
  339. switch (addr->Addr()->sa_family) {
  340. case AF_INET:
  341. CheckedSetSockOpt(s, IPPROTO_IP, IP_TOS, tos, "tos");
  342. return;
  343. case AF_INET6:
  344. #ifdef IPV6_TCLASS
  345. CheckedSetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, tos, "tos");
  346. return;
  347. #endif
  348. break;
  349. }
  350. ythrow yexception() << "SetSocketToS unsupported for family " << addr->Addr()->sa_family;
  351. }
  352. void SetSocketToS(SOCKET s, int tos) {
  353. TOpaqueAddr addr;
  354. if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
  355. ythrow TSystemError() << "getsockname() failed";
  356. }
  357. SetSocketToS(s, &addr, tos);
  358. }
  359. void SetSocketPriority(SOCKET s, int priority) {
  360. #if defined(SO_PRIORITY)
  361. CheckedSetSockOpt(s, SOL_SOCKET, SO_PRIORITY, priority, "priority");
  362. #else
  363. Y_UNUSED(s);
  364. Y_UNUSED(priority);
  365. #endif
  366. }
  367. bool HasLocalAddress(SOCKET socket) {
  368. TOpaqueAddr localAddr;
  369. if (getsockname(socket, localAddr.MutableAddr(), localAddr.LenPtr()) != 0) {
  370. ythrow TSystemError() << "HasLocalAddress: getsockname() failed. ";
  371. }
  372. if (IsLoopback(localAddr)) {
  373. return true;
  374. }
  375. TOpaqueAddr remoteAddr;
  376. if (getpeername(socket, remoteAddr.MutableAddr(), remoteAddr.LenPtr()) != 0) {
  377. ythrow TSystemError() << "HasLocalAddress: getpeername() failed. ";
  378. }
  379. return IsSame(localAddr, remoteAddr);
  380. }
  381. namespace {
  382. #if defined(_linux_)
  383. #if !defined(TCP_FASTOPEN)
  384. #define TCP_FASTOPEN 23
  385. #endif
  386. #endif
  387. #if defined(TCP_FASTOPEN)
  388. struct TTcpFastOpenFeature {
  389. inline TTcpFastOpenFeature()
  390. : HasFastOpen_(false)
  391. {
  392. TSocketHolder tmp(socket(AF_INET, SOCK_STREAM, 0));
  393. int val = 1;
  394. int ret = SetSockOpt(tmp, IPPROTO_TCP, TCP_FASTOPEN, val);
  395. HasFastOpen_ = (ret == 0);
  396. }
  397. inline void SetFastOpen(SOCKET s, int qlen) const {
  398. if (HasFastOpen_) {
  399. CheckedSetSockOpt(s, IPPROTO_TCP, TCP_FASTOPEN, qlen, "setting TCP_FASTOPEN");
  400. }
  401. }
  402. static inline const TTcpFastOpenFeature* Instance() noexcept {
  403. return Singleton<TTcpFastOpenFeature>();
  404. }
  405. bool HasFastOpen_;
  406. };
  407. #endif
  408. } // namespace
  409. void SetTcpFastOpen(SOCKET s, int qlen) {
  410. #if defined(TCP_FASTOPEN)
  411. TTcpFastOpenFeature::Instance()->SetFastOpen(s, qlen);
  412. #else
  413. Y_UNUSED(s);
  414. Y_UNUSED(qlen);
  415. #endif
  416. }
  417. static bool IsBlocked(int lasterr) noexcept {
  418. return lasterr == EAGAIN || lasterr == EWOULDBLOCK;
  419. }
  420. struct TUnblockingGuard {
  421. SOCKET S_;
  422. TUnblockingGuard(SOCKET s)
  423. : S_(s)
  424. {
  425. SetNonBlock(S_, true);
  426. }
  427. ~TUnblockingGuard() {
  428. SetNonBlock(S_, false);
  429. }
  430. };
  431. static int MsgPeek(SOCKET s) {
  432. int flags = MSG_PEEK;
  433. #if defined(_win_)
  434. TUnblockingGuard unblocker(s);
  435. Y_UNUSED(unblocker);
  436. #else
  437. flags |= MSG_DONTWAIT;
  438. #endif
  439. char c;
  440. return recv(s, &c, 1, flags);
  441. }
  442. bool IsNotSocketClosedByOtherSide(SOCKET s) {
  443. return HasSocketDataToRead(s) != ESocketReadStatus::SocketClosed;
  444. }
  445. ESocketReadStatus HasSocketDataToRead(SOCKET s) {
  446. const int r = MsgPeek(s);
  447. if (r == -1 && IsBlocked(LastSystemError())) {
  448. return ESocketReadStatus::NoData;
  449. }
  450. if (r > 0) {
  451. return ESocketReadStatus::HasData;
  452. }
  453. return ESocketReadStatus::SocketClosed;
  454. }
  455. #if defined(_win_)
  456. static ssize_t DoSendMsg(SOCKET sock, const struct iovec* iov, int iovcnt) {
  457. return writev(sock, iov, iovcnt);
  458. }
  459. #else
  460. static ssize_t DoSendMsg(SOCKET sock, const struct iovec* iov, int iovcnt) {
  461. struct msghdr message;
  462. Zero(message);
  463. message.msg_iov = const_cast<struct iovec*>(iov);
  464. message.msg_iovlen = iovcnt;
  465. return sendmsg(sock, &message, MSG_NOSIGNAL);
  466. }
  467. #endif
  468. void TSocketHolder::Close() noexcept {
  469. if (Fd_ != INVALID_SOCKET) {
  470. bool ok = (closesocket(Fd_) == 0);
  471. if (!ok) {
  472. // Do not quietly close bad descriptor,
  473. // because often it means double close
  474. // that is disasterous
  475. #ifdef _win_
  476. Y_ABORT_UNLESS(WSAGetLastError() != WSAENOTSOCK, "must not quietly close bad socket descriptor");
  477. #elif defined(_unix_)
  478. Y_ABORT_UNLESS(errno != EBADF, "must not quietly close bad descriptor: fd=%d", int(Fd_));
  479. #else
  480. #error unsupported platform
  481. #endif
  482. }
  483. Fd_ = INVALID_SOCKET;
  484. }
  485. }
  486. class TSocket::TImpl: public TAtomicRefCount<TImpl> {
  487. using TOps = TSocket::TOps;
  488. public:
  489. inline TImpl(SOCKET fd, TOps* ops)
  490. : Fd_(fd)
  491. , Ops_(ops)
  492. {
  493. }
  494. inline ~TImpl() = default;
  495. inline SOCKET Fd() const noexcept {
  496. return Fd_;
  497. }
  498. inline ssize_t Send(const void* data, size_t len) {
  499. return Ops_->Send(Fd_, data, len);
  500. }
  501. inline ssize_t Recv(void* buf, size_t len) {
  502. return Ops_->Recv(Fd_, buf, len);
  503. }
  504. inline ssize_t SendV(const TPart* parts, size_t count) {
  505. return Ops_->SendV(Fd_, parts, count);
  506. }
  507. inline void Close() {
  508. Fd_.Close();
  509. }
  510. private:
  511. TSocketHolder Fd_;
  512. TOps* Ops_;
  513. };
  514. template <>
  515. void Out<const struct addrinfo*>(IOutputStream& os, const struct addrinfo* ai) {
  516. if (ai->ai_flags & AI_CANONNAME) {
  517. os << "`" << ai->ai_canonname << "' ";
  518. }
  519. os << '[';
  520. for (int i = 0; ai; ++i, ai = ai->ai_next) {
  521. if (i > 0) {
  522. os << ", ";
  523. }
  524. os << (const IRemoteAddr&)TAddrInfo(ai);
  525. }
  526. os << ']';
  527. }
  528. template <>
  529. void Out<struct addrinfo*>(IOutputStream& os, struct addrinfo* ai) {
  530. Out<const struct addrinfo*>(os, static_cast<const struct addrinfo*>(ai));
  531. }
  532. template <>
  533. void Out<TNetworkAddress>(IOutputStream& os, const TNetworkAddress& addr) {
  534. os << &*addr.Begin();
  535. }
  536. static inline const struct addrinfo* Iterate(const struct addrinfo* addr, const struct addrinfo* addr0, const int sockerr) {
  537. if (addr->ai_next) {
  538. return addr->ai_next;
  539. }
  540. ythrow TSystemError(sockerr) << "can not connect to " << addr0;
  541. }
  542. static inline SOCKET DoConnectImpl(const struct addrinfo* res, const TInstant& deadLine) {
  543. const struct addrinfo* addr0 = res;
  544. while (res) {
  545. TSocketHolder s(socket(res->ai_family, res->ai_socktype, res->ai_protocol));
  546. if (s.Closed()) {
  547. res = Iterate(res, addr0, LastSystemError());
  548. continue;
  549. }
  550. SetNonBlock(s, true);
  551. if (connect(s, res->ai_addr, (int)res->ai_addrlen)) {
  552. int err = LastSystemError();
  553. if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) {
  554. /*
  555. * must wait
  556. */
  557. struct pollfd p = {
  558. (SOCKET)s,
  559. POLLOUT,
  560. 0};
  561. const ssize_t n = PollD(&p, 1, deadLine);
  562. /*
  563. * timeout occured
  564. */
  565. if (n < 0) {
  566. ythrow TSystemError(-(int)n) << "can not connect";
  567. }
  568. CheckedGetSockOpt(s, SOL_SOCKET, SO_ERROR, err, "socket error");
  569. if (!err) {
  570. return s.Release();
  571. }
  572. }
  573. res = Iterate(res, addr0, err);
  574. continue;
  575. }
  576. return s.Release();
  577. }
  578. ythrow yexception() << "something went wrong: nullptr at addrinfo";
  579. }
  580. static inline SOCKET DoConnect(const struct addrinfo* res, const TInstant& deadLine) {
  581. TSocketHolder ret(DoConnectImpl(res, deadLine));
  582. SetNonBlock(ret, false);
  583. return ret.Release();
  584. }
  585. static inline ssize_t DoSendV(SOCKET fd, const struct iovec* iov, size_t count) {
  586. ssize_t ret = -1;
  587. do {
  588. ret = DoSendMsg(fd, iov, (int)count);
  589. } while (ret == -1 && errno == EINTR);
  590. if (ret < 0) {
  591. return -LastSystemError();
  592. }
  593. return ret;
  594. }
  595. template <bool isCompat>
  596. struct TSender {
  597. using TPart = TSocket::TPart;
  598. static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
  599. return DoSendV(fd, (const iovec*)parts, count);
  600. }
  601. };
  602. template <>
  603. struct TSender<false> {
  604. using TPart = TSocket::TPart;
  605. static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
  606. TTempBuf tempbuf(sizeof(struct iovec) * count);
  607. struct iovec* iov = (struct iovec*)tempbuf.Data();
  608. for (size_t i = 0; i < count; ++i) {
  609. struct iovec& io = iov[i];
  610. const TPart& part = parts[i];
  611. io.iov_base = (char*)part.buf;
  612. io.iov_len = part.len;
  613. }
  614. return DoSendV(fd, iov, count);
  615. }
  616. };
  617. class TCommonSockOps: public TSocket::TOps {
  618. using TPart = TSocket::TPart;
  619. public:
  620. inline TCommonSockOps() noexcept {
  621. }
  622. ~TCommonSockOps() override = default;
  623. ssize_t Send(SOCKET fd, const void* data, size_t len) override {
  624. ssize_t ret = -1;
  625. do {
  626. ret = send(fd, (const char*)data, (int)len, MSG_NOSIGNAL);
  627. } while (ret == -1 && errno == EINTR);
  628. if (ret < 0) {
  629. return -LastSystemError();
  630. }
  631. return ret;
  632. }
  633. ssize_t Recv(SOCKET fd, void* buf, size_t len) override {
  634. ssize_t ret = -1;
  635. do {
  636. ret = recv(fd, (char*)buf, (int)len, 0);
  637. } while (ret == -1 && errno == EINTR);
  638. if (ret < 0) {
  639. return -LastSystemError();
  640. }
  641. return ret;
  642. }
  643. ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) override {
  644. ssize_t ret = SendVImpl(fd, parts, count);
  645. if (ret < 0) {
  646. return ret;
  647. }
  648. size_t len = TContIOVector::Bytes(parts, count);
  649. if ((size_t)ret == len) {
  650. return ret;
  651. }
  652. return SendVPartial(fd, parts, count, ret);
  653. }
  654. inline ssize_t SendVImpl(SOCKET fd, const TPart* parts, size_t count) {
  655. return TSender < (sizeof(iovec) == sizeof(TPart)) && (offsetof(iovec, iov_base) == offsetof(TPart, buf)) && (offsetof(iovec, iov_len) == offsetof(TPart, len)) > ::SendV(fd, parts, count);
  656. }
  657. ssize_t SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written);
  658. };
  659. ssize_t TCommonSockOps::SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written) {
  660. TTempBuf tempbuf(sizeof(TPart) * count);
  661. TPart* parts = (TPart*)tempbuf.Data();
  662. for (size_t i = 0; i < count; ++i) {
  663. parts[i] = constParts[i];
  664. }
  665. TContIOVector vec(parts, count);
  666. vec.Proceed(written);
  667. while (!vec.Complete()) {
  668. ssize_t ret = SendVImpl(fd, vec.Parts(), vec.Count());
  669. if (ret < 0) {
  670. return ret;
  671. }
  672. written += ret;
  673. vec.Proceed((size_t)ret);
  674. }
  675. return written;
  676. }
  677. static inline TSocket::TOps* GetCommonSockOps() noexcept {
  678. return Singleton<TCommonSockOps>();
  679. }
  680. TSocket::TSocket()
  681. : Impl_(new TImpl(INVALID_SOCKET, GetCommonSockOps()))
  682. {
  683. }
  684. TSocket::TSocket(SOCKET fd)
  685. : Impl_(new TImpl(fd, GetCommonSockOps()))
  686. {
  687. }
  688. TSocket::TSocket(SOCKET fd, TOps* ops)
  689. : Impl_(new TImpl(fd, ops))
  690. {
  691. }
  692. TSocket::TSocket(const TNetworkAddress& addr)
  693. : Impl_(new TImpl(DoConnect(addr.Info(), TInstant::Max()), GetCommonSockOps()))
  694. {
  695. }
  696. TSocket::TSocket(const TNetworkAddress& addr, const TDuration& timeOut)
  697. : Impl_(new TImpl(DoConnect(addr.Info(), timeOut.ToDeadLine()), GetCommonSockOps()))
  698. {
  699. }
  700. TSocket::TSocket(const TNetworkAddress& addr, const TInstant& deadLine)
  701. : Impl_(new TImpl(DoConnect(addr.Info(), deadLine), GetCommonSockOps()))
  702. {
  703. }
  704. TSocket::~TSocket() = default;
  705. SOCKET TSocket::Fd() const noexcept {
  706. return Impl_->Fd();
  707. }
  708. ssize_t TSocket::Send(const void* data, size_t len) {
  709. return Impl_->Send(data, len);
  710. }
  711. ssize_t TSocket::Recv(void* buf, size_t len) {
  712. return Impl_->Recv(buf, len);
  713. }
  714. ssize_t TSocket::SendV(const TPart* parts, size_t count) {
  715. return Impl_->SendV(parts, count);
  716. }
  717. void TSocket::Close() {
  718. Impl_->Close();
  719. }
  720. TSocketInput::TSocketInput(const TSocket& s) noexcept
  721. : S_(s)
  722. {
  723. }
  724. TSocketInput::~TSocketInput() = default;
  725. size_t TSocketInput::DoRead(void* buf, size_t len) {
  726. const ssize_t ret = S_.Recv(buf, len);
  727. if (ret >= 0) {
  728. return (size_t)ret;
  729. }
  730. ythrow TSystemError(-(int)ret) << "can not read from socket input stream";
  731. }
  732. TSocketOutput::TSocketOutput(const TSocket& s) noexcept
  733. : S_(s)
  734. {
  735. }
  736. TSocketOutput::~TSocketOutput() {
  737. try {
  738. Finish();
  739. } catch (...) {
  740. // ¯\_(ツ)_/¯
  741. }
  742. }
  743. void TSocketOutput::DoWrite(const void* buf, size_t len) {
  744. size_t send = 0;
  745. while (len) {
  746. const ssize_t ret = S_.Send(buf, len);
  747. if (ret < 0) {
  748. ythrow TSystemError(-(int)ret) << "can not write to socket output stream; " << send << " bytes already send";
  749. }
  750. buf = (const char*)buf + ret;
  751. len -= ret;
  752. send += ret;
  753. }
  754. }
  755. void TSocketOutput::DoWriteV(const TPart* parts, size_t count) {
  756. const ssize_t ret = S_.SendV(parts, count);
  757. if (ret < 0) {
  758. ythrow TSystemError(-(int)ret) << "can not writev to socket output stream";
  759. }
  760. /*
  761. * todo for nonblocking sockets?
  762. */
  763. }
  764. namespace {
  765. // https://bugzilla.mozilla.org/attachment.cgi?id=503263&action=diff
  766. struct TLocalNames: public THashSet<TStringBuf> {
  767. inline TLocalNames() {
  768. insert("localhost");
  769. insert("localhost.localdomain");
  770. insert("localhost6");
  771. insert("localhost6.localdomain6");
  772. insert("::1");
  773. }
  774. inline bool IsLocalName(const char* name) const noexcept {
  775. struct sockaddr_in sa;
  776. memset(&sa, 0, sizeof(sa));
  777. if (inet_pton(AF_INET, name, &(sa.sin_addr)) == 1) {
  778. return (InetToHost(sa.sin_addr.s_addr) >> 24) == 127;
  779. }
  780. return contains(name);
  781. }
  782. };
  783. } // namespace
  784. class TNetworkAddress::TImpl: public TAtomicRefCount<TImpl> {
  785. private:
  786. class TAddrInfoDeleter {
  787. public:
  788. TAddrInfoDeleter(bool useFreeAddrInfo = true)
  789. : UseFreeAddrInfo_(useFreeAddrInfo)
  790. {
  791. }
  792. void operator()(struct addrinfo* ai) noexcept {
  793. if (!UseFreeAddrInfo_ && ai != NULL) {
  794. if (ai->ai_addr != NULL) {
  795. free(ai->ai_addr);
  796. }
  797. struct addrinfo* p;
  798. while (ai != NULL) {
  799. p = ai;
  800. ai = ai->ai_next;
  801. free(p->ai_canonname);
  802. free(p);
  803. }
  804. } else if (ai != NULL) {
  805. freeaddrinfo(ai);
  806. }
  807. }
  808. private:
  809. bool UseFreeAddrInfo_ = true;
  810. };
  811. public:
  812. inline TImpl(const char* host, ui16 port, int flags)
  813. : Info_(nullptr, TAddrInfoDeleter{})
  814. {
  815. const TString port_st(ToString(port));
  816. struct addrinfo hints;
  817. memset(&hints, 0, sizeof(hints));
  818. hints.ai_flags = flags;
  819. hints.ai_family = PF_UNSPEC;
  820. hints.ai_socktype = SOCK_STREAM;
  821. if (!host) {
  822. hints.ai_flags |= AI_PASSIVE;
  823. } else {
  824. if (!Singleton<TLocalNames>()->IsLocalName(host)) {
  825. hints.ai_flags |= AI_ADDRCONFIG;
  826. }
  827. }
  828. struct addrinfo* pai = NULL;
  829. const int error = getaddrinfo(host, port_st.data(), &hints, &pai);
  830. if (error) {
  831. TAddrInfoDeleter()(pai);
  832. ythrow TNetworkResolutionError(error) << ": can not resolve " << host << ":" << port;
  833. }
  834. Info_.reset(pai);
  835. }
  836. inline TImpl(const char* path, int flags)
  837. : Info_(nullptr, TAddrInfoDeleter{/* useFreeAddrInfo = */ false})
  838. {
  839. THolder<struct sockaddr_un, TFree> sockAddr(
  840. reinterpret_cast<struct sockaddr_un*>(malloc(sizeof(struct sockaddr_un))));
  841. Y_ENSURE(strlen(path) < sizeof(sockAddr->sun_path), "Unix socket path more than " << sizeof(sockAddr->sun_path));
  842. sockAddr->sun_family = AF_UNIX;
  843. strcpy(sockAddr->sun_path, path);
  844. TAddrInfoPtr hints(reinterpret_cast<struct addrinfo*>(malloc(sizeof(struct addrinfo))), TAddrInfoDeleter{/* useFreeAddrInfo = */ false});
  845. memset(hints.get(), 0, sizeof(*hints));
  846. hints->ai_flags = flags;
  847. hints->ai_family = AF_UNIX;
  848. hints->ai_socktype = SOCK_STREAM;
  849. hints->ai_addrlen = sizeof(*sockAddr);
  850. hints->ai_addr = (struct sockaddr*)sockAddr.Release();
  851. Info_.reset(hints.release());
  852. }
  853. inline struct addrinfo* Info() const noexcept {
  854. return Info_.get();
  855. }
  856. private:
  857. using TAddrInfoPtr = std::unique_ptr<struct addrinfo, TAddrInfoDeleter>;
  858. TAddrInfoPtr Info_;
  859. };
  860. TNetworkAddress::TNetworkAddress(const TUnixSocketPath& unixSocketPath, int flags)
  861. : Impl_(new TImpl(unixSocketPath.Path.data(), flags))
  862. {
  863. }
  864. TNetworkAddress::TNetworkAddress(const TString& host, ui16 port, int flags)
  865. : Impl_(new TImpl(host.data(), port, flags))
  866. {
  867. }
  868. TNetworkAddress::TNetworkAddress(const TString& host, ui16 port)
  869. : Impl_(new TImpl(host.data(), port, 0))
  870. {
  871. }
  872. TNetworkAddress::TNetworkAddress(ui16 port)
  873. : Impl_(new TImpl(nullptr, port, 0))
  874. {
  875. }
// Defined out-of-line here, after TImpl's full definition above, so that destroying
// Impl_ sees a complete TImpl type (presumably why it is not defaulted in the header).
TNetworkAddress::~TNetworkAddress() = default;
  877. struct addrinfo* TNetworkAddress::Info() const noexcept {
  878. return Impl_->Info();
  879. }
  880. TNetworkResolutionError::TNetworkResolutionError(int error) {
  881. const char* errMsg = nullptr;
  882. #ifdef _win_
  883. errMsg = LastSystemErrorText(error); // gai_strerror is not thread-safe on Windows
  884. #else
  885. errMsg = gai_strerror(error);
  886. #endif
  887. (*this) << errMsg << "(" << error;
  888. #if defined(_unix_)
  889. if (error == EAI_SYSTEM) {
  890. (*this) << "; errno=" << LastSystemError();
  891. }
  892. #endif
  893. (*this) << "): ";
  894. }
  895. #if defined(_unix_)
  896. static inline int GetFlags(int fd) {
  897. const int ret = fcntl(fd, F_GETFL);
  898. if (ret == -1) {
  899. ythrow TSystemError() << "can not get fd flags";
  900. }
  901. return ret;
  902. }
  903. static inline void SetFlags(int fd, int flags) {
  904. if (fcntl(fd, F_SETFL, flags) == -1) {
  905. ythrow TSystemError() << "can not set fd flags";
  906. }
  907. }
  908. static inline void EnableFlag(int fd, int flag) {
  909. const int oldf = GetFlags(fd);
  910. const int newf = oldf | flag;
  911. if (oldf != newf) {
  912. SetFlags(fd, newf);
  913. }
  914. }
  915. static inline void DisableFlag(int fd, int flag) {
  916. const int oldf = GetFlags(fd);
  917. const int newf = oldf & (~flag);
  918. if (oldf != newf) {
  919. SetFlags(fd, newf);
  920. }
  921. }
  922. static inline void SetFlag(int fd, int flag, bool value) {
  923. if (value) {
  924. EnableFlag(fd, flag);
  925. } else {
  926. DisableFlag(fd, flag);
  927. }
  928. }
  929. static inline bool FlagsAreEnabled(int fd, int flags) {
  930. return GetFlags(fd) & flags;
  931. }
  932. #endif
  933. #if defined(_win_)
  934. static inline void SetNonBlockSocket(SOCKET fd, int value) {
  935. unsigned long inbuf = value;
  936. unsigned long outbuf = 0;
  937. DWORD written = 0;
  938. if (!inbuf) {
  939. WSAEventSelect(fd, nullptr, 0);
  940. }
  941. if (WSAIoctl(fd, FIONBIO, &inbuf, sizeof(inbuf), &outbuf, sizeof(outbuf), &written, 0, 0) == SOCKET_ERROR) {
  942. ythrow TSystemError() << "can not set non block socket state";
  943. }
  944. }
  945. static inline bool IsNonBlockSocket(SOCKET fd) {
  946. unsigned long buf = 0;
  947. if (WSAIoctl(fd, FIONBIO, 0, 0, &buf, sizeof(buf), 0, 0, 0) == SOCKET_ERROR) {
  948. ythrow TSystemError() << "can not get non block socket state";
  949. }
  950. return buf;
  951. }
  952. #endif
  953. void SetNonBlock(SOCKET fd, bool value) {
  954. #if defined(_unix_)
  955. #if defined(FIONBIO)
  956. Y_UNUSED(SetFlag); // shut up clang about unused function
  957. int nb = value;
  958. if (ioctl(fd, FIONBIO, &nb) < 0) {
  959. ythrow TSystemError() << "ioctl failed";
  960. }
  961. #else
  962. SetFlag(fd, O_NONBLOCK, value);
  963. #endif
  964. #elif defined(_win_)
  965. SetNonBlockSocket(fd, value);
  966. #else
  967. #error todo
  968. #endif
  969. }
  970. bool IsNonBlock(SOCKET fd) {
  971. #if defined(_unix_)
  972. return FlagsAreEnabled(fd, O_NONBLOCK);
  973. #elif defined(_win_)
  974. return IsNonBlockSocket(fd);
  975. #else
  976. #error todo
  977. #endif
  978. }
  979. void SetDeferAccept(SOCKET s) {
  980. (void)s;
  981. #if defined(TCP_DEFER_ACCEPT)
  982. CheckedSetSockOpt(s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 10, "defer accept");
  983. #endif
  984. #if defined(SO_ACCEPTFILTER)
  985. struct accept_filter_arg afa;
  986. Zero(afa);
  987. strcpy(afa.af_name, "dataready");
  988. SetSockOpt(s, SOL_SOCKET, SO_ACCEPTFILTER, afa);
  989. #endif
  990. }
  991. ssize_t PollD(struct pollfd fds[], nfds_t nfds, const TInstant& deadLine) noexcept {
  992. TInstant now = TInstant::Now();
  993. do {
  994. const TDuration toWait = PollStep(deadLine, now);
  995. const int res = poll(fds, nfds, MicroToMilli(toWait.MicroSeconds()));
  996. if (res > 0) {
  997. return res;
  998. }
  999. if (res < 0) {
  1000. const int err = LastSystemError();
  1001. if (err != ETIMEDOUT && err != EINTR) {
  1002. return -err;
  1003. }
  1004. }
  1005. } while ((now = TInstant::Now()) < deadLine);
  1006. return -ETIMEDOUT;
  1007. }
  1008. void ShutDown(SOCKET s, int mode) {
  1009. if (shutdown(s, mode)) {
  1010. ythrow TSystemError() << "shutdown socket error";
  1011. }
  1012. }