network.cpp 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. #include "impl.h"
  2. #include "network.h"
  3. #include <util/generic/scope.h>
  4. #include <util/generic/xrange.h>
  5. #include <sys/uio.h>
  6. #if defined(_bionic_)
  7. # define IOV_MAX 1024
  8. #endif
  9. namespace NCoro {
  10. namespace {
  11. bool IsBlocked(int lasterr) noexcept {
  12. return lasterr == EAGAIN || lasterr == EWOULDBLOCK;
  13. }
  14. ssize_t DoReadVector(SOCKET fd, TContIOVector* vec) noexcept {
  15. return readv(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count()));
  16. }
  17. ssize_t DoWriteVector(SOCKET fd, TContIOVector* vec) noexcept {
  18. return writev(fd, (const iovec*) vec->Parts(), Min(IOV_MAX, (int) vec->Count()));
  19. }
  20. }
  21. int SelectD(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TInstant deadline) noexcept {
  22. if (cont->Cancelled()) {
  23. return ECANCELED;
  24. }
  25. if (nfds == 0) {
  26. return 0;
  27. }
  28. TTempArray<TFdEvent> events(nfds);
  29. for (auto i : xrange(nfds)) {
  30. new(events.Data() + i) TFdEvent(cont, fds[i], (ui16) what[i], deadline);
  31. }
  32. Y_DEFER {
  33. for (auto i : xrange(nfds)) {
  34. (events.Data() + i)->~TFdEvent();
  35. }
  36. };
  37. for (auto i : xrange(nfds)) {
  38. cont->Executor()->ScheduleIoWait(events.Data() + i);
  39. }
  40. cont->Switch();
  41. if (cont->Cancelled()) {
  42. return ECANCELED;
  43. }
  44. TFdEvent* ret = nullptr;
  45. int status = EINPROGRESS;
  46. for (auto i : xrange(nfds)) {
  47. auto& ev = *(events.Data() + i);
  48. switch (ev.Status()) {
  49. case EINPROGRESS:
  50. break;
  51. case ETIMEDOUT:
  52. if (status != EINPROGRESS) {
  53. break;
  54. }
  55. [[fallthrough]];
  56. default:
  57. status = ev.Status();
  58. ret = &ev;
  59. }
  60. }
  61. if (ret) {
  62. if (outfd) {
  63. *outfd = ret->Fd();
  64. }
  65. return ret->Status();
  66. }
  67. return EINPROGRESS;
  68. }
  69. int SelectT(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd, TDuration timeout) noexcept {
  70. return SelectD(cont, fds, what, nfds, outfd, timeout.ToDeadLine());
  71. }
  72. int SelectI(TCont* cont, SOCKET fds[], int what[], size_t nfds, SOCKET* outfd) {
  73. return SelectD(cont, fds, what, nfds, outfd, TInstant::Max());
  74. }
  75. int PollD(TCont* cont, SOCKET fd, int what, TInstant deadline) noexcept {
  76. TFdEvent event(cont, fd, (ui16)what, deadline);
  77. return ExecuteEvent(&event);
  78. }
  79. int PollT(TCont* cont, SOCKET fd, int what, TDuration timeout) noexcept {
  80. return PollD(cont, fd, what, timeout.ToDeadLine());
  81. }
  82. int PollI(TCont* cont, SOCKET fd, int what) noexcept {
  83. return PollD(cont, fd, what, TInstant::Max());
  84. }
  85. TContIOStatus ReadVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept {
  86. while (true) {
  87. ssize_t res = DoReadVector(fd, vec);
  88. if (res >= 0) {
  89. return TContIOStatus::Success((size_t) res);
  90. }
  91. {
  92. const int err = LastSystemError();
  93. if (!IsBlocked(err)) {
  94. return TContIOStatus::Error(err);
  95. }
  96. }
  97. if ((res = PollD(cont, fd, CONT_POLL_READ, deadline)) != 0) {
  98. return TContIOStatus::Error((int) res);
  99. }
  100. }
  101. }
  102. TContIOStatus ReadVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept {
  103. return ReadVectorD(cont, fd, vec, timeOut.ToDeadLine());
  104. }
  105. TContIOStatus ReadVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept {
  106. return ReadVectorD(cont, fd, vec, TInstant::Max());
  107. }
  108. TContIOStatus ReadD(TCont* cont, SOCKET fd, void* buf, size_t len, TInstant deadline) noexcept {
  109. IOutputStream::TPart part(buf, len);
  110. TContIOVector vec(&part, 1);
  111. return ReadVectorD(cont, fd, &vec, deadline);
  112. }
  113. TContIOStatus ReadT(TCont* cont, SOCKET fd, void* buf, size_t len, TDuration timeout) noexcept {
  114. return ReadD(cont, fd, buf, len, timeout.ToDeadLine());
  115. }
  116. TContIOStatus ReadI(TCont* cont, SOCKET fd, void* buf, size_t len) noexcept {
  117. return ReadD(cont, fd, buf, len, TInstant::Max());
  118. }
  119. TContIOStatus WriteVectorD(TCont* cont, SOCKET fd, TContIOVector* vec, TInstant deadline) noexcept {
  120. size_t written = 0;
  121. while (!vec->Complete()) {
  122. ssize_t res = DoWriteVector(fd, vec);
  123. if (res >= 0) {
  124. written += res;
  125. vec->Proceed((size_t) res);
  126. } else {
  127. {
  128. const int err = LastSystemError();
  129. if (!IsBlocked(err)) {
  130. return TContIOStatus(written, err);
  131. }
  132. }
  133. if ((res = PollD(cont, fd, CONT_POLL_WRITE, deadline)) != 0) {
  134. return TContIOStatus(written, (int) res);
  135. }
  136. }
  137. }
  138. return TContIOStatus::Success(written);
  139. }
  140. TContIOStatus WriteVectorT(TCont* cont, SOCKET fd, TContIOVector* vec, TDuration timeOut) noexcept {
  141. return WriteVectorD(cont, fd, vec, timeOut.ToDeadLine());
  142. }
  143. TContIOStatus WriteVectorI(TCont* cont, SOCKET fd, TContIOVector* vec) noexcept {
  144. return WriteVectorD(cont, fd, vec, TInstant::Max());
  145. }
  146. TContIOStatus WriteD(TCont* cont, SOCKET fd, const void* buf, size_t len, TInstant deadline) noexcept {
  147. IOutputStream::TPart part(buf, len);
  148. TContIOVector vec(&part, 1);
  149. return WriteVectorD(cont, fd, &vec, deadline);
  150. }
  151. TContIOStatus WriteT(TCont* cont, SOCKET fd, const void* buf, size_t len, TDuration timeout) noexcept {
  152. return WriteD(cont, fd, buf, len, timeout.ToDeadLine());
  153. }
  154. TContIOStatus WriteI(TCont* cont, SOCKET fd, const void* buf, size_t len) noexcept {
  155. return WriteD(cont, fd, buf, len, TInstant::Max());
  156. }
  157. int ConnectD(TCont* cont, TSocketHolder& s, const struct addrinfo& ai, TInstant deadline) noexcept {
  158. TSocketHolder res(Socket(ai));
  159. if (res.Closed()) {
  160. return LastSystemError();
  161. }
  162. const int ret = ConnectD(cont, res, ai.ai_addr, (socklen_t) ai.ai_addrlen, deadline);
  163. if (!ret) {
  164. s.Swap(res);
  165. }
  166. return ret;
  167. }
  168. int ConnectD(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TInstant deadline) noexcept {
  169. int ret = EHOSTUNREACH;
  170. for (auto it = addr.Begin(); it != addr.End(); ++it) {
  171. ret = ConnectD(cont, s, *it, deadline);
  172. if (ret == 0 || ret == ETIMEDOUT) {
  173. return ret;
  174. }
  175. }
  176. return ret;
  177. }
  178. int ConnectT(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr, TDuration timeout) noexcept {
  179. return ConnectD(cont, s, addr, timeout.ToDeadLine());
  180. }
  181. int ConnectI(TCont* cont, TSocketHolder& s, const TNetworkAddress& addr) noexcept {
  182. return ConnectD(cont, s, addr, TInstant::Max());
  183. }
  184. int ConnectD(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TInstant deadline) noexcept {
  185. if (connect(s, name, namelen)) {
  186. const int err = LastSystemError();
  187. if (!IsBlocked(err) && err != EINPROGRESS) {
  188. return err;
  189. }
  190. int ret = PollD(cont, s, CONT_POLL_WRITE, deadline);
  191. if (ret) {
  192. return ret;
  193. }
  194. // check if we really connected
  195. // FIXME: Unportable ??
  196. int serr = 0;
  197. socklen_t slen = sizeof(serr);
  198. ret = getsockopt(s, SOL_SOCKET, SO_ERROR, (char*) &serr, &slen);
  199. if (ret) {
  200. return LastSystemError();
  201. }
  202. if (serr) {
  203. return serr;
  204. }
  205. }
  206. return 0;
  207. }
  208. int ConnectT(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen, TDuration timeout) noexcept {
  209. return ConnectD(cont, s, name, namelen, timeout.ToDeadLine());
  210. }
  211. int ConnectI(TCont* cont, SOCKET s, const struct sockaddr* name, socklen_t namelen) noexcept {
  212. return ConnectD(cont, s, name, namelen, TInstant::Max());
  213. }
  214. int AcceptD(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TInstant deadline) noexcept {
  215. SOCKET ret;
  216. while ((ret = Accept4(s, addr, addrlen)) == INVALID_SOCKET) {
  217. int err = LastSystemError();
  218. if (!IsBlocked(err)) {
  219. return -err;
  220. }
  221. err = PollD(cont, s, CONT_POLL_READ, deadline);
  222. if (err) {
  223. return -err;
  224. }
  225. }
  226. return (int) ret;
  227. }
  228. int AcceptT(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen, TDuration timeout) noexcept {
  229. return AcceptD(cont, s, addr, addrlen, timeout.ToDeadLine());
  230. }
  231. int AcceptI(TCont* cont, SOCKET s, struct sockaddr* addr, socklen_t* addrlen) noexcept {
  232. return AcceptD(cont, s, addr, addrlen, TInstant::Max());
  233. }
  234. SOCKET Socket(int domain, int type, int protocol) noexcept {
  235. return Socket4(domain, type, protocol);
  236. }
  237. SOCKET Socket(const struct addrinfo& ai) noexcept {
  238. return Socket(ai.ai_family, ai.ai_socktype, ai.ai_protocol);
  239. }
  240. }