socket.cpp 30 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253
  1. #include "ip.h"
  2. #include "socket.h"
  3. #include "address.h"
  4. #include "pollerimpl.h"
  5. #include "iovec.h"
  6. #include <util/system/defaults.h>
  7. #include <util/system/byteorder.h>
  8. #if defined(_unix_)
  9. #include <netdb.h>
  10. #include <sys/types.h>
  11. #include <sys/socket.h>
  12. #include <sys/un.h>
  13. #include <sys/ioctl.h>
  14. #include <netinet/in.h>
  15. #include <netinet/tcp.h>
  16. #include <arpa/inet.h>
  17. #endif
  18. #if defined(_freebsd_)
  19. #include <sys/module.h>
  20. #define ACCEPT_FILTER_MOD
  21. #include <sys/socketvar.h>
  22. #endif
  23. #if defined(_win_)
  24. #include <cerrno>
  25. #include <winsock2.h>
  26. #include <ws2tcpip.h>
  27. #include <wspiapi.h>
  28. #include <util/system/compat.h>
  29. #endif
  30. #include <util/generic/ylimits.h>
  31. #include <util/string/cast.h>
  32. #include <util/stream/mem.h>
  33. #include <util/system/datetime.h>
  34. #include <util/system/error.h>
  35. #include <util/memory/tempbuf.h>
  36. #include <util/generic/singleton.h>
  37. #include <util/generic/hash_set.h>
  38. #include <stddef.h>
  39. #include <sys/uio.h>
  40. using namespace NAddr;
  41. #if defined(_win_)
  42. int inet_aton(const char* cp, struct in_addr* inp) {
  43. sockaddr_in addr;
  44. addr.sin_family = AF_INET;
  45. int psz = sizeof(addr);
  46. if (0 == WSAStringToAddress((char*)cp, AF_INET, nullptr, (LPSOCKADDR)&addr, &psz)) {
  47. memcpy(inp, &addr.sin_addr, sizeof(in_addr));
  48. return 1;
  49. }
  50. return 0;
  51. }
  52. #if (_WIN32_WINNT < 0x0600)
  53. const char* inet_ntop(int af, const void* src, char* dst, socklen_t size) {
  54. if (af != AF_INET) {
  55. errno = EINVAL;
  56. return 0;
  57. }
  58. const ui8* ia = (ui8*)src;
  59. if (snprintf(dst, size, "%u.%u.%u.%u", ia[0], ia[1], ia[2], ia[3]) >= (int)size) {
  60. errno = ENOSPC;
  61. return 0;
  62. }
  63. return dst;
  64. }
  65. struct evpair {
  66. int event;
  67. int winevent;
  68. };
  69. static const evpair evpairs_to_win[] = {
  70. {POLLIN, FD_READ | FD_CLOSE | FD_ACCEPT},
  71. {POLLRDNORM, FD_READ | FD_CLOSE | FD_ACCEPT},
  72. {POLLRDBAND, -1},
  73. {POLLPRI, -1},
  74. {POLLOUT, FD_WRITE | FD_CLOSE},
  75. {POLLWRNORM, FD_WRITE | FD_CLOSE},
  76. {POLLWRBAND, -1},
  77. {POLLERR, 0},
  78. {POLLHUP, 0},
  79. {POLLNVAL, 0}};
  80. static const size_t nevpairs_to_win = sizeof(evpairs_to_win) / sizeof(evpairs_to_win[0]);
  81. static const evpair evpairs_to_unix[] = {
  82. {FD_ACCEPT, POLLIN | POLLRDNORM},
  83. {FD_READ, POLLIN | POLLRDNORM},
  84. {FD_WRITE, POLLOUT | POLLWRNORM},
  85. {FD_CLOSE, POLLHUP},
  86. };
  87. static const size_t nevpairs_to_unix = sizeof(evpairs_to_unix) / sizeof(evpairs_to_unix[0]);
  88. static int convert_events(int events, const evpair* evpairs, size_t nevpairs, bool ignoreUnknown) noexcept {
  89. int result = 0;
  90. for (size_t i = 0; i < nevpairs; ++i) {
  91. int event = evpairs[i].event;
  92. if (events & event) {
  93. events ^= event;
  94. long winEvent = evpairs[i].winevent;
  95. if (winEvent == -1)
  96. return -1;
  97. if (winEvent == 0)
  98. continue;
  99. result |= winEvent;
  100. }
  101. }
  102. if (events != 0 && !ignoreUnknown)
  103. return -1;
  104. return result;
  105. }
  106. class TWSAEventHolder {
  107. private:
  108. HANDLE Event;
  109. public:
  110. inline TWSAEventHolder(HANDLE event) noexcept
  111. : Event(event)
  112. {
  113. }
  114. inline ~TWSAEventHolder() {
  115. WSACloseEvent(Event);
  116. }
  117. inline HANDLE Get() noexcept {
  118. return Event;
  119. }
  120. };
  121. int poll(struct pollfd fds[], nfds_t nfds, int timeout) noexcept {
  122. HANDLE rawEvent = WSACreateEvent();
  123. if (rawEvent == WSA_INVALID_EVENT) {
  124. errno = EIO;
  125. return -1;
  126. }
  127. TWSAEventHolder event(rawEvent);
  128. int checked_sockets = 0;
  129. for (pollfd* fd = fds; fd < fds + nfds; ++fd) {
  130. int win_events = convert_events(fd->events, evpairs_to_win, nevpairs_to_win, false);
  131. if (win_events == -1) {
  132. errno = EINVAL;
  133. return -1;
  134. }
  135. fd->revents = 0;
  136. if (WSAEventSelect(fd->fd, event.Get(), win_events)) {
  137. int error = WSAGetLastError();
  138. if (error == WSAEINVAL || error == WSAENOTSOCK) {
  139. fd->revents = POLLNVAL;
  140. ++checked_sockets;
  141. } else {
  142. errno = EIO;
  143. return -1;
  144. }
  145. }
  146. fd_set readfds;
  147. fd_set writefds;
  148. struct timeval timeout = {0, 0};
  149. FD_ZERO(&readfds);
  150. FD_ZERO(&writefds);
  151. if (fd->events & POLLIN) {
  152. FD_SET(fd->fd, &readfds);
  153. }
  154. if (fd->events & POLLOUT) {
  155. FD_SET(fd->fd, &writefds);
  156. }
  157. int error = select(0, &readfds, &writefds, nullptr, &timeout);
  158. if (error > 0) {
  159. if (FD_ISSET(fd->fd, &readfds)) {
  160. fd->revents |= POLLIN;
  161. }
  162. if (FD_ISSET(fd->fd, &writefds)) {
  163. fd->revents |= POLLOUT;
  164. }
  165. ++checked_sockets;
  166. }
  167. }
  168. if (checked_sockets > 0) {
  169. // returns without wait since we already have sockets in desired conditions
  170. return checked_sockets;
  171. }
  172. HANDLE events[] = {event.Get()};
  173. DWORD wait_result = WSAWaitForMultipleEvents(1, events, TRUE, timeout, FALSE);
  174. if (wait_result == WSA_WAIT_TIMEOUT)
  175. return 0;
  176. else if (wait_result == WSA_WAIT_EVENT_0) {
  177. for (pollfd* fd = fds; fd < fds + nfds; ++fd) {
  178. if (fd->revents == POLLNVAL)
  179. continue;
  180. WSANETWORKEVENTS network_events;
  181. if (WSAEnumNetworkEvents(fd->fd, event.Get(), &network_events)) {
  182. errno = EIO;
  183. return -1;
  184. }
  185. fd->revents = 0;
  186. for (int i = 0; i < FD_MAX_EVENTS; ++i) {
  187. if ((network_events.lNetworkEvents & (1 << i)) != 0 && network_events.iErrorCode[i]) {
  188. fd->revents = POLLERR;
  189. break;
  190. }
  191. }
  192. if (fd->revents == POLLERR)
  193. continue;
  194. if (network_events.lNetworkEvents) {
  195. fd->revents = static_cast<short>(convert_events(network_events.lNetworkEvents, evpairs_to_unix, nevpairs_to_unix, true));
  196. if (fd->revents & POLLHUP) {
  197. fd->revents &= POLLHUP | POLLIN | POLLRDNORM;
  198. }
  199. }
  200. }
  201. int chanded_sockets = 0;
  202. for (pollfd* fd = fds; fd < fds + nfds; ++fd)
  203. if (fd->revents != 0)
  204. ++chanded_sockets;
  205. return chanded_sockets;
  206. } else {
  207. errno = EIO;
  208. return -1;
  209. }
  210. }
  211. #endif
  212. #endif
  213. bool GetRemoteAddr(SOCKET Socket, char* str, socklen_t size) {
  214. if (!size) {
  215. return false;
  216. }
  217. TOpaqueAddr addr;
  218. if (getpeername(Socket, addr.MutableAddr(), addr.LenPtr()) != 0) {
  219. return false;
  220. }
  221. try {
  222. TMemoryOutput out(str, size - 1);
  223. PrintHost(out, addr);
  224. *out.Buf() = 0;
  225. return true;
  226. } catch (...) {
  227. // ¯\_(ツ)_/¯
  228. }
  229. return false;
  230. }
  231. void SetSocketTimeout(SOCKET s, long timeout) {
  232. SetSocketTimeout(s, timeout, 0);
  233. }
  234. void SetSocketTimeout(SOCKET s, long sec, long msec) {
  235. #ifdef SO_SNDTIMEO
  236. #ifdef _darwin_
  237. const timeval timeout = {sec, (__darwin_suseconds_t)msec * 1000};
  238. #elif defined(_unix_)
  239. const timeval timeout = {sec, msec * 1000};
  240. #else
  241. const int timeout = sec * 1000 + msec;
  242. #endif
  243. CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVTIMEO, timeout, "recv timeout");
  244. CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDTIMEO, timeout, "send timeout");
  245. #endif
  246. }
  247. void SetLinger(SOCKET s, bool on, unsigned len) {
  248. #ifdef SO_LINGER
  249. struct linger l = {on, (u_short)len};
  250. CheckedSetSockOpt(s, SOL_SOCKET, SO_LINGER, l, "linger");
  251. #endif
  252. }
  253. void SetZeroLinger(SOCKET s) {
  254. SetLinger(s, 1, 0);
  255. }
  256. void SetKeepAlive(SOCKET s, bool value) {
  257. CheckedSetSockOpt(s, SOL_SOCKET, SO_KEEPALIVE, (int)value, "keepalive");
  258. }
  259. void SetOutputBuffer(SOCKET s, unsigned value) {
  260. CheckedSetSockOpt(s, SOL_SOCKET, SO_SNDBUF, value, "output buffer");
  261. }
  262. void SetInputBuffer(SOCKET s, unsigned value) {
  263. CheckedSetSockOpt(s, SOL_SOCKET, SO_RCVBUF, value, "input buffer");
  264. }
  265. void SetReusePort(SOCKET s, bool value) {
  266. #if defined(_unix_)
  267. CheckedSetSockOpt(s, SOL_SOCKET, SO_REUSEPORT, (int)value, "reuse port");
  268. #else
  269. Y_UNUSED(s);
  270. Y_UNUSED(value);
  271. ythrow TSystemError(ENOSYS) << "SO_REUSEPORT is not available on Windows";
  272. #endif
  273. }
  274. void SetNoDelay(SOCKET s, bool value) {
  275. CheckedSetSockOpt(s, IPPROTO_TCP, TCP_NODELAY, (int)value, "tcp no delay");
  276. }
  277. void SetCloseOnExec(SOCKET s, bool value) {
  278. #if defined(_unix_)
  279. int flags = fcntl(s, F_GETFD);
  280. if (flags == -1) {
  281. ythrow TSystemError() << "fcntl() failed";
  282. }
  283. if (value) {
  284. flags |= FD_CLOEXEC;
  285. } else {
  286. flags &= ~FD_CLOEXEC;
  287. }
  288. if (fcntl(s, F_SETFD, flags) == -1) {
  289. ythrow TSystemError() << "fcntl() failed";
  290. }
  291. #else
  292. Y_UNUSED(s);
  293. Y_UNUSED(value);
  294. #endif
  295. }
  296. size_t GetMaximumSegmentSize(SOCKET s) {
  297. #if defined(TCP_MAXSEG)
  298. int val;
  299. if (GetSockOpt(s, IPPROTO_TCP, TCP_MAXSEG, val) == 0) {
  300. return (size_t)val;
  301. }
  302. #endif
  303. /*
  304. * probably a good guess...
  305. */
  306. return 8192;
  307. }
  308. size_t GetMaximumTransferUnit(SOCKET /*s*/) {
  309. // for someone who'll dare to write it
  310. // Linux: there rummored to be IP_MTU getsockopt() request
  311. // FreeBSD: request to a socket of type PF_ROUTE
  312. // with peer address as a destination argument
  313. return 8192;
  314. }
  315. int GetSocketToS(SOCKET s) {
  316. TOpaqueAddr addr;
  317. if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
  318. ythrow TSystemError() << "getsockname() failed";
  319. }
  320. return GetSocketToS(s, &addr);
  321. }
  322. int GetSocketToS(SOCKET s, const IRemoteAddr* addr) {
  323. int result = 0;
  324. switch (addr->Addr()->sa_family) {
  325. case AF_INET:
  326. CheckedGetSockOpt(s, IPPROTO_IP, IP_TOS, result, "tos");
  327. break;
  328. case AF_INET6:
  329. #ifdef IPV6_TCLASS
  330. CheckedGetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, result, "tos");
  331. #endif
  332. break;
  333. }
  334. return result;
  335. }
  336. void SetSocketToS(SOCKET s, const NAddr::IRemoteAddr* addr, int tos) {
  337. switch (addr->Addr()->sa_family) {
  338. case AF_INET:
  339. CheckedSetSockOpt(s, IPPROTO_IP, IP_TOS, tos, "tos");
  340. return;
  341. case AF_INET6:
  342. #ifdef IPV6_TCLASS
  343. CheckedSetSockOpt(s, IPPROTO_IPV6, IPV6_TCLASS, tos, "tos");
  344. return;
  345. #endif
  346. break;
  347. }
  348. ythrow yexception() << "SetSocketToS unsupported for family " << addr->Addr()->sa_family;
  349. }
  350. void SetSocketToS(SOCKET s, int tos) {
  351. TOpaqueAddr addr;
  352. if (getsockname(s, addr.MutableAddr(), addr.LenPtr()) < 0) {
  353. ythrow TSystemError() << "getsockname() failed";
  354. }
  355. SetSocketToS(s, &addr, tos);
  356. }
  357. void SetSocketPriority(SOCKET s, int priority) {
  358. #if defined(SO_PRIORITY)
  359. CheckedSetSockOpt(s, SOL_SOCKET, SO_PRIORITY, priority, "priority");
  360. #else
  361. Y_UNUSED(s);
  362. Y_UNUSED(priority);
  363. #endif
  364. }
  365. bool HasLocalAddress(SOCKET socket) {
  366. TOpaqueAddr localAddr;
  367. if (getsockname(socket, localAddr.MutableAddr(), localAddr.LenPtr()) != 0) {
  368. ythrow TSystemError() << "HasLocalAddress: getsockname() failed. ";
  369. }
  370. if (IsLoopback(localAddr)) {
  371. return true;
  372. }
  373. TOpaqueAddr remoteAddr;
  374. if (getpeername(socket, remoteAddr.MutableAddr(), remoteAddr.LenPtr()) != 0) {
  375. ythrow TSystemError() << "HasLocalAddress: getpeername() failed. ";
  376. }
  377. return IsSame(localAddr, remoteAddr);
  378. }
  namespace {
  #if defined(_linux_)
      #if !defined(TCP_FASTOPEN)
          // fallback value for linux builds whose headers do not define the option
          #define TCP_FASTOPEN 23
      #endif
  #endif

  #if defined(TCP_FASTOPEN)
      // Probes once whether TCP_FASTOPEN can be set on a TCP socket and applies
      // the option to real sockets only when the probe succeeded.
      struct TTcpFastOpenFeature {
          inline TTcpFastOpenFeature()
              : HasFastOpen_(false)
          {
              // throw-away socket used only for the probe
              TSocketHolder probe(socket(AF_INET, SOCK_STREAM, 0));
              int enable = 1;

              HasFastOpen_ = (SetSockOpt(probe, IPPROTO_TCP, TCP_FASTOPEN, enable) == 0);
          }

          // Sets TCP_FASTOPEN to `qlen` on `s`; silently does nothing when the probe failed.
          inline void SetFastOpen(SOCKET s, int qlen) const {
              if (!HasFastOpen_) {
                  return;
              }

              CheckedSetSockOpt(s, IPPROTO_TCP, TCP_FASTOPEN, qlen, "setting TCP_FASTOPEN");
          }

          static inline const TTcpFastOpenFeature* Instance() noexcept {
              return Singleton<TTcpFastOpenFeature>();
          }

          bool HasFastOpen_;
      };
  #endif
  }
  407. void SetTcpFastOpen(SOCKET s, int qlen) {
  408. #if defined(TCP_FASTOPEN)
  409. TTcpFastOpenFeature::Instance()->SetFastOpen(s, qlen);
  410. #else
  411. Y_UNUSED(s);
  412. Y_UNUSED(qlen);
  413. #endif
  414. }
  // True when `lasterr` is one of the "operation would block" error codes.
  static bool IsBlocked(int lasterr) noexcept {
      if (lasterr == EWOULDBLOCK) {
          return true;
      }
      return lasterr == EAGAIN;
  }
  418. struct TUnblockingGuard {
  419. SOCKET S_;
  420. TUnblockingGuard(SOCKET s)
  421. : S_(s)
  422. {
  423. SetNonBlock(S_, true);
  424. }
  425. ~TUnblockingGuard() {
  426. SetNonBlock(S_, false);
  427. }
  428. };
  429. static int MsgPeek(SOCKET s) {
  430. int flags = MSG_PEEK;
  431. #if defined(_win_)
  432. TUnblockingGuard unblocker(s);
  433. Y_UNUSED(unblocker);
  434. #else
  435. flags |= MSG_DONTWAIT;
  436. #endif
  437. char c;
  438. return recv(s, &c, 1, flags);
  439. }
  440. bool IsNotSocketClosedByOtherSide(SOCKET s) {
  441. return HasSocketDataToRead(s) != ESocketReadStatus::SocketClosed;
  442. }
  443. ESocketReadStatus HasSocketDataToRead(SOCKET s) {
  444. const int r = MsgPeek(s);
  445. if (r == -1 && IsBlocked(LastSystemError())) {
  446. return ESocketReadStatus::NoData;
  447. }
  448. if (r > 0) {
  449. return ESocketReadStatus::HasData;
  450. }
  451. return ESocketReadStatus::SocketClosed;
  452. }
  453. #if defined(_win_)
  454. static ssize_t DoSendMsg(SOCKET sock, const struct iovec* iov, int iovcnt) {
  455. return writev(sock, iov, iovcnt);
  456. }
  457. #else
  458. static ssize_t DoSendMsg(SOCKET sock, const struct iovec* iov, int iovcnt) {
  459. struct msghdr message;
  460. Zero(message);
  461. message.msg_iov = const_cast<struct iovec*>(iov);
  462. message.msg_iovlen = iovcnt;
  463. return sendmsg(sock, &message, MSG_NOSIGNAL);
  464. }
  465. #endif
  466. void TSocketHolder::Close() noexcept {
  467. if (Fd_ != INVALID_SOCKET) {
  468. bool ok = (closesocket(Fd_) == 0);
  469. if (!ok) {
  470. // Do not quietly close bad descriptor,
  471. // because often it means double close
  472. // that is disasterous
  473. #ifdef _win_
  474. Y_ABORT_UNLESS(WSAGetLastError() != WSAENOTSOCK, "must not quietly close bad socket descriptor");
  475. #elif defined(_unix_)
  476. Y_ABORT_UNLESS(errno != EBADF, "must not quietly close bad descriptor: fd=%d", int(Fd_));
  477. #else
  478. #error unsupported platform
  479. #endif
  480. }
  481. Fd_ = INVALID_SOCKET;
  482. }
  483. }
  484. class TSocket::TImpl: public TAtomicRefCount<TImpl> {
  485. using TOps = TSocket::TOps;
  486. public:
  487. inline TImpl(SOCKET fd, TOps* ops)
  488. : Fd_(fd)
  489. , Ops_(ops)
  490. {
  491. }
  492. inline ~TImpl() = default;
  493. inline SOCKET Fd() const noexcept {
  494. return Fd_;
  495. }
  496. inline ssize_t Send(const void* data, size_t len) {
  497. return Ops_->Send(Fd_, data, len);
  498. }
  499. inline ssize_t Recv(void* buf, size_t len) {
  500. return Ops_->Recv(Fd_, buf, len);
  501. }
  502. inline ssize_t SendV(const TPart* parts, size_t count) {
  503. return Ops_->SendV(Fd_, parts, count);
  504. }
  505. inline void Close() {
  506. Fd_.Close();
  507. }
  508. private:
  509. TSocketHolder Fd_;
  510. TOps* Ops_;
  511. };
  512. template <>
  513. void Out<const struct addrinfo*>(IOutputStream& os, const struct addrinfo* ai) {
  514. if (ai->ai_flags & AI_CANONNAME) {
  515. os << "`" << ai->ai_canonname << "' ";
  516. }
  517. os << '[';
  518. for (int i = 0; ai; ++i, ai = ai->ai_next) {
  519. if (i > 0) {
  520. os << ", ";
  521. }
  522. os << (const IRemoteAddr&)TAddrInfo(ai);
  523. }
  524. os << ']';
  525. }
  526. template <>
  527. void Out<struct addrinfo*>(IOutputStream& os, struct addrinfo* ai) {
  528. Out<const struct addrinfo*>(os, static_cast<const struct addrinfo*>(ai));
  529. }
  530. template <>
  531. void Out<TNetworkAddress>(IOutputStream& os, const TNetworkAddress& addr) {
  532. os << &*addr.Begin();
  533. }
  534. static inline const struct addrinfo* Iterate(const struct addrinfo* addr, const struct addrinfo* addr0, const int sockerr) {
  535. if (addr->ai_next) {
  536. return addr->ai_next;
  537. }
  538. ythrow TSystemError(sockerr) << "can not connect to " << addr0;
  539. }
  540. static inline SOCKET DoConnectImpl(const struct addrinfo* res, const TInstant& deadLine) {
  541. const struct addrinfo* addr0 = res;
  542. while (res) {
  543. TSocketHolder s(socket(res->ai_family, res->ai_socktype, res->ai_protocol));
  544. if (s.Closed()) {
  545. res = Iterate(res, addr0, LastSystemError());
  546. continue;
  547. }
  548. SetNonBlock(s, true);
  549. if (connect(s, res->ai_addr, (int)res->ai_addrlen)) {
  550. int err = LastSystemError();
  551. if (err == EINPROGRESS || err == EAGAIN || err == EWOULDBLOCK) {
  552. /*
  553. * must wait
  554. */
  555. struct pollfd p = {
  556. (SOCKET)s,
  557. POLLOUT,
  558. 0};
  559. const ssize_t n = PollD(&p, 1, deadLine);
  560. /*
  561. * timeout occured
  562. */
  563. if (n < 0) {
  564. ythrow TSystemError(-(int)n) << "can not connect";
  565. }
  566. CheckedGetSockOpt(s, SOL_SOCKET, SO_ERROR, err, "socket error");
  567. if (!err) {
  568. return s.Release();
  569. }
  570. }
  571. res = Iterate(res, addr0, err);
  572. continue;
  573. }
  574. return s.Release();
  575. }
  576. ythrow yexception() << "something went wrong: nullptr at addrinfo";
  577. }
  578. static inline SOCKET DoConnect(const struct addrinfo* res, const TInstant& deadLine) {
  579. TSocketHolder ret(DoConnectImpl(res, deadLine));
  580. SetNonBlock(ret, false);
  581. return ret.Release();
  582. }
  583. static inline ssize_t DoSendV(SOCKET fd, const struct iovec* iov, size_t count) {
  584. ssize_t ret = -1;
  585. do {
  586. ret = DoSendMsg(fd, iov, (int)count);
  587. } while (ret == -1 && errno == EINTR);
  588. if (ret < 0) {
  589. return -LastSystemError();
  590. }
  591. return ret;
  592. }
  593. template <bool isCompat>
  594. struct TSender {
  595. using TPart = TSocket::TPart;
  596. static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
  597. return DoSendV(fd, (const iovec*)parts, count);
  598. }
  599. };
  600. template <>
  601. struct TSender<false> {
  602. using TPart = TSocket::TPart;
  603. static inline ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) {
  604. TTempBuf tempbuf(sizeof(struct iovec) * count);
  605. struct iovec* iov = (struct iovec*)tempbuf.Data();
  606. for (size_t i = 0; i < count; ++i) {
  607. struct iovec& io = iov[i];
  608. const TPart& part = parts[i];
  609. io.iov_base = (char*)part.buf;
  610. io.iov_len = part.len;
  611. }
  612. return DoSendV(fd, iov, count);
  613. }
  614. };
  615. class TCommonSockOps: public TSocket::TOps {
  616. using TPart = TSocket::TPart;
  617. public:
  618. inline TCommonSockOps() noexcept {
  619. }
  620. ~TCommonSockOps() override = default;
  621. ssize_t Send(SOCKET fd, const void* data, size_t len) override {
  622. ssize_t ret = -1;
  623. do {
  624. ret = send(fd, (const char*)data, (int)len, MSG_NOSIGNAL);
  625. } while (ret == -1 && errno == EINTR);
  626. if (ret < 0) {
  627. return -LastSystemError();
  628. }
  629. return ret;
  630. }
  631. ssize_t Recv(SOCKET fd, void* buf, size_t len) override {
  632. ssize_t ret = -1;
  633. do {
  634. ret = recv(fd, (char*)buf, (int)len, 0);
  635. } while (ret == -1 && errno == EINTR);
  636. if (ret < 0) {
  637. return -LastSystemError();
  638. }
  639. return ret;
  640. }
  641. ssize_t SendV(SOCKET fd, const TPart* parts, size_t count) override {
  642. ssize_t ret = SendVImpl(fd, parts, count);
  643. if (ret < 0) {
  644. return ret;
  645. }
  646. size_t len = TContIOVector::Bytes(parts, count);
  647. if ((size_t)ret == len) {
  648. return ret;
  649. }
  650. return SendVPartial(fd, parts, count, ret);
  651. }
  652. inline ssize_t SendVImpl(SOCKET fd, const TPart* parts, size_t count) {
  653. return TSender < (sizeof(iovec) == sizeof(TPart)) && (offsetof(iovec, iov_base) == offsetof(TPart, buf)) && (offsetof(iovec, iov_len) == offsetof(TPart, len)) > ::SendV(fd, parts, count);
  654. }
  655. ssize_t SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written);
  656. };
  657. ssize_t TCommonSockOps::SendVPartial(SOCKET fd, const TPart* constParts, size_t count, size_t written) {
  658. TTempBuf tempbuf(sizeof(TPart) * count);
  659. TPart* parts = (TPart*)tempbuf.Data();
  660. for (size_t i = 0; i < count; ++i) {
  661. parts[i] = constParts[i];
  662. }
  663. TContIOVector vec(parts, count);
  664. vec.Proceed(written);
  665. while (!vec.Complete()) {
  666. ssize_t ret = SendVImpl(fd, vec.Parts(), vec.Count());
  667. if (ret < 0) {
  668. return ret;
  669. }
  670. written += ret;
  671. vec.Proceed((size_t)ret);
  672. }
  673. return written;
  674. }
  675. static inline TSocket::TOps* GetCommonSockOps() noexcept {
  676. return Singleton<TCommonSockOps>();
  677. }
  678. TSocket::TSocket()
  679. : Impl_(new TImpl(INVALID_SOCKET, GetCommonSockOps()))
  680. {
  681. }
  682. TSocket::TSocket(SOCKET fd)
  683. : Impl_(new TImpl(fd, GetCommonSockOps()))
  684. {
  685. }
  686. TSocket::TSocket(SOCKET fd, TOps* ops)
  687. : Impl_(new TImpl(fd, ops))
  688. {
  689. }
  690. TSocket::TSocket(const TNetworkAddress& addr)
  691. : Impl_(new TImpl(DoConnect(addr.Info(), TInstant::Max()), GetCommonSockOps()))
  692. {
  693. }
  694. TSocket::TSocket(const TNetworkAddress& addr, const TDuration& timeOut)
  695. : Impl_(new TImpl(DoConnect(addr.Info(), timeOut.ToDeadLine()), GetCommonSockOps()))
  696. {
  697. }
  698. TSocket::TSocket(const TNetworkAddress& addr, const TInstant& deadLine)
  699. : Impl_(new TImpl(DoConnect(addr.Info(), deadLine), GetCommonSockOps()))
  700. {
  701. }
  702. TSocket::~TSocket() = default;
  703. SOCKET TSocket::Fd() const noexcept {
  704. return Impl_->Fd();
  705. }
  706. ssize_t TSocket::Send(const void* data, size_t len) {
  707. return Impl_->Send(data, len);
  708. }
  709. ssize_t TSocket::Recv(void* buf, size_t len) {
  710. return Impl_->Recv(buf, len);
  711. }
  712. ssize_t TSocket::SendV(const TPart* parts, size_t count) {
  713. return Impl_->SendV(parts, count);
  714. }
  715. void TSocket::Close() {
  716. Impl_->Close();
  717. }
  718. TSocketInput::TSocketInput(const TSocket& s) noexcept
  719. : S_(s)
  720. {
  721. }
  722. TSocketInput::~TSocketInput() = default;
  723. size_t TSocketInput::DoRead(void* buf, size_t len) {
  724. const ssize_t ret = S_.Recv(buf, len);
  725. if (ret >= 0) {
  726. return (size_t)ret;
  727. }
  728. ythrow TSystemError(-(int)ret) << "can not read from socket input stream";
  729. }
  730. TSocketOutput::TSocketOutput(const TSocket& s) noexcept
  731. : S_(s)
  732. {
  733. }
  734. TSocketOutput::~TSocketOutput() {
  735. try {
  736. Finish();
  737. } catch (...) {
  738. // ¯\_(ツ)_/¯
  739. }
  740. }
  741. void TSocketOutput::DoWrite(const void* buf, size_t len) {
  742. size_t send = 0;
  743. while (len) {
  744. const ssize_t ret = S_.Send(buf, len);
  745. if (ret < 0) {
  746. ythrow TSystemError(-(int)ret) << "can not write to socket output stream; " << send << " bytes already send";
  747. }
  748. buf = (const char*)buf + ret;
  749. len -= ret;
  750. send += ret;
  751. }
  752. }
  753. void TSocketOutput::DoWriteV(const TPart* parts, size_t count) {
  754. const ssize_t ret = S_.SendV(parts, count);
  755. if (ret < 0) {
  756. ythrow TSystemError(-(int)ret) << "can not writev to socket output stream";
  757. }
  758. /*
  759. * todo for nonblocking sockets?
  760. */
  761. }
  762. namespace {
  763. //https://bugzilla.mozilla.org/attachment.cgi?id=503263&action=diff
  764. struct TLocalNames: public THashSet<TStringBuf> {
  765. inline TLocalNames() {
  766. insert("localhost");
  767. insert("localhost.localdomain");
  768. insert("localhost6");
  769. insert("localhost6.localdomain6");
  770. insert("::1");
  771. }
  772. inline bool IsLocalName(const char* name) const noexcept {
  773. struct sockaddr_in sa;
  774. memset(&sa, 0, sizeof(sa));
  775. if (inet_pton(AF_INET, name, &(sa.sin_addr)) == 1) {
  776. return (InetToHost(sa.sin_addr.s_addr) >> 24) == 127;
  777. }
  778. return contains(name);
  779. }
  780. };
  781. }
  782. class TNetworkAddress::TImpl: public TAtomicRefCount<TImpl> {
  783. private:
  784. class TAddrInfoDeleter {
  785. public:
  786. TAddrInfoDeleter(bool useFreeAddrInfo = true)
  787. : UseFreeAddrInfo_(useFreeAddrInfo)
  788. {
  789. }
  790. void operator()(struct addrinfo* ai) noexcept {
  791. if (!UseFreeAddrInfo_ && ai != NULL) {
  792. if (ai->ai_addr != NULL) {
  793. free(ai->ai_addr);
  794. }
  795. struct addrinfo* p;
  796. while (ai != NULL) {
  797. p = ai;
  798. ai = ai->ai_next;
  799. free(p->ai_canonname);
  800. free(p);
  801. }
  802. } else if (ai != NULL) {
  803. freeaddrinfo(ai);
  804. }
  805. }
  806. private:
  807. bool UseFreeAddrInfo_ = true;
  808. };
  809. public:
  810. inline TImpl(const char* host, ui16 port, int flags)
  811. : Info_(nullptr, TAddrInfoDeleter{})
  812. {
  813. const TString port_st(ToString(port));
  814. struct addrinfo hints;
  815. memset(&hints, 0, sizeof(hints));
  816. hints.ai_flags = flags;
  817. hints.ai_family = PF_UNSPEC;
  818. hints.ai_socktype = SOCK_STREAM;
  819. if (!host) {
  820. hints.ai_flags |= AI_PASSIVE;
  821. } else {
  822. if (!Singleton<TLocalNames>()->IsLocalName(host)) {
  823. hints.ai_flags |= AI_ADDRCONFIG;
  824. }
  825. }
  826. struct addrinfo* pai = NULL;
  827. const int error = getaddrinfo(host, port_st.data(), &hints, &pai);
  828. if (error) {
  829. TAddrInfoDeleter()(pai);
  830. ythrow TNetworkResolutionError(error) << ": can not resolve " << host << ":" << port;
  831. }
  832. Info_.reset(pai);
  833. }
  834. inline TImpl(const char* path, int flags)
  835. : Info_(nullptr, TAddrInfoDeleter{/* useFreeAddrInfo = */ false})
  836. {
  837. THolder<struct sockaddr_un, TFree> sockAddr(
  838. reinterpret_cast<struct sockaddr_un*>(malloc(sizeof(struct sockaddr_un))));
  839. Y_ENSURE(strlen(path) < sizeof(sockAddr->sun_path), "Unix socket path more than " << sizeof(sockAddr->sun_path));
  840. sockAddr->sun_family = AF_UNIX;
  841. strcpy(sockAddr->sun_path, path);
  842. TAddrInfoPtr hints(reinterpret_cast<struct addrinfo*>(malloc(sizeof(struct addrinfo))), TAddrInfoDeleter{/* useFreeAddrInfo = */ false});
  843. memset(hints.get(), 0, sizeof(*hints));
  844. hints->ai_flags = flags;
  845. hints->ai_family = AF_UNIX;
  846. hints->ai_socktype = SOCK_STREAM;
  847. hints->ai_addrlen = sizeof(*sockAddr);
  848. hints->ai_addr = (struct sockaddr*)sockAddr.Release();
  849. Info_.reset(hints.release());
  850. }
  851. inline struct addrinfo* Info() const noexcept {
  852. return Info_.get();
  853. }
  854. private:
  855. using TAddrInfoPtr = std::unique_ptr<struct addrinfo, TAddrInfoDeleter>;
  856. TAddrInfoPtr Info_;
  857. };
  858. TNetworkAddress::TNetworkAddress(const TUnixSocketPath& unixSocketPath, int flags)
  859. : Impl_(new TImpl(unixSocketPath.Path.data(), flags))
  860. {
  861. }
// Creates an address for host:port; `flags` is forwarded to the resolving TImpl
// constructor, where it seeds the ai_flags hint passed to getaddrinfo().
// That constructor throws TNetworkResolutionError when getaddrinfo() reports an error.
TNetworkAddress::TNetworkAddress(const TString& host, ui16 port, int flags)
    : Impl_(new TImpl(host.data(), port, flags))
{
}
  866. TNetworkAddress::TNetworkAddress(const TString& host, ui16 port)
  867. : Impl_(new TImpl(host.data(), port, 0))
  868. {
  869. }
// Creates an address for `port` with no host and flags == 0.
// A null host makes the resolving TImpl constructor add AI_PASSIVE to the hints.
TNetworkAddress::TNetworkAddress(ui16 port)
    : Impl_(new TImpl(nullptr, port, 0))
{
}
// Defaulted in this translation unit rather than in the class declaration.
// NOTE(review): presumably so Impl_ is destroyed where TImpl is a complete type — confirm against the header.
TNetworkAddress::~TNetworkAddress() = default;
  875. struct addrinfo* TNetworkAddress::Info() const noexcept {
  876. return Impl_->Info();
  877. }
  878. TNetworkResolutionError::TNetworkResolutionError(int error) {
  879. const char* errMsg = nullptr;
  880. #ifdef _win_
  881. errMsg = LastSystemErrorText(error); // gai_strerror is not thread-safe on Windows
  882. #else
  883. errMsg = gai_strerror(error);
  884. #endif
  885. (*this) << errMsg << "(" << error;
  886. #if defined(_unix_)
  887. if (error == EAI_SYSTEM) {
  888. (*this) << "; errno=" << LastSystemError();
  889. }
  890. #endif
  891. (*this) << "): ";
  892. }
  893. #if defined(_unix_)
  894. static inline int GetFlags(int fd) {
  895. const int ret = fcntl(fd, F_GETFL);
  896. if (ret == -1) {
  897. ythrow TSystemError() << "can not get fd flags";
  898. }
  899. return ret;
  900. }
  901. static inline void SetFlags(int fd, int flags) {
  902. if (fcntl(fd, F_SETFL, flags) == -1) {
  903. ythrow TSystemError() << "can not set fd flags";
  904. }
  905. }
  906. static inline void EnableFlag(int fd, int flag) {
  907. const int oldf = GetFlags(fd);
  908. const int newf = oldf | flag;
  909. if (oldf != newf) {
  910. SetFlags(fd, newf);
  911. }
  912. }
  913. static inline void DisableFlag(int fd, int flag) {
  914. const int oldf = GetFlags(fd);
  915. const int newf = oldf & (~flag);
  916. if (oldf != newf) {
  917. SetFlags(fd, newf);
  918. }
  919. }
  920. static inline void SetFlag(int fd, int flag, bool value) {
  921. if (value) {
  922. EnableFlag(fd, flag);
  923. } else {
  924. DisableFlag(fd, flag);
  925. }
  926. }
  927. static inline bool FlagsAreEnabled(int fd, int flags) {
  928. return GetFlags(fd) & flags;
  929. }
  930. #endif
  931. #if defined(_win_)
  932. static inline void SetNonBlockSocket(SOCKET fd, int value) {
  933. unsigned long inbuf = value;
  934. unsigned long outbuf = 0;
  935. DWORD written = 0;
  936. if (!inbuf) {
  937. WSAEventSelect(fd, nullptr, 0);
  938. }
  939. if (WSAIoctl(fd, FIONBIO, &inbuf, sizeof(inbuf), &outbuf, sizeof(outbuf), &written, 0, 0) == SOCKET_ERROR) {
  940. ythrow TSystemError() << "can not set non block socket state";
  941. }
  942. }
  943. static inline bool IsNonBlockSocket(SOCKET fd) {
  944. unsigned long buf = 0;
  945. if (WSAIoctl(fd, FIONBIO, 0, 0, &buf, sizeof(buf), 0, 0, 0) == SOCKET_ERROR) {
  946. ythrow TSystemError() << "can not get non block socket state";
  947. }
  948. return buf;
  949. }
  950. #endif
  951. void SetNonBlock(SOCKET fd, bool value) {
  952. #if defined(_unix_)
  953. #if defined(FIONBIO)
  954. Y_UNUSED(SetFlag); // shut up clang about unused function
  955. int nb = value;
  956. if (ioctl(fd, FIONBIO, &nb) < 0) {
  957. ythrow TSystemError() << "ioctl failed";
  958. }
  959. #else
  960. SetFlag(fd, O_NONBLOCK, value);
  961. #endif
  962. #elif defined(_win_)
  963. SetNonBlockSocket(fd, value);
  964. #else
  965. #error todo
  966. #endif
  967. }
  968. bool IsNonBlock(SOCKET fd) {
  969. #if defined(_unix_)
  970. return FlagsAreEnabled(fd, O_NONBLOCK);
  971. #elif defined(_win_)
  972. return IsNonBlockSocket(fd);
  973. #else
  974. #error todo
  975. #endif
  976. }
  977. void SetDeferAccept(SOCKET s) {
  978. (void)s;
  979. #if defined(TCP_DEFER_ACCEPT)
  980. CheckedSetSockOpt(s, IPPROTO_TCP, TCP_DEFER_ACCEPT, 10, "defer accept");
  981. #endif
  982. #if defined(SO_ACCEPTFILTER)
  983. struct accept_filter_arg afa;
  984. Zero(afa);
  985. strcpy(afa.af_name, "dataready");
  986. SetSockOpt(s, SOL_SOCKET, SO_ACCEPTFILTER, afa);
  987. #endif
  988. }
  989. ssize_t PollD(struct pollfd fds[], nfds_t nfds, const TInstant& deadLine) noexcept {
  990. TInstant now = TInstant::Now();
  991. do {
  992. const TDuration toWait = PollStep(deadLine, now);
  993. const int res = poll(fds, nfds, MicroToMilli(toWait.MicroSeconds()));
  994. if (res > 0) {
  995. return res;
  996. }
  997. if (res < 0) {
  998. const int err = LastSystemError();
  999. if (err != ETIMEDOUT && err != EINTR) {
  1000. return -err;
  1001. }
  1002. }
  1003. } while ((now = TInstant::Now()) < deadLine);
  1004. return -ETIMEDOUT;
  1005. }
  1006. void ShutDown(SOCKET s, int mode) {
  1007. if (shutdown(s, mode)) {
  1008. ythrow TSystemError() << "shutdown socket error";
  1009. }
  1010. }