socket.cpp 31 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262
  1. #include "ip.h"
  2. #include "socket.h"
  3. #include "address.h"
  4. #include "pollerimpl.h"
  5. #include "iovec.h"
  6. #include <util/system/defaults.h>
  7. #include <util/system/byteorder.h>
  8. #if defined(_unix_)
  9. #include <netdb.h>
  10. #include <sys/types.h>
  11. #include <sys/socket.h>
  12. #include <sys/un.h>
  13. #include <sys/ioctl.h>
  14. #include <netinet/in.h>
  15. #include <netinet/tcp.h>
  16. #include <arpa/inet.h>
  17. #endif
  18. #if defined(_freebsd_)
  19. #include <sys/module.h>
  20. #define ACCEPT_FILTER_MOD
  21. #include <sys/socketvar.h>
  22. #endif
  23. #if defined(_win_)
  24. #include <cerrno>
  25. #include <winsock2.h>
  26. #include <ws2tcpip.h>
  27. #include <wspiapi.h>
  28. #include <util/system/compat.h>
  29. #endif
  30. #include <util/generic/ylimits.h>
  31. #include <util/string/cast.h>
  32. #include <util/stream/mem.h>
  33. #include <util/system/datetime.h>
  34. #include <util/system/error.h>
  35. #include <util/memory/tempbuf.h>
  36. #include <util/generic/singleton.h>
  37. #include <util/generic/hash_set.h>
  38. #include <stddef.h>
  39. #include <sys/uio.h>
  40. using namespace NAddr;
  41. #if defined(_win_)
  42. int inet_aton(const char* cp, struct in_addr* inp) {
  43. sockaddr_in addr;
  44. addr.sin_family = AF_INET;
  45. int psz = sizeof(addr);
  46. if (0 == WSAStringToAddress((char*)cp, AF_INET, nullptr, (LPSOCKADDR)&addr, &psz)) {
  47. memcpy(inp, &addr.sin_addr, sizeof(in_addr));
  48. return 1;
  49. }
  50. return 0;
  51. }
  52. #if (_WIN32_WINNT < 0x0600)
  53. const char* inet_ntop(int af, const void* src, char* dst, socklen_t size) {
  54. if (af != AF_INET) {
  55. errno = EINVAL;
  56. return 0;
  57. }
  58. const ui8* ia = (ui8*)src;
  59. if (snprintf(dst, size, "%u.%u.%u.%u", ia[0], ia[1], ia[2], ia[3]) >= (int)size) {
  60. errno = ENOSPC;
  61. return 0;
  62. }
  63. return dst;
  64. }
// One row of an event translation table used by convert_events(): when any
// bit of `event` is present in the source mask, `winevent` is OR-ed into the
// converted mask. Despite its name, `winevent` holds the *target* bits for
// whichever direction the table translates. Special target values handled by
// convert_events(): -1 rejects the whole mask, 0 contributes nothing.
struct evpair {
    int event;    // source bit(s) to look for
    int winevent; // target bit(s), or -1 / 0 as described above
};
// poll() request bits -> WSAEventSelect network-event bits.
// Rows mapped to -1 make convert_events() fail (poll() then reports EINVAL);
// rows mapped to 0 are accepted but subscribe to no network event.
static const evpair evpairs_to_win[] = {
    {POLLIN, FD_READ | FD_CLOSE | FD_ACCEPT},
    {POLLRDNORM, FD_READ | FD_CLOSE | FD_ACCEPT},
    {POLLRDBAND, -1},
    {POLLPRI, -1},
    {POLLOUT, FD_WRITE | FD_CLOSE},
    {POLLWRNORM, FD_WRITE | FD_CLOSE},
    {POLLWRBAND, -1},
    {POLLERR, 0},
    {POLLHUP, 0},
    {POLLNVAL, 0}};
// Number of rows in evpairs_to_win.
static const size_t nevpairs_to_win = sizeof(evpairs_to_win) / sizeof(evpairs_to_win[0]);
// WSAEnumNetworkEvents result bits -> poll() revents bits.
// Network events not listed here are dropped because poll() calls
// convert_events() with ignoreUnknown = true for this table.
static const evpair evpairs_to_unix[] = {
    {FD_ACCEPT, POLLIN | POLLRDNORM},
    {FD_READ, POLLIN | POLLRDNORM},
    {FD_WRITE, POLLOUT | POLLWRNORM},
    {FD_CLOSE, POLLHUP},
};
// Number of rows in evpairs_to_unix.
static const size_t nevpairs_to_unix = sizeof(evpairs_to_unix) / sizeof(evpairs_to_unix[0]);
  88. static int convert_events(int events, const evpair* evpairs, size_t nevpairs, bool ignoreUnknown) noexcept {
  89. int result = 0;
  90. for (size_t i = 0; i < nevpairs; ++i) {
  91. int event = evpairs[i].event;
  92. if (events & event) {
  93. events ^= event;
  94. long winEvent = evpairs[i].winevent;
  95. if (winEvent == -1) {
  96. return -1;
  97. }
  98. if (winEvent == 0) {
  99. continue;
  100. }
  101. result |= winEvent;
  102. }
  103. }
  104. if (events != 0 && !ignoreUnknown) {
  105. return -1;
  106. }
  107. return result;
  108. }
  109. class TWSAEventHolder {
  110. private:
  111. HANDLE Event;
  112. public:
  113. inline TWSAEventHolder(HANDLE event) noexcept
  114. : Event(event)
  115. {
  116. }
  117. inline ~TWSAEventHolder() {
  118. WSACloseEvent(Event);
  119. }
  120. inline HANDLE Get() noexcept {
  121. return Event;
  122. }
  123. };
/**
 * poll() emulation built on WSAEventSelect / select / WSAWaitForMultipleEvents.
 *
 * Phase 1 subscribes every descriptor to one shared event object and probes
 * it with a zero-timeout select(); if any descriptor is already ready (or
 * invalid) the function returns immediately. Otherwise phase 2 waits on the
 * shared event for up to `timeout` and translates the network events that
 * fired into poll() revents.
 *
 * @param fds     descriptors to watch; `revents` of each entry is overwritten.
 * @param nfds    number of entries in fds.
 * @param timeout passed unchanged to WSAWaitForMultipleEvents.
 * @return number of entries with non-zero revents, 0 on timeout, or -1 with
 *         errno set to EINVAL (unsupported event bits) or EIO (Winsock failure).
 */
int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept {
    HANDLE rawEvent = WSACreateEvent();
    if (rawEvent == WSA_INVALID_EVENT) {
        errno = EIO;
        return -1;
    }
    // Closes the event object on every return path below.
    TWSAEventHolder event(rawEvent);
    int checked_sockets = 0;
    // Phase 1: subscribe each socket and check whether it is ready right now.
    for (pollfd* fd = fds; fd < fds + nfds; ++fd) {
        int win_events = convert_events(fd->events, evpairs_to_win, nevpairs_to_win, false);
        if (win_events == -1) {
            errno = EINVAL;
            return -1;
        }
        fd->revents = 0;
        if (WSAEventSelect(fd->fd, event.Get(), win_events)) {
            int error = WSAGetLastError();
            if (error == WSAEINVAL || error == WSAENOTSOCK) {
                // Not a usable socket: report it the way poll() would.
                fd->revents = POLLNVAL;
                ++checked_sockets;
            } else {
                errno = EIO;
                return -1;
            }
        }
        fd_set readfds;
        fd_set writefds;
        // NOTE: this local shadows the `timeout` parameter until the end of
        // the loop body; the zero value makes select() a non-blocking probe.
        struct timeval timeout = {0, 0};
        FD_ZERO(&readfds);
        FD_ZERO(&writefds);
        if (fd->events & POLLIN) {
            FD_SET(fd->fd, &readfds);
        }
        if (fd->events & POLLOUT) {
            FD_SET(fd->fd, &writefds);
        }
        int error = select(0, &readfds, &writefds, nullptr, &timeout);
        if (error > 0) {
            if (FD_ISSET(fd->fd, &readfds)) {
                fd->revents |= POLLIN;
            }
            if (FD_ISSET(fd->fd, &writefds)) {
                fd->revents |= POLLOUT;
            }
            ++checked_sockets;
        }
    }
    if (checked_sockets > 0) {
        // Return without waiting: at least one socket is already in a
        // requested state (or was flagged POLLNVAL).
        return checked_sockets;
    }
    // Phase 2: block on the shared event; here `timeout` is the parameter again.
    HANDLE events[] = {event.Get()};
    DWORD wait_result = WSAWaitForMultipleEvents(1, events, TRUE, timeout, FALSE);
    if (wait_result == WSA_WAIT_TIMEOUT) {
        return 0;
    } else if (wait_result == WSA_WAIT_EVENT_0) {
        for (pollfd* fd = fds; fd < fds + nfds; ++fd) {
            if (fd->revents == POLLNVAL) {
                continue;
            }
            WSANETWORKEVENTS network_events;
            if (WSAEnumNetworkEvents(fd->fd, event.Get(), &network_events)) {
                errno = EIO;
                return -1;
            }
            fd->revents = 0;
            // Any signalled event that carries an error code becomes POLLERR.
            for (int i = 0; i < FD_MAX_EVENTS; ++i) {
                if ((network_events.lNetworkEvents & (1 << i)) != 0 && network_events.iErrorCode[i]) {
                    fd->revents = POLLERR;
                    break;
                }
            }
            if (fd->revents == POLLERR) {
                continue;
            }
            if (network_events.lNetworkEvents) {
                fd->revents = static_cast<short>(convert_events(network_events.lNetworkEvents, evpairs_to_unix, nevpairs_to_unix, true));
                if (fd->revents & POLLHUP) {
                    // On hang-up keep only the hang-up and readable bits.
                    fd->revents &= POLLHUP | POLLIN | POLLRDNORM;
                }
            }
        }
        // Count the entries that ended up with something to report.
        int chanded_sockets = 0;
        for (pollfd* fd = fds; fd < fds + nfds; ++fd) {
            if (fd->revents != 0) {
                ++chanded_sockets;
            }
        }
        return chanded_sockets;
    } else {
        errno = EIO;
        return -1;
    }
}
  218. #endif
  219. #endif
  220. bool GetRemoteAddr(SOCKET Socket, char* str, socklen_t size) {
  221. if (!size) {
  222. return false;
  223. }
  224. TOpaqueAddr addr;
  225. if (getpeername(Socket, addr.MutableAddr(), addr.LenPtr()) != 0) {
  226. return false;
  227. }
  228. try {
  229. TMemoryOutput out(str, size - 1);
  230. PrintHost(out, addr);
  231. *out.Buf() = 0;
  232. return true;
  233. } catch (...) {
  234. // ¯\_(ツ)_/¯
  235. }
  236. return false;
  237. }
  238. void SetSocketTimeout(SOCKET s, long timeout) {
  239. SetSocketTimeout(s, timeout, 0);
  240. }
  241. void SetSocketTimeout(SOCKET s, long sec, long msec) {
  242. #ifdef SO_SNDTIMEO
  243. #ifdef _darwin_
  244. const timeval timeout = {sec, (__darwin_suseconds_t)msec * 1000};
  245. #elif defined(_emscripten_)
  246. const timeval timeout = {sec, static_cast<suseconds_t>(msec * 1000)};
  247. #elif defined(_unix_)
  248. const timeval timeout = {sec, msec * 1000};
  249. #else
  250. const int timeout = sec * 1000 + msec;
  251. #endif
  252. CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVTIMEO, timeout, "recv timeout");
  253. CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDTIMEO, timeout, "send timeout");
  254. #endif
  255. }
  256. void SetLinger(SOCKET s, bool on, unsigned len) {
  257. #ifdef SO_LINGER
  258. struct linger l = {on, (u_short)len};
  259. CheckedSetSockOpt(s, SOL_SOCKET, SO_LINGER, l, "linger");
  260. #endif
  261. }
  262. void SetZeroLinger(SOCKET s) {
  263. SetLinger(s, 1, 0);
  264. }
  265. void SetKeepAlive(SOCKET s, bool value) {
  266. CheckedSetSockOpt(s, SOL_SOCKET, SO_KEEPALIVE, (int)value, "keepalive");
  267. }
  268. void SetOutputBuffer(SOCKET s, unsigned value) {
  269. CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDBUF, value, "output buffer");
  270. }
  271. void SetInputBuffer(SOCKET s, unsigned value) {
  272. CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVBUF, value, "input buffer");
  273. }
  274. void SetReusePort(SOCKET s, bool value) {
  275. #if defined(_unix_)
  276. CheckedSetSockOpt(s, SOL_SOCKET, SO_REUSEPORT, (int)value, "reuse port");
  277. #else
  278. Y_UNUSED(s);
  279. Y_UNUSED(value);
  280. ythrow TSystemError(ENOSYS) << "SO_REUSEPORT is not available on Windows";
  281. #endif
  282. }
  283. void SetNoDelay(SOCKET s, bool value) {
  284. CheckedSetSockOpt(s, IPPROTO_TCP, TCP_NODELAY, (int)value, "tcp no delay");
  285. }
  286. void SetCloseOnExec(SOCKET s, bool value) {
  287. #if defined(_unix_)
  288. int flags = fcntl(s, F_GETFD);
  289. if (flags == -1) {
  290. ythrow TSystemError() << "fcntl() failed";
  291. }
  292. if (value) {
  293. flags |= FD_CLOEXEC;
  294. } else {
  295. flags &= ~FD_CLOEXEC;
  296. }
  297. if (fcntl(s, F_SETFD, flags) == -1) {
  298. ythrow TSystemError() << "fcntl() failed";
  299. }
  300. #else
  301. Y_UNUSED(s);
  302. Y_UNUSED(value);
  303. #endif
  304. }
  305. size_t GetMaximumSegmentSize(SOCKET s) {
  306. #if defined(TCP_MAXSEG)
  307. int val;
  308. if (GetSockOpt(s, IPPROTO_TCP, TCP_MAXSEG, val) == 0) {
  309. return (size_t)val;
  310. }
  311. #endif
  312. /*
  313. * probably a good guess...
  314. */
  315. return 8192;
  316. }
  317. size_t GetMaximumTransferUnit(SOCKET /*s*/) {
  318. // for someone who'll dare to write it
  319. // Linux: there rummored to be IP_MTU getsockopt() request
  320. // FreeBSD: request to a socket of type PF_ROUTE
  321. // with peer address as a destination argument
  322. return 8192;
  323. }
  324. int GetSocketToS(SOCKET s) {
  325. TOpaqueAddr addr;
  326. if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
  327. ythrow TSystemError() << "getsockname() failed";
  328. }
  329. return GetSocketToS(s, &addr);
  330. }
  331. int GetSocketToS(SOCKET s, const IRemoteAddr* addr) {
  332. int result = 0;
  333. switch (addr->Addr()->sa_family) {
  334. case AF_INET:
  335. CheckedGetSockOpt(s, IPPROTO_IP, IP_TOS, result, "tos");
  336. break;
  337. case AF_INET6:
  338. #ifdef IPV6_TCLASS
  339. CheckedGetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, result, "tos");
  340. #endif
  341. break;
  342. }
  343. return result;
  344. }
  345. void SetSocketToS(SOCKET s, const NAddr::IRemoteAddr* addr, int tos) {
  346. switch (addr->Addr()->sa_family) {
  347. case AF_INET:
  348. CheckedSetSockOpt(s, IPPROTO_IP, IP_TOS, tos, "tos");
  349. return;
  350. case AF_INET6:
  351. #ifdef IPV6_TCLASS
  352. CheckedSetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, tos, "tos");
  353. return;
  354. #endif
  355. break;
  356. }
  357. ythrow yexception() << "SetSocketToS unsupported for family " << addr->Addr()->sa_family;
  358. }
  359. void SetSocketToS(SOCKET s, int tos) {
  360. TOpaqueAddr addr;
  361. if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
  362. ythrow TSystemError() << "getsockname() failed";
  363. }
  364. SetSocketToS(s, &addr, tos);
  365. }
  366. void SetSocketPriority(SOCKET s, int priority) {
  367. #if defined(SO_PRIORITY)
  368. CheckedSetSockOpt(s, SOL_SOCKET, SO_PRIORITY, priority, "priority");
  369. #else
  370. Y_UNUSED(s);
  371. Y_UNUSED(priority);
  372. #endif
  373. }
  374. bool HasLocalAddress(SOCKET socket) {
  375. TOpaqueAddr localAddr;
  376. if (getsockname(socket, localAddr.MutableAddr(), localAddr.LenPtr()) != 0) {
  377. ythrow TSystemError() << "HasLocalAddress: getsockname() failed. ";
  378. }
  379. if (IsLoopback(localAddr)) {
  380. return true;
  381. }
  382. TOpaqueAddr remoteAddr;
  383. if (getpeername(socket, remoteAddr.MutableAddr(), remoteAddr.LenPtr()) != 0) {
  384. ythrow TSystemError() << "HasLocalAddress: getpeername() failed. ";
  385. }
  386. return IsSame(localAddr, remoteAddr);
  387. }
  388. namespace {
  389. #if defined(_linux_)
  390. #if !defined(TCP_FASTOPEN)
  391. #define TCP_FASTOPEN 23
  392. #endif
  393. #endif
  394. #if defined(TCP_FASTOPEN)
  395. struct TTcpFastOpenFeature {
  396. inline TTcpFastOpenFeature()
  397. : HasFastOpen_(false)
  398. {
  399. TSocketHolder tmp(socket(AF_INET, SOCK_STREAM, 0));
  400. int val = 1;
  401. int ret = SetSockOpt(tmp, IPPROTO_TCP, TCP_FASTOPEN, val);
  402. HasFastOpen_ = (ret == 0);
  403. }
  404. inline void SetFastOpen(SOCKET s, int qlen) const {
  405. if (HasFastOpen_) {
  406. CheckedSetSockOpt(s, IPPROTO_TCP, TCP_FASTOPEN, qlen, "setting TCP_FASTOPEN");
  407. }
  408. }
  409. static inline const TTcpFastOpenFeature* Instance() noexcept {
  410. return Singleton<TTcpFastOpenFeature>();
  411. }
  412. bool HasFastOpen_;
  413. };
  414. #endif
  415. } // namespace
  416. void SetTcpFastOpen(SOCKET s, int qlen) {
  417. #if defined(TCP_FASTOPEN)
  418. TTcpFastOpenFeature::Instance()->SetFastOpen(s, qlen);
  419. #else
  420. Y_UNUSED(s);
  421. Y_UNUSED(qlen);
  422. #endif
  423. }
  424. static bool IsBlocked(int lasterr) noexcept {
  425. return lasterr == EAGAIN || lasterr == EWOULDBLOCK;
  426. }
  427. struct TUnblockingGuard {
  428. SOCKET S_;
  429. TUnblockingGuard(SOCKET s)
  430. : S_(s)
  431. {
  432. SetNonBlock(S_, true);
  433. }
  434. ~TUnblockingGuard() {
  435. SetNonBlock(S_, false);
  436. }
  437. };
  438. static int MsgPeek(SOCKET s) {
  439. int flags = MSG_PEEK;
  440. #if defined(_win_)
  441. TUnblockingGuard unblocker(s);
  442. Y_UNUSED(unblocker);
  443. #else
  444. flags |= MSG_DONTWAIT;
  445. #endif
  446. char c;
  447. return recv(s, &c, 1, flags);
  448. }
  449. bool IsNotSocketClosedByOtherSide(SOCKET s) {
  450. return HasSocketDataToRead(s) != ESocketReadStatus::SocketClosed;
  451. }
  452. ESocketReadStatus HasSocketDataToRead(SOCKET s) {
  453. const int r = MsgPeek(s);
  454. if (r == -1 && IsBlocked(LastSystemError())) {
  455. return ESocketReadStatus::NoData;
  456. }
  457. if (r > 0) {
  458. return ESocketReadStatus::HasData;
  459. }
  460. return ESocketReadStatus::SocketClosed;
  461. }
  462. #if defined(_win_)
  463. static ssize_t DoSendMsg(SOCKET sock, const struct iovec* iov, int iovcnt) {
  464. return writev(sock, iov, iovcnt);
  465. }
  466. #else
  467. static ssize_t DoSendMsg(SOCKET sock, const struct iovec* iov, int iovcnt) {
  468. struct msghdr message;
  469. Zero(message);
  470. message.msg_iov = const_cast<struct iovec*>(iov);
  471. message.msg_iovlen = iovcnt;
  472. return sendmsg(sock, &message, MSG_NOSIGNAL);
  473. }
  474. #endif
  475. void TSocketHolder::Close() noexcept {
  476. if (Fd_ != INVALID_SOCKET) {
  477. bool ok = (closesocket(Fd_) == 0);
  478. if (!ok) {
  479. // Do not quietly close bad descriptor,
  480. // because often it means double close
  481. // that is disasterous
  482. #ifdef _win_
  483. Y_ABORT_UNLESS(WSAGetLastError() != WSAENOTSOCK, "must not quietly close bad socket descriptor");
  484. #elif defined(_unix_)
  485. Y_ABORT_UNLESS(errno != EBADF, "must not quietly close bad descriptor: fd=%d", int(Fd_));
  486. #else
  487. #error unsupported platform
  488. #endif
  489. }
  490. Fd_ = INVALID_SOCKET;
  491. }
  492. }
  493. class TSocket::TImpl: public TAtomicRefCount<TImpl> {
  494. using TOps = TSocket::TOps;
  495. public:
  496. inline TImpl(SOCKET fd, TOps* ops)
  497. : Fd_(fd)
  498. , Ops_(ops)
  499. {
  500. }
  501. inline ~TImpl() = default;
  502. inline SOCKET Fd() const noexcept {
  503. return Fd_;
  504. }
  505. inline ssize_t Send(const void* data, size_t len) {
  506. return Ops_->Send(Fd_, data, len);
  507. }
  508. inline ssize_t Recv(void* buf, size_t len) {
  509. return Ops_->Recv(Fd_, buf, len);
  510. }
  511. inline ssize_t SendV(const TPart* parts, size_t count) {
  512. return Ops_->SendV(Fd_, parts, count);
  513. }
  514. inline void Close() {
  515. Fd_.Close();
  516. }
  517. private:
  518. TSocketHolder Fd_;
  519. TOps* Ops_;
  520. };
  521. template <>
  522. void Out<const struct addrinfo*>(IOutputStream& os, const struct addrinfo* ai) {
  523. if (ai->ai_flags & AI_CANONNAME) {
  524. os << "`" << ai->ai_canonname << "' ";
  525. }
  526. os << '[';
  527. for (int i = 0; ai; ++i, ai = ai->ai_next) {
  528. if (i > 0) {
  529. os << ", ";
  530. }
  531. os << (const IRemoteAddr&)TAddrInfo(ai);
  532. }
  533. os << ']';
  534. }
  535. template <>
  536. void Out<struct addrinfo*>(IOutputStream& os, struct addrinfo* ai) {
  537. Out<const struct addrinfo*>(os, static_cast<const struct addrinfo*>(ai));
  538. }
  539. template <>
  540. void Out<TNetworkAddress>(IOutputStream& os, const TNetworkAddress& addr) {
  541. os << &*addr.Begin();
  542. }
  543. static inline const struct addrinfo* Iterate(const struct addrinfo* addr, const struct addrinfo* addr0, const int sockerr) {
  544. if (addr->ai_next) {
  545. return addr->ai_next;
  546. }
  547. ythrow TSystemError(sockerr) << "can not connect to " << addr0;
  548. }
  549. static inline SOCKET DoConnectImpl(const struct addrinfo* res, const TInstant& deadLine) {
  550. const struct addrinfo* addr0 = res;
  551. while (res) {
  552. TSocketHolder s(socket(res->ai_family, res->ai_socktype, res->ai_protocol));
  553. if (s.Closed()) {
  554. res = Iterate(res, addr0, LastSystemError());
  555. continue;
  556. }
  557. SetNonBlock(s, true);
  558. if (connect(s, res->ai_addr, (int)res->ai_addrlen)) {
  559. int err = LastSystemError();
  560. if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) {
  561. /*
  562. * must wait
  563. */
  564. struct pollfd p = {
  565. (SOCKET)s,
  566. POLLOUT,
  567. 0};
  568. const ssize_t n = PollD(&p, 1, deadLine);
  569. /*
  570. * timeout occured
  571. */
  572. if (n < 0) {
  573. ythrow TSystemError(-(int)n) << "can not connect";
  574. }
  575. CheckedGetSockOpt(s, SOL_SOCKET, SO_ERROR, err, "socket error");
  576. if (!err) {
  577. return s.Release();
  578. }
  579. }
  580. res = Iterate(res, addr0, err);
  581. continue;
  582. }
  583. return s.Release();
  584. }
  585. ythrow yexception() << "something went wrong: nullptr at addrinfo";
  586. }
  587. static inline SOCKET DoConnect(const struct addrinfo* res, const TInstant& deadLine) {
  588. TSocketHolder ret(DoConnectImpl(res, deadLine));
  589. SetNonBlock(ret, false);
  590. return ret.Release();
  591. }
  592. static inline ssize_t DoSendV(SOCKET fd, const struct iovec* iov, size_t count) {
  593. ssize_t ret = -1;
  594. do {
  595. ret = DoSendMsg(fd, iov, (int)count);
  596. } while (ret == -1 && errno == EINTR);
  597. if (ret < 0) {
  598. return -LastSystemError();
  599. }
  600. return ret;
  601. }
  602. template <bool isCompat>
  603. struct TSender {
  604. using TPart = TSocket::TPart;
  605. static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
  606. return DoSendV(fd, (const iovec*)parts, count);
  607. }
  608. };
  609. template <>
  610. struct TSender<false> {
  611. using TPart = TSocket::TPart;
  612. static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
  613. TTempBuf tempbuf(sizeof(struct iovec) * count);
  614. struct iovec* iov = (struct iovec*)tempbuf.Data();
  615. for (size_t i = 0; i < count; ++i) {
  616. struct iovec& io = iov[i];
  617. const TPart& part = parts[i];
  618. io.iov_base = (char*)part.buf;
  619. io.iov_len = part.len;
  620. }
  621. return DoSendV(fd, iov, count);
  622. }
  623. };
  624. class TCommonSockOps: public TSocket::TOps {
  625. using TPart = TSocket::TPart;
  626. public:
  627. inline TCommonSockOps() noexcept {
  628. }
  629. ~TCommonSockOps() override = default;
  630. ssize_t Send(SOCKET fd, const void* data, size_t len) override {
  631. ssize_t ret = -1;
  632. do {
  633. ret = send(fd, (const char*)data, (int)len, MSG_NOSIGNAL);
  634. } while (ret == -1 && errno == EINTR);
  635. if (ret < 0) {
  636. return -LastSystemError();
  637. }
  638. return ret;
  639. }
  640. ssize_t Recv(SOCKET fd, void* buf, size_t len) override {
  641. ssize_t ret = -1;
  642. do {
  643. ret = recv(fd, (char*)buf, (int)len, 0);
  644. } while (ret == -1 && errno == EINTR);
  645. if (ret < 0) {
  646. return -LastSystemError();
  647. }
  648. return ret;
  649. }
  650. ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) override {
  651. ssize_t ret = SendVImpl(fd, parts, count);
  652. if (ret < 0) {
  653. return ret;
  654. }
  655. size_t len = TContIOVector::Bytes(parts, count);
  656. if ((size_t)ret == len) {
  657. return ret;
  658. }
  659. return SendVPartial(fd, parts, count, ret);
  660. }
  661. inline ssize_t SendVImpl(SOCKET fd, const TPart* parts, size_t count) {
  662. return TSender < (sizeof(iovec) == sizeof(TPart)) && (offsetof(iovec, iov_base) == offsetof(TPart, buf)) && (offsetof(iovec, iov_len) == offsetof(TPart, len)) > ::SendV(fd, parts, count);
  663. }
  664. ssize_t SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written);
  665. };
  666. ssize_t TCommonSockOps::SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written) {
  667. TTempBuf tempbuf(sizeof(TPart) * count);
  668. TPart* parts = (TPart*)tempbuf.Data();
  669. for (size_t i = 0; i < count; ++i) {
  670. parts[i] = constParts[i];
  671. }
  672. TContIOVector vec(parts, count);
  673. vec.Proceed(written);
  674. while (!vec.Complete()) {
  675. ssize_t ret = SendVImpl(fd, vec.Parts(), vec.Count());
  676. if (ret < 0) {
  677. return ret;
  678. }
  679. written += ret;
  680. vec.Proceed((size_t)ret);
  681. }
  682. return written;
  683. }
  684. static inline TSocket::TOps* GetCommonSockOps() noexcept {
  685. return Singleton<TCommonSockOps>();
  686. }
  687. TSocket::TSocket()
  688. : Impl_(new TImpl(INVALID_SOCKET, GetCommonSockOps()))
  689. {
  690. }
  691. TSocket::TSocket(SOCKET fd)
  692. : Impl_(new TImpl(fd, GetCommonSockOps()))
  693. {
  694. }
  695. TSocket::TSocket(SOCKET fd, TOps* ops)
  696. : Impl_(new TImpl(fd, ops))
  697. {
  698. }
  699. TSocket::TSocket(const TNetworkAddress& addr)
  700. : Impl_(new TImpl(DoConnect(addr.Info(), TInstant::Max()), GetCommonSockOps()))
  701. {
  702. }
  703. TSocket::TSocket(const TNetworkAddress& addr, const TDuration& timeOut)
  704. : Impl_(new TImpl(DoConnect(addr.Info(), timeOut.ToDeadLine()), GetCommonSockOps()))
  705. {
  706. }
  707. TSocket::TSocket(const TNetworkAddress& addr, const TInstant& deadLine)
  708. : Impl_(new TImpl(DoConnect(addr.Info(), deadLine), GetCommonSockOps()))
  709. {
  710. }
  711. TSocket::~TSocket() = default;
  712. SOCKET TSocket::Fd() const noexcept {
  713. return Impl_->Fd();
  714. }
  715. ssize_t TSocket::Send(const void* data, size_t len) {
  716. return Impl_->Send(data, len);
  717. }
  718. ssize_t TSocket::Recv(void* buf, size_t len) {
  719. return Impl_->Recv(buf, len);
  720. }
  721. ssize_t TSocket::SendV(const TPart* parts, size_t count) {
  722. return Impl_->SendV(parts, count);
  723. }
  724. void TSocket::Close() {
  725. Impl_->Close();
  726. }
// Input stream over a socket; keeps its own copy of the TSocket object s.
TSocketInput::TSocketInput(const TSocket& s) noexcept
    : S_(s)
{
}

TSocketInput::~TSocketInput() = default;
  732. size_t TSocketInput::DoRead(void* buf, size_t len) {
  733. const ssize_t ret = S_.Recv(buf, len);
  734. if (ret >= 0) {
  735. return (size_t)ret;
  736. }
  737. ythrow TSystemError(-(int)ret) << "can not read from socket input stream";
  738. }
// Output stream over a socket; keeps its own copy of the TSocket object s.
TSocketOutput::TSocketOutput(const TSocket& s) noexcept
    : S_(s)
{
}

// Calls Finish() on destruction; any exception it throws is swallowed so
// that the destructor itself never throws.
TSocketOutput::~TSocketOutput() {
    try {
        Finish();
    } catch (...) {
        // Deliberately ignored: there is nobody to report the failure to.
    }
}
  750. void TSocketOutput::DoWrite(const void* buf, size_t len) {
  751. size_t send = 0;
  752. while (len) {
  753. const ssize_t ret = S_.Send(buf, len);
  754. if (ret < 0) {
  755. ythrow TSystemError(-(int)ret) << "can not write to socket output stream; " << send << " bytes already send";
  756. }
  757. buf = (const char*)buf + ret;
  758. len -= ret;
  759. send += ret;
  760. }
  761. }
  762. void TSocketOutput::DoWriteV(const TPart* parts, size_t count) {
  763. const ssize_t ret = S_.SendV(parts, count);
  764. if (ret < 0) {
  765. ythrow TSystemError(-(int)ret) << "can not writev to socket output stream";
  766. }
  767. /*
  768. * todo for nonblocking sockets?
  769. */
  770. }
  771. namespace {
  772. // https://bugzilla.mozilla.org/attachment.cgi?id=503263&action=diff
  773. struct TLocalNames: public THashSet<TStringBuf> {
  774. inline TLocalNames() {
  775. insert("localhost");
  776. insert("localhost.localdomain");
  777. insert("localhost6");
  778. insert("localhost6.localdomain6");
  779. insert("::1");
  780. }
  781. inline bool IsLocalName(const char* name) const noexcept {
  782. struct sockaddr_in sa;
  783. memset(&sa, 0, sizeof(sa));
  784. if (inet_pton(AF_INET, name, &(sa.sin_addr)) == 1) {
  785. return (InetToHost(sa.sin_addr.s_addr) >> 24) == 127;
  786. }
  787. return contains(name);
  788. }
  789. };
  790. } // namespace
  791. class TNetworkAddress::TImpl: public TAtomicRefCount<TImpl> {
  792. private:
  793. class TAddrInfoDeleter {
  794. public:
  795. TAddrInfoDeleter(bool useFreeAddrInfo = true)
  796. : UseFreeAddrInfo_(useFreeAddrInfo)
  797. {
  798. }
  799. void operator()(struct addrinfo* ai) noexcept {
  800. if (!UseFreeAddrInfo_ && ai != NULL) {
  801. if (ai->ai_addr != NULL) {
  802. free(ai->ai_addr);
  803. }
  804. struct addrinfo* p;
  805. while (ai != NULL) {
  806. p = ai;
  807. ai = ai->ai_next;
  808. free(p->ai_canonname);
  809. free(p);
  810. }
  811. } else if (ai != NULL) {
  812. freeaddrinfo(ai);
  813. }
  814. }
  815. private:
  816. bool UseFreeAddrInfo_ = true;
  817. };
  818. public:
  819. inline TImpl(const char* host, ui16 port, int flags)
  820. : Info_(nullptr, TAddrInfoDeleter{})
  821. {
  822. const TString port_st(ToString(port));
  823. struct addrinfo hints;
  824. memset(&hints, 0, sizeof(hints));
  825. hints.ai_flags = flags;
  826. hints.ai_family = PF_UNSPEC;
  827. hints.ai_socktype = SOCK_STREAM;
  828. if (!host) {
  829. hints.ai_flags |= AI_PASSIVE;
  830. } else {
  831. if (!Singleton<TLocalNames>()->IsLocalName(host)) {
  832. hints.ai_flags |= AI_ADDRCONFIG;
  833. }
  834. }
  835. struct addrinfo* pai = NULL;
  836. const int error = getaddrinfo(host, port_st.data(), &hints, &pai);
  837. if (error) {
  838. TAddrInfoDeleter()(pai);
  839. ythrow TNetworkResolutionError(error) << ": can not resolve " << host << ":" << port;
  840. }
  841. Info_.reset(pai);
  842. }
  843. inline TImpl(const char* path, int flags)
  844. : Info_(nullptr, TAddrInfoDeleter{/* useFreeAddrInfo = */ false})
  845. {
  846. THolder<struct sockaddr_un, TFree> sockAddr(
  847. reinterpret_cast<struct sockaddr_un*>(malloc(sizeof(struct sockaddr_un))));
  848. Y_ENSURE(strlen(path) < sizeof(sockAddr->sun_path), "Unix socket path more than " << sizeof(sockAddr->sun_path));
  849. sockAddr->sun_family = AF_UNIX;
  850. strcpy(sockAddr->sun_path, path);
  851. TAddrInfoPtr hints(reinterpret_cast<struct addrinfo*>(malloc(sizeof(struct addrinfo))), TAddrInfoDeleter{/* useFreeAddrInfo = */ false});
  852. memset(hints.get(), 0, sizeof(*hints));
  853. hints->ai_flags = flags;
  854. hints->ai_family = AF_UNIX;
  855. hints->ai_socktype = SOCK_STREAM;
  856. hints->ai_addrlen = sizeof(*sockAddr);
  857. hints->ai_addr = (struct sockaddr*)sockAddr.Release();
  858. Info_.reset(hints.release());
  859. }
  860. inline struct addrinfo* Info() const noexcept {
  861. return Info_.get();
  862. }
  863. private:
  864. using TAddrInfoPtr = std::unique_ptr<struct addrinfo, TAddrInfoDeleter>;
  865. TAddrInfoPtr Info_;
  866. };
  867. TNetworkAddress::TNetworkAddress(const TUnixSocketPath& unixSocketPath, int flags)
  868. : Impl_(new TImpl(unixSocketPath.Path.data(), flags))
  869. {
  870. }
  871. TNetworkAddress::TNetworkAddress(const TString& host, ui16 port, int flags)
  872. : Impl_(new TImpl(host.data(), port, flags))
  873. {
  874. }
  875. TNetworkAddress::TNetworkAddress(const TString& host, ui16 port)
  876. : Impl_(new TImpl(host.data(), port, 0))
  877. {
  878. }
  879. TNetworkAddress::TNetworkAddress(ui16 port)
  880. : Impl_(new TImpl(nullptr, port, 0))
  881. {
  882. }
  883. TNetworkAddress::~TNetworkAddress() = default;
  884. struct addrinfo* TNetworkAddress::Info() const noexcept {
  885. return Impl_->Info();
  886. }
  887. TNetworkResolutionError::TNetworkResolutionError(int error) {
  888. const char* errMsg = nullptr;
  889. #ifdef _win_
  890. errMsg = LastSystemErrorText(error); // gai_strerror is not thread-safe on Windows
  891. #else
  892. errMsg = gai_strerror(error);
  893. #endif
  894. (*this) << errMsg << "(" << error;
  895. #if defined(_unix_)
  896. if (error == EAI_SYSTEM) {
  897. (*this) << "; errno=" << LastSystemError();
  898. }
  899. #endif
  900. (*this) << "): ";
  901. }
  902. #if defined(_unix_)
  903. static inline int GetFlags(int fd) {
  904. const int ret = fcntl(fd, F_GETFL);
  905. if (ret == -1) {
  906. ythrow TSystemError() << "can not get fd flags";
  907. }
  908. return ret;
  909. }
  910. static inline void SetFlags(int fd, int flags) {
  911. if (fcntl(fd, F_SETFL, flags) == -1) {
  912. ythrow TSystemError() << "can not set fd flags";
  913. }
  914. }
  915. static inline void EnableFlag(int fd, int flag) {
  916. const int oldf = GetFlags(fd);
  917. const int newf = oldf | flag;
  918. if (oldf != newf) {
  919. SetFlags(fd, newf);
  920. }
  921. }
  922. static inline void DisableFlag(int fd, int flag) {
  923. const int oldf = GetFlags(fd);
  924. const int newf = oldf & (~flag);
  925. if (oldf != newf) {
  926. SetFlags(fd, newf);
  927. }
  928. }
  929. static inline void SetFlag(int fd, int flag, bool value) {
  930. if (value) {
  931. EnableFlag(fd, flag);
  932. } else {
  933. DisableFlag(fd, flag);
  934. }
  935. }
  936. static inline bool FlagsAreEnabled(int fd, int flags) {
  937. return GetFlags(fd) & flags;
  938. }
  939. #endif
  940. #if defined(_win_)
  941. static inline void SetNonBlockSocket(SOCKET fd, int value) {
  942. unsigned long inbuf = value;
  943. unsigned long outbuf = 0;
  944. DWORD written = 0;
  945. if (!inbuf) {
  946. WSAEventSelect(fd, nullptr, 0);
  947. }
  948. if (WSAIoctl(fd, FIONBIO, &inbuf, sizeof(inbuf), &outbuf, sizeof(outbuf), &written, 0, 0) == SOCKET_ERROR) {
  949. ythrow TSystemError() << "can not set non block socket state";
  950. }
  951. }
  952. static inline bool IsNonBlockSocket(SOCKET fd) {
  953. unsigned long buf = 0;
  954. if (WSAIoctl(fd, FIONBIO, 0, 0, &buf, sizeof(buf), 0, 0, 0) == SOCKET_ERROR) {
  955. ythrow TSystemError() << "can not get non block socket state";
  956. }
  957. return buf;
  958. }
  959. #endif
  960. void SetNonBlock(SOCKET fd, bool value) {
  961. #if defined(_unix_)
  962. #if defined(FIONBIO)
  963. Y_UNUSED(SetFlag); // shut up clang about unused function
  964. int nb = value;
  965. if (ioctl(fd, FIONBIO, &nb) < 0) {
  966. ythrow TSystemError() << "ioctl failed";
  967. }
  968. #else
  969. SetFlag(fd, O_NONBLOCK, value);
  970. #endif
  971. #elif defined(_win_)
  972. SetNonBlockSocket(fd, value);
  973. #else
  974. #error todo
  975. #endif
  976. }
  977. bool IsNonBlock(SOCKET fd) {
  978. #if defined(_unix_)
  979. return FlagsAreEnabled(fd, O_NONBLOCK);
  980. #elif defined(_win_)
  981. return IsNonBlockSocket(fd);
  982. #else
  983. #error todo
  984. #endif
  985. }
  986. void SetDeferAccept(SOCKET s) {
  987. (void)s;
  988. #if defined(TCP_DEFER_ACCEPT)
  989. CheckedSetSockOpt(s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 10, "defer accept");
  990. #endif
  991. #if defined(SO_ACCEPTFILTER)
  992. struct accept_filter_arg afa;
  993. Zero(afa);
  994. strcpy(afa.af_name, "dataready");
  995. SetSockOpt(s, SOL_SOCKET, SO_ACCEPTFILTER, afa);
  996. #endif
  997. }
  998. ssize_t PollD(struct pollfd fds[], nfds_t nfds, const TInstant& deadLine) noexcept {
  999. TInstant now = TInstant::Now();
  1000. do {
  1001. const TDuration toWait = PollStep(deadLine, now);
  1002. const int res = poll(fds, nfds, MicroToMilli(toWait.MicroSeconds()));
  1003. if (res > 0) {
  1004. return res;
  1005. }
  1006. if (res < 0) {
  1007. const int err = LastSystemError();
  1008. if (err != ETIMEDOUT && err != EINTR) {
  1009. return -err;
  1010. }
  1011. }
  1012. } while ((now = TInstant::Now()) < deadLine);
  1013. return -ETIMEDOUT;
  1014. }
  1015. void ShutDown(SOCKET s, int mode) {
  1016. if (shutdown(s, mode)) {
  1017. ythrow TSystemError() << "shutdown socket error";
  1018. }
  1019. }