socket.c 71 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
7177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
#include <aws/io/socket.h>

#include <aws/common/clock.h>
#include <aws/common/condition_variable.h>
#include <aws/common/mutex.h>
#include <aws/common/string.h>

#include <aws/io/event_loop.h>
#include <aws/io/io.h>
#include <aws/io/logging.h>

#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <inttypes.h>
#include <limits.h>
#include <netinet/tcp.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
  21. /*
  22. * On OsX, suppress NoPipe signals via flags to setsockopt()
  23. * On Linux, suppress NoPipe signals via flags to send()
  24. */
  25. #if defined(__MACH__)
  26. # define NO_SIGNAL_SOCK_OPT SO_NOSIGPIPE
  27. # define NO_SIGNAL_SEND 0
  28. # define TCP_KEEPIDLE TCP_KEEPALIVE
  29. #else
  30. # define NO_SIGNAL_SEND MSG_NOSIGNAL
  31. #endif
  32. /* This isn't defined on ancient linux distros (breaking the builds).
  33. * However, if this is a prebuild, we purposely build on an ancient system, but
  34. * we want the kernel calls to still be the same as a modern build since that's likely the target of the application
  35. * calling this code. Just define this if it isn't there already. GlibC and the kernel don't really care how the flag
  36. * gets passed as long as it does.
  37. */
  38. #ifndef O_CLOEXEC
  39. # define O_CLOEXEC 02000000
  40. #endif
  41. #ifdef USE_VSOCK
  42. # if defined(__linux__) && defined(AF_VSOCK)
  43. # include <linux/vm_sockets.h>
  44. # else
  45. # error "USE_VSOCK not supported on current platform"
  46. # endif
  47. #endif
/* other than CONNECTED_READ | CONNECTED_WRITE
 * a socket is only in one of these states at a time. */
enum socket_state {
    INIT = 0x01,            /* freshly initialized; no connect/bind issued yet */
    CONNECTING = 0x02,      /* connect() in flight; waiting on writable event or timeout */
    CONNECTED_READ = 0x04,  /* read half usable; may be OR'd with CONNECTED_WRITE */
    CONNECTED_WRITE = 0x08, /* write half usable; may be OR'd with CONNECTED_READ */
    BOUND = 0x10,           /* bind() succeeded */
    LISTENING = 0x20,       /* listen() succeeded */
    TIMEDOUT = 0x40,        /* connect attempt timed out */
    ERROR = 0x80,           /* unrecoverable error observed */
    CLOSED,                 /* fd closed; implicit value 0x81 (never tested as a bit-flag) */
};
  61. static int s_convert_domain(enum aws_socket_domain domain) {
  62. switch (domain) {
  63. case AWS_SOCKET_IPV4:
  64. return AF_INET;
  65. case AWS_SOCKET_IPV6:
  66. return AF_INET6;
  67. case AWS_SOCKET_LOCAL:
  68. return AF_UNIX;
  69. #ifdef USE_VSOCK
  70. case AWS_SOCKET_VSOCK:
  71. return AF_VSOCK;
  72. #endif
  73. default:
  74. AWS_ASSERT(0);
  75. return AF_INET;
  76. }
  77. }
  78. static int s_convert_type(enum aws_socket_type type) {
  79. switch (type) {
  80. case AWS_SOCKET_STREAM:
  81. return SOCK_STREAM;
  82. case AWS_SOCKET_DGRAM:
  83. return SOCK_DGRAM;
  84. default:
  85. AWS_ASSERT(0);
  86. return SOCK_STREAM;
  87. }
  88. }
  89. static int s_determine_socket_error(int error) {
  90. switch (error) {
  91. case ECONNREFUSED:
  92. return AWS_IO_SOCKET_CONNECTION_REFUSED;
  93. case ECONNRESET:
  94. return AWS_IO_SOCKET_CLOSED;
  95. case ETIMEDOUT:
  96. return AWS_IO_SOCKET_TIMEOUT;
  97. case EHOSTUNREACH:
  98. case ENETUNREACH:
  99. return AWS_IO_SOCKET_NO_ROUTE_TO_HOST;
  100. case EADDRNOTAVAIL:
  101. return AWS_IO_SOCKET_INVALID_ADDRESS;
  102. case ENETDOWN:
  103. return AWS_IO_SOCKET_NETWORK_DOWN;
  104. case ECONNABORTED:
  105. return AWS_IO_SOCKET_CONNECT_ABORTED;
  106. case EADDRINUSE:
  107. return AWS_IO_SOCKET_ADDRESS_IN_USE;
  108. case ENOBUFS:
  109. case ENOMEM:
  110. return AWS_ERROR_OOM;
  111. case EAGAIN:
  112. return AWS_IO_READ_WOULD_BLOCK;
  113. case EMFILE:
  114. case ENFILE:
  115. return AWS_ERROR_MAX_FDS_EXCEEDED;
  116. case ENOENT:
  117. case EINVAL:
  118. return AWS_ERROR_FILE_INVALID_PATH;
  119. case EAFNOSUPPORT:
  120. return AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY;
  121. case EACCES:
  122. return AWS_ERROR_NO_PERMISSION;
  123. default:
  124. return AWS_IO_SOCKET_NOT_CONNECTED;
  125. }
  126. }
  127. static int s_create_socket(struct aws_socket *sock, const struct aws_socket_options *options) {
  128. int fd = socket(s_convert_domain(options->domain), s_convert_type(options->type), 0);
  129. int errno_value = errno; /* Always cache errno before potential side-effect */
  130. AWS_LOGF_DEBUG(
  131. AWS_LS_IO_SOCKET,
  132. "id=%p fd=%d: initializing with domain %d and type %d",
  133. (void *)sock,
  134. fd,
  135. options->domain,
  136. options->type);
  137. if (fd != -1) {
  138. int flags = fcntl(fd, F_GETFL, 0);
  139. flags |= O_NONBLOCK | O_CLOEXEC;
  140. int success = fcntl(fd, F_SETFL, flags);
  141. (void)success;
  142. sock->io_handle.data.fd = fd;
  143. sock->io_handle.additional_data = NULL;
  144. return aws_socket_set_options(sock, options);
  145. }
  146. int aws_error = s_determine_socket_error(errno_value);
  147. return aws_raise_error(aws_error);
  148. }
/* Heap-allocated state shared between a pending connect, its timeout task,
 * and the event-loop readiness callback. Freed in s_handle_socket_timeout()
 * or s_run_connect_success(), whichever owns the scheduled task. */
struct posix_socket_connect_args {
    struct aws_task task;            /* timeout task (repurposed for immediate success) */
    struct aws_allocator *allocator; /* allocator used to free this struct */
    struct aws_socket *socket;       /* NULLed once the connect resolves, so the timeout task becomes a no-op */
};
/* Private implementation data hung off aws_socket->impl. */
struct posix_socket {
    struct aws_linked_list write_queue;   /* pending outgoing writes */
    struct aws_linked_list written_queue; /* completed writes awaiting their callbacks */
    struct aws_task written_task;         /* task that drains written_queue on the event loop */
    struct posix_socket_connect_args *connect_args; /* non-NULL while a connect is in flight */
    /* Note that only the posix_socket impl part is refcounted.
     * The public aws_socket can be a stack variable and cleaned up synchronously
     * (by blocking until the event-loop cleans up the impl part).
     * In hindsight, aws_socket should have been heap-allocated and refcounted, but alas */
    struct aws_ref_count internal_refcount;
    struct aws_allocator *allocator;
    bool written_task_scheduled; /* true while written_task is queued on the event loop */
    bool currently_subscribed;   /* true while subscribed to io events on an event loop */
    bool continue_accept;        /* listening sockets: keep looping on accept() */
    bool *close_happened;        /* presumably an out-flag so a caller on the stack can detect a
                                  * re-entrant close — verify against aws_socket_close() */
};
  170. static void s_socket_destroy_impl(void *user_data) {
  171. struct posix_socket *socket_impl = user_data;
  172. aws_mem_release(socket_impl->allocator, socket_impl);
  173. }
/* Shared initializer behind aws_socket_init() (existing_socket_fd < 0, create a
 * new fd) and the accept path (adopt existing_socket_fd). Zeroes the public
 * struct, allocates the refcounted posix_socket impl, and wires everything up.
 * On failure returns AWS_OP_ERR with socket->impl left NULL so
 * aws_socket_clean_up() is safe to call. */
static int s_socket_init(
    struct aws_socket *socket,
    struct aws_allocator *alloc,
    const struct aws_socket_options *options,
    int existing_socket_fd) {
    AWS_ASSERT(options);
    AWS_ZERO_STRUCT(*socket);
    struct posix_socket *posix_socket = aws_mem_calloc(alloc, 1, sizeof(struct posix_socket));
    if (!posix_socket) {
        socket->impl = NULL;
        return AWS_OP_ERR;
    }
    socket->allocator = alloc;
    socket->io_handle.data.fd = -1;
    socket->state = INIT;
    socket->options = *options;
    if (existing_socket_fd < 0) {
        int err = s_create_socket(socket, options);
        if (err) {
            aws_mem_release(alloc, posix_socket);
            socket->impl = NULL;
            return AWS_OP_ERR;
        }
    } else {
        /* adopt the caller-supplied fd (e.g. from accept()) */
        socket->io_handle = (struct aws_io_handle){
            .data = {.fd = existing_socket_fd},
            .additional_data = NULL,
        };
        /* NOTE(review): return value ignored — a set_options failure on an
         * adopted fd is not propagated to the caller; confirm intentional. */
        aws_socket_set_options(socket, options);
    }
    aws_linked_list_init(&posix_socket->write_queue);
    aws_linked_list_init(&posix_socket->written_queue);
    posix_socket->currently_subscribed = false;
    posix_socket->continue_accept = false;
    aws_ref_count_init(&posix_socket->internal_refcount, posix_socket, s_socket_destroy_impl);
    posix_socket->allocator = alloc;
    posix_socket->connect_args = NULL;
    posix_socket->close_happened = NULL;
    socket->impl = posix_socket;
    return AWS_OP_SUCCESS;
}
/* Public initializer: always creates a brand-new fd (existing_socket_fd == -1). */
int aws_socket_init(struct aws_socket *socket, struct aws_allocator *alloc, const struct aws_socket_options *options) {
    AWS_ASSERT(options);
    return s_socket_init(socket, alloc, options, -1);
}
/* Tear down a socket created via aws_socket_init(). Safe to call twice: a NULL
 * impl means clean-up already ran. Closes the fd if still open, then drops this
 * side's reference on the impl; if event-loop callbacks still hold references,
 * the impl dangles intentionally and s_socket_destroy_impl() frees it later. */
void aws_socket_clean_up(struct aws_socket *socket) {
    if (!socket->impl) {
        /* protect from double clean */
        return;
    }
    int fd_for_logging = socket->io_handle.data.fd; /* socket's fd gets reset before final log */
    (void)fd_for_logging;
    if (aws_socket_is_open(socket)) {
        AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: is still open, closing...", (void *)socket, fd_for_logging);
        aws_socket_close(socket);
    }
    struct posix_socket *socket_impl = socket->impl;
    /* non-zero release count: other refs (pending io) remain, impl outlives us */
    if (aws_ref_count_release(&socket_impl->internal_refcount) != 0) {
        AWS_LOGF_DEBUG(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: is still pending io letting it dangle and cleaning up later.",
            (void *)socket,
            fd_for_logging);
    }
    AWS_ZERO_STRUCT(*socket);
    socket->io_handle.data.fd = -1;
}
  241. /* Update socket->local_endpoint based on the results of getsockname() */
  242. static int s_update_local_endpoint(struct aws_socket *socket) {
  243. struct aws_socket_endpoint tmp_endpoint;
  244. AWS_ZERO_STRUCT(tmp_endpoint);
  245. struct sockaddr_storage address;
  246. AWS_ZERO_STRUCT(address);
  247. socklen_t address_size = sizeof(address);
  248. if (getsockname(socket->io_handle.data.fd, (struct sockaddr *)&address, &address_size) != 0) {
  249. int errno_value = errno; /* Always cache errno before potential side-effect */
  250. AWS_LOGF_ERROR(
  251. AWS_LS_IO_SOCKET,
  252. "id=%p fd=%d: getsockname() failed with error %d",
  253. (void *)socket,
  254. socket->io_handle.data.fd,
  255. errno_value);
  256. int aws_error = s_determine_socket_error(errno_value);
  257. return aws_raise_error(aws_error);
  258. }
  259. if (address.ss_family == AF_INET) {
  260. struct sockaddr_in *s = (struct sockaddr_in *)&address;
  261. tmp_endpoint.port = ntohs(s->sin_port);
  262. if (inet_ntop(AF_INET, &s->sin_addr, tmp_endpoint.address, sizeof(tmp_endpoint.address)) == NULL) {
  263. int errno_value = errno; /* Always cache errno before potential side-effect */
  264. AWS_LOGF_ERROR(
  265. AWS_LS_IO_SOCKET,
  266. "id=%p fd=%d: inet_ntop() failed with error %d",
  267. (void *)socket,
  268. socket->io_handle.data.fd,
  269. errno_value);
  270. int aws_error = s_determine_socket_error(errno_value);
  271. return aws_raise_error(aws_error);
  272. }
  273. } else if (address.ss_family == AF_INET6) {
  274. struct sockaddr_in6 *s = (struct sockaddr_in6 *)&address;
  275. tmp_endpoint.port = ntohs(s->sin6_port);
  276. if (inet_ntop(AF_INET6, &s->sin6_addr, tmp_endpoint.address, sizeof(tmp_endpoint.address)) == NULL) {
  277. int errno_value = errno; /* Always cache errno before potential side-effect */
  278. AWS_LOGF_ERROR(
  279. AWS_LS_IO_SOCKET,
  280. "id=%p fd=%d: inet_ntop() failed with error %d",
  281. (void *)socket,
  282. socket->io_handle.data.fd,
  283. errno_value);
  284. int aws_error = s_determine_socket_error(errno_value);
  285. return aws_raise_error(aws_error);
  286. }
  287. } else if (address.ss_family == AF_UNIX) {
  288. struct sockaddr_un *s = (struct sockaddr_un *)&address;
  289. /* Ensure there's a null-terminator.
  290. * On some platforms it may be missing when the path gets very long. See:
  291. * https://man7.org/linux/man-pages/man7/unix.7.html#BUGS
  292. * But let's keep it simple, and not deal with that madness until someone demands it. */
  293. size_t sun_len;
  294. if (aws_secure_strlen(s->sun_path, sizeof(tmp_endpoint.address), &sun_len)) {
  295. AWS_LOGF_ERROR(
  296. AWS_LS_IO_SOCKET,
  297. "id=%p fd=%d: UNIX domain socket name is too long",
  298. (void *)socket,
  299. socket->io_handle.data.fd);
  300. return aws_raise_error(AWS_IO_SOCKET_INVALID_ADDRESS);
  301. }
  302. memcpy(tmp_endpoint.address, s->sun_path, sun_len);
  303. #if USE_VSOCK
  304. } else if (address.ss_family == AF_VSOCK) {
  305. struct sockaddr_vm *s = (struct sockaddr_vm *)&address;
  306. /* VSOCK port is 32bit, but aws_socket_endpoint.port is only 16bit.
  307. * Hopefully this isn't an issue, since users can only pass in 16bit values.
  308. * But if it becomes an issue, we'll need to make aws_socket_endpoint more flexible */
  309. if (s->svm_port > UINT16_MAX) {
  310. AWS_LOGF_ERROR(
  311. AWS_LS_IO_SOCKET,
  312. "id=%p fd=%d: aws_socket_endpoint can't deal with VSOCK port > UINT16_MAX",
  313. (void *)socket,
  314. socket->io_handle.data.fd);
  315. return aws_raise_error(AWS_IO_SOCKET_INVALID_ADDRESS);
  316. }
  317. tmp_endpoint.port = (uint16_t)s->svm_port;
  318. snprintf(tmp_endpoint.address, sizeof(tmp_endpoint.address), "%" PRIu32, s->svm_cid);
  319. return AWS_OP_SUCCESS;
  320. #endif /* USE_VSOCK */
  321. } else {
  322. AWS_ASSERT(0);
  323. return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY);
  324. }
  325. socket->local_endpoint = tmp_endpoint;
  326. return AWS_OP_SUCCESS;
  327. }
static void s_on_connection_error(struct aws_socket *socket, int error);
/* Runs on the event-loop thread once the connecting fd reports writable (or
 * for a synchronous connect, via a scheduled task). A writable event only
 * means the connect *resolved*; SO_ERROR distinguishes success from failure.
 * On success: records the local endpoint, flips state to CONNECTED_READ|WRITE,
 * re-assigns the socket to its event loop, and fires connection_result_fn.
 * On any failure: raises the mapped error and routes through
 * s_on_connection_error(). */
static int s_on_connection_success(struct aws_socket *socket) {
    struct aws_event_loop *event_loop = socket->event_loop;
    struct posix_socket *socket_impl = socket->impl;
    /* drop the temporary connect-time subscription; a fresh one is created by
     * aws_socket_assign_to_event_loop() below */
    if (socket_impl->currently_subscribed) {
        aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle);
        socket_impl->currently_subscribed = false;
    }
    socket->event_loop = NULL;
    int connect_result;
    socklen_t result_length = sizeof(connect_result);
    if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) {
        int errno_value = errno; /* Always cache errno before potential side-effect */
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: failed to determine connection error %d",
            (void *)socket,
            socket->io_handle.data.fd,
            errno_value);
        int aws_error = s_determine_socket_error(errno_value);
        aws_raise_error(aws_error);
        s_on_connection_error(socket, aws_error);
        return AWS_OP_ERR;
    }
    /* non-zero SO_ERROR: the async connect itself failed */
    if (connect_result) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: connection error %d",
            (void *)socket,
            socket->io_handle.data.fd,
            connect_result);
        int aws_error = s_determine_socket_error(connect_result);
        aws_raise_error(aws_error);
        s_on_connection_error(socket, aws_error);
        return AWS_OP_ERR;
    }
    AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection success", (void *)socket, socket->io_handle.data.fd);
    if (s_update_local_endpoint(socket)) {
        s_on_connection_error(socket, aws_last_error());
        return AWS_OP_ERR;
    }
    socket->state = CONNECTED_WRITE | CONNECTED_READ;
    if (aws_socket_assign_to_event_loop(socket, event_loop)) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: assignment to event loop %p failed with error %d",
            (void *)socket,
            socket->io_handle.data.fd,
            (void *)event_loop,
            aws_last_error());
        s_on_connection_error(socket, aws_last_error());
        return AWS_OP_ERR;
    }
    socket->connection_result_fn(socket, AWS_ERROR_SUCCESS, socket->connect_accept_user_data);
    return AWS_OP_SUCCESS;
}
  384. static void s_on_connection_error(struct aws_socket *socket, int error) {
  385. socket->state = ERROR;
  386. AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection failure", (void *)socket, socket->io_handle.data.fd);
  387. if (socket->connection_result_fn) {
  388. socket->connection_result_fn(socket, error, socket->connect_accept_user_data);
  389. } else if (socket->accept_result_fn) {
  390. socket->accept_result_fn(socket, error, NULL, socket->connect_accept_user_data);
  391. }
  392. }
  393. /* the next two callbacks compete based on which one runs first. if s_socket_connect_event
  394. * comes back first, then we set socket_args->socket = NULL and continue on with the connection.
  395. * if s_handle_socket_timeout() runs first, is sees socket_args->socket is NULL and just cleans up its memory.
  396. * s_handle_socket_timeout() will always run so the memory for socket_connect_args is always cleaned up there. */
  397. static void s_socket_connect_event(
  398. struct aws_event_loop *event_loop,
  399. struct aws_io_handle *handle,
  400. int events,
  401. void *user_data) {
  402. (void)event_loop;
  403. (void)handle;
  404. struct posix_socket_connect_args *socket_args = (struct posix_socket_connect_args *)user_data;
  405. AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "fd=%d: connection activity handler triggered ", handle->data.fd);
  406. if (socket_args->socket) {
  407. AWS_LOGF_TRACE(
  408. AWS_LS_IO_SOCKET,
  409. "id=%p fd=%d: has not timed out yet proceeding with connection.",
  410. (void *)socket_args->socket,
  411. handle->data.fd);
  412. struct posix_socket *socket_impl = socket_args->socket->impl;
  413. if (!(events & AWS_IO_EVENT_TYPE_ERROR || events & AWS_IO_EVENT_TYPE_CLOSED) &&
  414. (events & AWS_IO_EVENT_TYPE_READABLE || events & AWS_IO_EVENT_TYPE_WRITABLE)) {
  415. struct aws_socket *socket = socket_args->socket;
  416. socket_args->socket = NULL;
  417. socket_impl->connect_args = NULL;
  418. s_on_connection_success(socket);
  419. return;
  420. }
  421. int aws_error = aws_socket_get_error(socket_args->socket);
  422. /* we'll get another notification. */
  423. if (aws_error == AWS_IO_READ_WOULD_BLOCK) {
  424. AWS_LOGF_TRACE(
  425. AWS_LS_IO_SOCKET,
  426. "id=%p fd=%d: spurious event, waiting for another notification.",
  427. (void *)socket_args->socket,
  428. handle->data.fd);
  429. return;
  430. }
  431. struct aws_socket *socket = socket_args->socket;
  432. socket_args->socket = NULL;
  433. socket_impl->connect_args = NULL;
  434. aws_raise_error(aws_error);
  435. s_on_connection_error(socket, aws_error);
  436. }
  437. }
  438. static void s_handle_socket_timeout(struct aws_task *task, void *args, aws_task_status status) {
  439. (void)task;
  440. (void)status;
  441. struct posix_socket_connect_args *socket_args = args;
  442. AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "task_id=%p: timeout task triggered, evaluating timeouts.", (void *)task);
  443. /* successful connection will have nulled out connect_args->socket */
  444. if (socket_args->socket) {
  445. AWS_LOGF_ERROR(
  446. AWS_LS_IO_SOCKET,
  447. "id=%p fd=%d: timed out, shutting down.",
  448. (void *)socket_args->socket,
  449. socket_args->socket->io_handle.data.fd);
  450. socket_args->socket->state = TIMEDOUT;
  451. int error_code = AWS_IO_SOCKET_TIMEOUT;
  452. if (status == AWS_TASK_STATUS_RUN_READY) {
  453. aws_event_loop_unsubscribe_from_io_events(socket_args->socket->event_loop, &socket_args->socket->io_handle);
  454. } else {
  455. error_code = AWS_IO_EVENT_LOOP_SHUTDOWN;
  456. aws_event_loop_free_io_event_resources(socket_args->socket->event_loop, &socket_args->socket->io_handle);
  457. }
  458. socket_args->socket->event_loop = NULL;
  459. struct posix_socket *socket_impl = socket_args->socket->impl;
  460. socket_impl->currently_subscribed = false;
  461. aws_raise_error(error_code);
  462. struct aws_socket *socket = socket_args->socket;
  463. /*socket close sets socket_args->socket to NULL and
  464. * socket_impl->connect_args to NULL. */
  465. aws_socket_close(socket);
  466. s_on_connection_error(socket, error_code);
  467. }
  468. aws_mem_release(socket_args->allocator, socket_args);
  469. }
  470. /* this is used simply for moving a connect_success callback when the connect finished immediately
  471. * (like for unix domain sockets) into the event loop's thread. Also note, in that case there was no
  472. * timeout task scheduled, so in this case the socket_args are cleaned up. */
  473. static void s_run_connect_success(struct aws_task *task, void *arg, enum aws_task_status status) {
  474. (void)task;
  475. struct posix_socket_connect_args *socket_args = arg;
  476. if (socket_args->socket) {
  477. struct posix_socket *socket_impl = socket_args->socket->impl;
  478. if (status == AWS_TASK_STATUS_RUN_READY) {
  479. s_on_connection_success(socket_args->socket);
  480. } else {
  481. aws_raise_error(AWS_IO_SOCKET_CONNECT_ABORTED);
  482. socket_args->socket->event_loop = NULL;
  483. s_on_connection_error(socket_args->socket, AWS_IO_SOCKET_CONNECT_ABORTED);
  484. }
  485. socket_impl->connect_args = NULL;
  486. }
  487. aws_mem_release(socket_args->allocator, socket_args);
  488. }
  489. static inline int s_convert_pton_error(int pton_code, int errno_value) {
  490. if (pton_code == 0) {
  491. return AWS_IO_SOCKET_INVALID_ADDRESS;
  492. }
  493. return s_determine_socket_error(errno_value);
  494. }
/* Scratch union large enough for any sockaddr flavor this file speaks; filled
 * in by aws_socket_connect()/bind() and passed to the kernel as a generic
 * struct sockaddr pointer. */
struct socket_address {
    union sock_addr_types {
        struct sockaddr_in addr_in;   /* IPv4 */
        struct sockaddr_in6 addr_in6; /* IPv6 */
        struct sockaddr_un un_addr;   /* UNIX domain */
#ifdef USE_VSOCK
        struct sockaddr_vm vm_addr;   /* VSOCK (VM <-> host) */
#endif
    } sock_addr_types;
};
  505. #ifdef USE_VSOCK
  506. /** Convert a string to a VSOCK CID. Respects the calling convetion of inet_pton:
  507. * 0 on error, 1 on success. */
  508. static int parse_cid(const char *cid_str, unsigned int *value) {
  509. if (cid_str == NULL || value == NULL) {
  510. errno = EINVAL;
  511. return 0;
  512. }
  513. /* strtoll returns 0 as both error and correct value */
  514. errno = 0;
  515. /* unsigned long long to handle edge cases in convention explicitly */
  516. long long cid = strtoll(cid_str, NULL, 10);
  517. if (errno != 0) {
  518. return 0;
  519. }
  520. /* -1U means any, so it's a valid value, but it needs to be converted to
  521. * unsigned int. */
  522. if (cid == -1) {
  523. *value = VMADDR_CID_ANY;
  524. return 1;
  525. }
  526. if (cid < 0 || cid > UINT_MAX) {
  527. errno = ERANGE;
  528. return 0;
  529. }
  530. /* cast is safe here, edge cases already checked */
  531. *value = (unsigned int)cid;
  532. return 1;
  533. }
  534. #endif
  535. int aws_socket_connect(
  536. struct aws_socket *socket,
  537. const struct aws_socket_endpoint *remote_endpoint,
  538. struct aws_event_loop *event_loop,
  539. aws_socket_on_connection_result_fn *on_connection_result,
  540. void *user_data) {
  541. AWS_ASSERT(event_loop);
  542. AWS_ASSERT(!socket->event_loop);
  543. AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: beginning connect.", (void *)socket, socket->io_handle.data.fd);
  544. if (socket->event_loop) {
  545. return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
  546. }
  547. if (socket->options.type != AWS_SOCKET_DGRAM) {
  548. AWS_ASSERT(on_connection_result);
  549. if (socket->state != INIT) {
  550. return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
  551. }
  552. } else { /* UDP socket */
  553. /* UDP sockets jump to CONNECT_READ if bind is called first */
  554. if (socket->state != CONNECTED_READ && socket->state != INIT) {
  555. return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
  556. }
  557. }
  558. size_t address_strlen;
  559. if (aws_secure_strlen(remote_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) {
  560. return AWS_OP_ERR;
  561. }
  562. struct socket_address address;
  563. AWS_ZERO_STRUCT(address);
  564. socklen_t sock_size = 0;
  565. int pton_err = 1;
  566. if (socket->options.domain == AWS_SOCKET_IPV4) {
  567. pton_err = inet_pton(AF_INET, remote_endpoint->address, &address.sock_addr_types.addr_in.sin_addr);
  568. address.sock_addr_types.addr_in.sin_port = htons(remote_endpoint->port);
  569. address.sock_addr_types.addr_in.sin_family = AF_INET;
  570. sock_size = sizeof(address.sock_addr_types.addr_in);
  571. } else if (socket->options.domain == AWS_SOCKET_IPV6) {
  572. pton_err = inet_pton(AF_INET6, remote_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr);
  573. address.sock_addr_types.addr_in6.sin6_port = htons(remote_endpoint->port);
  574. address.sock_addr_types.addr_in6.sin6_family = AF_INET6;
  575. sock_size = sizeof(address.sock_addr_types.addr_in6);
  576. } else if (socket->options.domain == AWS_SOCKET_LOCAL) {
  577. address.sock_addr_types.un_addr.sun_family = AF_UNIX;
  578. strncpy(address.sock_addr_types.un_addr.sun_path, remote_endpoint->address, AWS_ADDRESS_MAX_LEN);
  579. sock_size = sizeof(address.sock_addr_types.un_addr);
  580. #ifdef USE_VSOCK
  581. } else if (socket->options.domain == AWS_SOCKET_VSOCK) {
  582. pton_err = parse_cid(remote_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid);
  583. address.sock_addr_types.vm_addr.svm_family = AF_VSOCK;
  584. address.sock_addr_types.vm_addr.svm_port = (unsigned int)remote_endpoint->port;
  585. sock_size = sizeof(address.sock_addr_types.vm_addr);
  586. #endif
  587. } else {
  588. AWS_ASSERT(0);
  589. return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY);
  590. }
  591. if (pton_err != 1) {
  592. int errno_value = errno; /* Always cache errno before potential side-effect */
  593. AWS_LOGF_ERROR(
  594. AWS_LS_IO_SOCKET,
  595. "id=%p fd=%d: failed to parse address %s:%d.",
  596. (void *)socket,
  597. socket->io_handle.data.fd,
  598. remote_endpoint->address,
  599. (int)remote_endpoint->port);
  600. return aws_raise_error(s_convert_pton_error(pton_err, errno_value));
  601. }
  602. AWS_LOGF_DEBUG(
  603. AWS_LS_IO_SOCKET,
  604. "id=%p fd=%d: connecting to endpoint %s:%d.",
  605. (void *)socket,
  606. socket->io_handle.data.fd,
  607. remote_endpoint->address,
  608. (int)remote_endpoint->port);
  609. socket->state = CONNECTING;
  610. socket->remote_endpoint = *remote_endpoint;
  611. socket->connect_accept_user_data = user_data;
  612. socket->connection_result_fn = on_connection_result;
  613. struct posix_socket *socket_impl = socket->impl;
  614. socket_impl->connect_args = aws_mem_calloc(socket->allocator, 1, sizeof(struct posix_socket_connect_args));
  615. if (!socket_impl->connect_args) {
  616. return AWS_OP_ERR;
  617. }
  618. socket_impl->connect_args->socket = socket;
  619. socket_impl->connect_args->allocator = socket->allocator;
  620. socket_impl->connect_args->task.fn = s_handle_socket_timeout;
  621. socket_impl->connect_args->task.arg = socket_impl->connect_args;
  622. int error_code = connect(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size);
  623. socket->event_loop = event_loop;
  624. if (!error_code) {
  625. AWS_LOGF_INFO(
  626. AWS_LS_IO_SOCKET,
  627. "id=%p fd=%d: connected immediately, not scheduling timeout.",
  628. (void *)socket,
  629. socket->io_handle.data.fd);
  630. socket_impl->connect_args->task.fn = s_run_connect_success;
  631. /* the subscription for IO will happen once we setup the connection in the task. Since we already
  632. * know the connection succeeded, we don't need to register for events yet. */
  633. aws_event_loop_schedule_task_now(event_loop, &socket_impl->connect_args->task);
  634. }
  635. if (error_code) {
  636. int errno_value = errno; /* Always cache errno before potential side-effect */
  637. if (errno_value == EINPROGRESS || errno_value == EALREADY) {
  638. AWS_LOGF_TRACE(
  639. AWS_LS_IO_SOCKET,
  640. "id=%p fd=%d: connection pending waiting on event-loop notification or timeout.",
  641. (void *)socket,
  642. socket->io_handle.data.fd);
  643. /* cache the timeout task; it is possible for the IO subscription to come back virtually immediately
  644. * and null out the connect args */
  645. struct aws_task *timeout_task = &socket_impl->connect_args->task;
  646. socket_impl->currently_subscribed = true;
  647. /* This event is for when the connection finishes. (the fd will flip writable). */
  648. if (aws_event_loop_subscribe_to_io_events(
  649. event_loop,
  650. &socket->io_handle,
  651. AWS_IO_EVENT_TYPE_WRITABLE,
  652. s_socket_connect_event,
  653. socket_impl->connect_args)) {
  654. AWS_LOGF_ERROR(
  655. AWS_LS_IO_SOCKET,
  656. "id=%p fd=%d: failed to register with event-loop %p.",
  657. (void *)socket,
  658. socket->io_handle.data.fd,
  659. (void *)event_loop);
  660. socket_impl->currently_subscribed = false;
  661. socket->event_loop = NULL;
  662. goto err_clean_up;
  663. }
  664. /* schedule a task to run at the connect timeout interval, if this task runs before the connect
  665. * happens, we consider that a timeout. */
  666. uint64_t timeout = 0;
  667. aws_event_loop_current_clock_time(event_loop, &timeout);
  668. timeout += aws_timestamp_convert(
  669. socket->options.connect_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
  670. AWS_LOGF_TRACE(
  671. AWS_LS_IO_SOCKET,
  672. "id=%p fd=%d: scheduling timeout task for %llu.",
  673. (void *)socket,
  674. socket->io_handle.data.fd,
  675. (unsigned long long)timeout);
  676. aws_event_loop_schedule_task_future(event_loop, timeout_task, timeout);
  677. } else {
  678. AWS_LOGF_ERROR(
  679. AWS_LS_IO_SOCKET,
  680. "id=%p fd=%d: connect failed with error code %d.",
  681. (void *)socket,
  682. socket->io_handle.data.fd,
  683. errno_value);
  684. int aws_error = s_determine_socket_error(errno_value);
  685. aws_raise_error(aws_error);
  686. socket->event_loop = NULL;
  687. socket_impl->currently_subscribed = false;
  688. goto err_clean_up;
  689. }
  690. }
  691. return AWS_OP_SUCCESS;
  692. err_clean_up:
  693. aws_mem_release(socket->allocator, socket_impl->connect_args);
  694. socket_impl->connect_args = NULL;
  695. return AWS_OP_ERR;
  696. }
  697. int aws_socket_bind(struct aws_socket *socket, const struct aws_socket_endpoint *local_endpoint) {
  698. if (socket->state != INIT) {
  699. AWS_LOGF_ERROR(
  700. AWS_LS_IO_SOCKET,
  701. "id=%p fd=%d: invalid state for bind operation.",
  702. (void *)socket,
  703. socket->io_handle.data.fd);
  704. return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
  705. }
  706. size_t address_strlen;
  707. if (aws_secure_strlen(local_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) {
  708. return AWS_OP_ERR;
  709. }
  710. AWS_LOGF_INFO(
  711. AWS_LS_IO_SOCKET,
  712. "id=%p fd=%d: binding to %s:%d.",
  713. (void *)socket,
  714. socket->io_handle.data.fd,
  715. local_endpoint->address,
  716. (int)local_endpoint->port);
  717. struct socket_address address;
  718. AWS_ZERO_STRUCT(address);
  719. socklen_t sock_size = 0;
  720. int pton_err = 1;
  721. if (socket->options.domain == AWS_SOCKET_IPV4) {
  722. pton_err = inet_pton(AF_INET, local_endpoint->address, &address.sock_addr_types.addr_in.sin_addr);
  723. address.sock_addr_types.addr_in.sin_port = htons(local_endpoint->port);
  724. address.sock_addr_types.addr_in.sin_family = AF_INET;
  725. sock_size = sizeof(address.sock_addr_types.addr_in);
  726. } else if (socket->options.domain == AWS_SOCKET_IPV6) {
  727. pton_err = inet_pton(AF_INET6, local_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr);
  728. address.sock_addr_types.addr_in6.sin6_port = htons(local_endpoint->port);
  729. address.sock_addr_types.addr_in6.sin6_family = AF_INET6;
  730. sock_size = sizeof(address.sock_addr_types.addr_in6);
  731. } else if (socket->options.domain == AWS_SOCKET_LOCAL) {
  732. address.sock_addr_types.un_addr.sun_family = AF_UNIX;
  733. strncpy(address.sock_addr_types.un_addr.sun_path, local_endpoint->address, AWS_ADDRESS_MAX_LEN);
  734. sock_size = sizeof(address.sock_addr_types.un_addr);
  735. #ifdef USE_VSOCK
  736. } else if (socket->options.domain == AWS_SOCKET_VSOCK) {
  737. pton_err = parse_cid(local_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid);
  738. address.sock_addr_types.vm_addr.svm_family = AF_VSOCK;
  739. address.sock_addr_types.vm_addr.svm_port = (unsigned int)local_endpoint->port;
  740. sock_size = sizeof(address.sock_addr_types.vm_addr);
  741. #endif
  742. } else {
  743. AWS_ASSERT(0);
  744. return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY);
  745. }
  746. if (pton_err != 1) {
  747. int errno_value = errno; /* Always cache errno before potential side-effect */
  748. AWS_LOGF_ERROR(
  749. AWS_LS_IO_SOCKET,
  750. "id=%p fd=%d: failed to parse address %s:%d.",
  751. (void *)socket,
  752. socket->io_handle.data.fd,
  753. local_endpoint->address,
  754. (int)local_endpoint->port);
  755. return aws_raise_error(s_convert_pton_error(pton_err, errno_value));
  756. }
  757. if (bind(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size) != 0) {
  758. int errno_value = errno; /* Always cache errno before potential side-effect */
  759. AWS_LOGF_ERROR(
  760. AWS_LS_IO_SOCKET,
  761. "id=%p fd=%d: bind failed with error code %d",
  762. (void *)socket,
  763. socket->io_handle.data.fd,
  764. errno_value);
  765. aws_raise_error(s_determine_socket_error(errno_value));
  766. goto error;
  767. }
  768. if (s_update_local_endpoint(socket)) {
  769. goto error;
  770. }
  771. if (socket->options.type == AWS_SOCKET_STREAM) {
  772. socket->state = BOUND;
  773. } else {
  774. /* e.g. UDP is now readable */
  775. socket->state = CONNECTED_READ;
  776. }
  777. AWS_LOGF_DEBUG(
  778. AWS_LS_IO_SOCKET,
  779. "id=%p fd=%d: successfully bound to %s:%d",
  780. (void *)socket,
  781. socket->io_handle.data.fd,
  782. socket->local_endpoint.address,
  783. socket->local_endpoint.port);
  784. return AWS_OP_SUCCESS;
  785. error:
  786. socket->state = ERROR;
  787. return AWS_OP_ERR;
  788. }
  789. int aws_socket_get_bound_address(const struct aws_socket *socket, struct aws_socket_endpoint *out_address) {
  790. if (socket->local_endpoint.address[0] == 0) {
  791. AWS_LOGF_ERROR(
  792. AWS_LS_IO_SOCKET,
  793. "id=%p fd=%d: Socket has no local address. Socket must be bound first.",
  794. (void *)socket,
  795. socket->io_handle.data.fd);
  796. return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
  797. }
  798. *out_address = socket->local_endpoint;
  799. return AWS_OP_SUCCESS;
  800. }
  801. int aws_socket_listen(struct aws_socket *socket, int backlog_size) {
  802. if (socket->state != BOUND) {
  803. AWS_LOGF_ERROR(
  804. AWS_LS_IO_SOCKET,
  805. "id=%p fd=%d: invalid state for listen operation. You must call bind first.",
  806. (void *)socket,
  807. socket->io_handle.data.fd);
  808. return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
  809. }
  810. int error_code = listen(socket->io_handle.data.fd, backlog_size);
  811. if (!error_code) {
  812. AWS_LOGF_INFO(
  813. AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully listening", (void *)socket, socket->io_handle.data.fd);
  814. socket->state = LISTENING;
  815. return AWS_OP_SUCCESS;
  816. }
  817. int errno_value = errno; /* Always cache errno before potential side-effect */
  818. AWS_LOGF_ERROR(
  819. AWS_LS_IO_SOCKET,
  820. "id=%p fd=%d: listen failed with error code %d",
  821. (void *)socket,
  822. socket->io_handle.data.fd,
  823. errno_value);
  824. socket->state = ERROR;
  825. return aws_raise_error(s_determine_socket_error(errno_value));
  826. }
  827. /* this is called by the event loop handler that was installed in start_accept(). It runs once the FD goes readable,
  828. * accepts as many as it can and then returns control to the event loop. */
  829. static void s_socket_accept_event(
  830. struct aws_event_loop *event_loop,
  831. struct aws_io_handle *handle,
  832. int events,
  833. void *user_data) {
  834. (void)event_loop;
  835. struct aws_socket *socket = user_data;
  836. struct posix_socket *socket_impl = socket->impl;
  837. AWS_LOGF_DEBUG(
  838. AWS_LS_IO_SOCKET, "id=%p fd=%d: listening event received", (void *)socket, socket->io_handle.data.fd);
  839. if (socket_impl->continue_accept && events & AWS_IO_EVENT_TYPE_READABLE) {
  840. int in_fd = 0;
  841. while (socket_impl->continue_accept && in_fd != -1) {
  842. struct sockaddr_storage in_addr;
  843. socklen_t in_len = sizeof(struct sockaddr_storage);
  844. in_fd = accept(handle->data.fd, (struct sockaddr *)&in_addr, &in_len);
  845. if (in_fd == -1) {
  846. int errno_value = errno; /* Always cache errno before potential side-effect */
  847. if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
  848. break;
  849. }
  850. int aws_error = aws_socket_get_error(socket);
  851. aws_raise_error(aws_error);
  852. s_on_connection_error(socket, aws_error);
  853. break;
  854. }
  855. AWS_LOGF_DEBUG(
  856. AWS_LS_IO_SOCKET, "id=%p fd=%d: incoming connection", (void *)socket, socket->io_handle.data.fd);
  857. struct aws_socket *new_sock = aws_mem_acquire(socket->allocator, sizeof(struct aws_socket));
  858. if (!new_sock) {
  859. close(in_fd);
  860. s_on_connection_error(socket, aws_last_error());
  861. continue;
  862. }
  863. if (s_socket_init(new_sock, socket->allocator, &socket->options, in_fd)) {
  864. aws_mem_release(socket->allocator, new_sock);
  865. s_on_connection_error(socket, aws_last_error());
  866. continue;
  867. }
  868. new_sock->local_endpoint = socket->local_endpoint;
  869. new_sock->state = CONNECTED_READ | CONNECTED_WRITE;
  870. uint16_t port = 0;
  871. /* get the info on the incoming socket's address */
  872. if (in_addr.ss_family == AF_INET) {
  873. struct sockaddr_in *s = (struct sockaddr_in *)&in_addr;
  874. port = ntohs(s->sin_port);
  875. /* this came from the kernel, a.) it won't fail. b.) even if it does
  876. * its not fatal. come back and add logging later. */
  877. if (!inet_ntop(
  878. AF_INET,
  879. &s->sin_addr,
  880. new_sock->remote_endpoint.address,
  881. sizeof(new_sock->remote_endpoint.address))) {
  882. AWS_LOGF_WARN(
  883. AWS_LS_IO_SOCKET,
  884. "id=%p fd=%d:. Failed to determine remote address.",
  885. (void *)socket,
  886. socket->io_handle.data.fd);
  887. }
  888. new_sock->options.domain = AWS_SOCKET_IPV4;
  889. } else if (in_addr.ss_family == AF_INET6) {
  890. /* this came from the kernel, a.) it won't fail. b.) even if it does
  891. * its not fatal. come back and add logging later. */
  892. struct sockaddr_in6 *s = (struct sockaddr_in6 *)&in_addr;
  893. port = ntohs(s->sin6_port);
  894. if (!inet_ntop(
  895. AF_INET6,
  896. &s->sin6_addr,
  897. new_sock->remote_endpoint.address,
  898. sizeof(new_sock->remote_endpoint.address))) {
  899. AWS_LOGF_WARN(
  900. AWS_LS_IO_SOCKET,
  901. "id=%p fd=%d:. Failed to determine remote address.",
  902. (void *)socket,
  903. socket->io_handle.data.fd);
  904. }
  905. new_sock->options.domain = AWS_SOCKET_IPV6;
  906. } else if (in_addr.ss_family == AF_UNIX) {
  907. new_sock->remote_endpoint = socket->local_endpoint;
  908. new_sock->options.domain = AWS_SOCKET_LOCAL;
  909. }
  910. new_sock->remote_endpoint.port = port;
  911. AWS_LOGF_INFO(
  912. AWS_LS_IO_SOCKET,
  913. "id=%p fd=%d: connected to %s:%d, incoming fd %d",
  914. (void *)socket,
  915. socket->io_handle.data.fd,
  916. new_sock->remote_endpoint.address,
  917. new_sock->remote_endpoint.port,
  918. in_fd);
  919. int flags = fcntl(in_fd, F_GETFL, 0);
  920. flags |= O_NONBLOCK | O_CLOEXEC;
  921. fcntl(in_fd, F_SETFL, flags);
  922. bool close_occurred = false;
  923. socket_impl->close_happened = &close_occurred;
  924. socket->accept_result_fn(socket, AWS_ERROR_SUCCESS, new_sock, socket->connect_accept_user_data);
  925. if (close_occurred) {
  926. return;
  927. }
  928. socket_impl->close_happened = NULL;
  929. }
  930. }
  931. AWS_LOGF_TRACE(
  932. AWS_LS_IO_SOCKET,
  933. "id=%p fd=%d: finished processing incoming connections, "
  934. "waiting on event-loop notification",
  935. (void *)socket,
  936. socket->io_handle.data.fd);
  937. }
  938. int aws_socket_start_accept(
  939. struct aws_socket *socket,
  940. struct aws_event_loop *accept_loop,
  941. aws_socket_on_accept_result_fn *on_accept_result,
  942. void *user_data) {
  943. AWS_ASSERT(on_accept_result);
  944. AWS_ASSERT(accept_loop);
  945. if (socket->event_loop) {
  946. AWS_LOGF_ERROR(
  947. AWS_LS_IO_SOCKET,
  948. "id=%p fd=%d: is already assigned to event-loop %p.",
  949. (void *)socket,
  950. socket->io_handle.data.fd,
  951. (void *)socket->event_loop);
  952. return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
  953. }
  954. if (socket->state != LISTENING) {
  955. AWS_LOGF_ERROR(
  956. AWS_LS_IO_SOCKET,
  957. "id=%p fd=%d: invalid state for start_accept operation. You must call listen first.",
  958. (void *)socket,
  959. socket->io_handle.data.fd);
  960. return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
  961. }
  962. socket->accept_result_fn = on_accept_result;
  963. socket->connect_accept_user_data = user_data;
  964. socket->event_loop = accept_loop;
  965. struct posix_socket *socket_impl = socket->impl;
  966. socket_impl->continue_accept = true;
  967. socket_impl->currently_subscribed = true;
  968. if (aws_event_loop_subscribe_to_io_events(
  969. socket->event_loop, &socket->io_handle, AWS_IO_EVENT_TYPE_READABLE, s_socket_accept_event, socket)) {
  970. AWS_LOGF_ERROR(
  971. AWS_LS_IO_SOCKET,
  972. "id=%p fd=%d: failed to subscribe to event-loop %p.",
  973. (void *)socket,
  974. socket->io_handle.data.fd,
  975. (void *)socket->event_loop);
  976. socket_impl->continue_accept = false;
  977. socket_impl->currently_subscribed = false;
  978. socket->event_loop = NULL;
  979. return AWS_OP_ERR;
  980. }
  981. return AWS_OP_SUCCESS;
  982. }
/* Args for marshalling an aws_socket_stop_accept() call onto the socket's
 * event-loop thread while the calling thread blocks until it completes. */
struct stop_accept_args {
    struct aws_task task;                             /* task scheduled on the event-loop */
    struct aws_mutex mutex;                           /* guards invoked/ret_code */
    struct aws_condition_variable condition_variable; /* signaled once the task has run */
    struct aws_socket *socket;                        /* listening socket to stop */
    int ret_code;                                     /* aws error code from the task, or AWS_OP_SUCCESS */
    bool invoked;                                     /* set true when the task has executed */
};
  991. static bool s_stop_accept_pred(void *arg) {
  992. struct stop_accept_args *stop_accept_args = arg;
  993. return stop_accept_args->invoked;
  994. }
  995. static void s_stop_accept_task(struct aws_task *task, void *arg, enum aws_task_status status) {
  996. (void)task;
  997. (void)status;
  998. struct stop_accept_args *stop_accept_args = arg;
  999. aws_mutex_lock(&stop_accept_args->mutex);
  1000. stop_accept_args->ret_code = AWS_OP_SUCCESS;
  1001. if (aws_socket_stop_accept(stop_accept_args->socket)) {
  1002. stop_accept_args->ret_code = aws_last_error();
  1003. }
  1004. stop_accept_args->invoked = true;
  1005. aws_condition_variable_notify_one(&stop_accept_args->condition_variable);
  1006. aws_mutex_unlock(&stop_accept_args->mutex);
  1007. }
/**
 * Stops accepting new connections on a LISTENING socket.
 * May be called from any thread: when the caller is not on the socket's
 * event-loop thread, the work is marshalled there via s_stop_accept_task and
 * this call blocks until the task completes.
 */
int aws_socket_stop_accept(struct aws_socket *socket) {
    if (socket->state != LISTENING) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: is not in a listening state, can't stop_accept.",
            (void *)socket,
            socket->io_handle.data.fd);
        return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
    }

    AWS_LOGF_INFO(
        AWS_LS_IO_SOCKET, "id=%p fd=%d: stopping accepting new connections", (void *)socket, socket->io_handle.data.fd);

    if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
        /* cross-thread path: run the stop on the event-loop thread and block here
         * until it reports completion via the condition variable. */
        struct stop_accept_args args = {
            .mutex = AWS_MUTEX_INIT,
            .condition_variable = AWS_CONDITION_VARIABLE_INIT,
            .invoked = false,
            .socket = socket,
            .ret_code = AWS_OP_SUCCESS,
            .task = {.fn = s_stop_accept_task},
        };
        AWS_LOGF_INFO(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: stopping accepting new connections from a different thread than "
            "the socket is running from. Blocking until it shuts down.",
            (void *)socket,
            socket->io_handle.data.fd);
        /* 'args' is stack-allocated but safe: we wait on the condition variable
         * below, so it cannot go out of scope before the task finishes.
         * NOLINTNEXTLINE */
        args.task.arg = &args;
        aws_mutex_lock(&args.mutex);
        aws_event_loop_schedule_task_now(socket->event_loop, &args.task);
        aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_stop_accept_pred, &args);
        aws_mutex_unlock(&args.mutex);
        AWS_LOGF_INFO(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: stop accept task finished running.",
            (void *)socket,
            socket->io_handle.data.fd);

        if (args.ret_code) {
            return aws_raise_error(args.ret_code);
        }
        return AWS_OP_SUCCESS;
    }

    /* on-thread path: unsubscribe from readable events and clear accept state */
    int ret_val = AWS_OP_SUCCESS;
    struct posix_socket *socket_impl = socket->impl;
    if (socket_impl->currently_subscribed) {
        ret_val = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle);
        socket_impl->currently_subscribed = false;
        socket_impl->continue_accept = false;
        socket->event_loop = NULL;
    }

    return ret_val;
}
/**
 * Applies `options` to the socket via setsockopt().
 * The domain and type must match the socket's existing options (they cannot be
 * changed after creation). Individual setsockopt() failures are logged as
 * warnings but are not fatal; the function still returns AWS_OP_SUCCESS.
 */
int aws_socket_set_options(struct aws_socket *socket, const struct aws_socket_options *options) {
    if (socket->options.domain != options->domain || socket->options.type != options->type) {
        return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS);
    }

    AWS_LOGF_DEBUG(
        AWS_LS_IO_SOCKET,
        "id=%p fd=%d: setting socket options to: keep-alive %d, keep idle %d, keep-alive interval %d, keep-alive probe "
        "count %d.",
        (void *)socket,
        socket->io_handle.data.fd,
        (int)options->keepalive,
        (int)options->keep_alive_timeout_sec,
        (int)options->keep_alive_interval_sec,
        (int)options->keep_alive_max_failed_probes);

    socket->options = *options;

#ifdef NO_SIGNAL_SOCK_OPT
    /* platforms without MSG_NOSIGNAL suppress SIGPIPE via a socket option instead */
    int option_value = 1;
    if (AWS_UNLIKELY(setsockopt(
            socket->io_handle.data.fd, SOL_SOCKET, NO_SIGNAL_SOCK_OPT, &option_value, sizeof(option_value)))) {
        int errno_value = errno; /* Always cache errno before potential side-effect */
        AWS_LOGF_WARN(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: setsockopt() for NO_SIGNAL_SOCK_OPT failed with errno %d.",
            (void *)socket,
            socket->io_handle.data.fd,
            errno_value);
    }
#endif /* NO_SIGNAL_SOCK_OPT */

    int reuse = 1;
    if (AWS_UNLIKELY(setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int)))) {
        int errno_value = errno; /* Always cache errno before potential side-effect */
        AWS_LOGF_WARN(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: setsockopt() for SO_REUSEADDR failed with errno %d.",
            (void *)socket,
            socket->io_handle.data.fd,
            errno_value);
    }

    /* TCP keep-alive tuning only applies to network stream sockets */
    if (options->type == AWS_SOCKET_STREAM && options->domain != AWS_SOCKET_LOCAL) {
        if (socket->options.keepalive) {
            int keep_alive = 1;
            if (AWS_UNLIKELY(
                    setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_KEEPALIVE, &keep_alive, sizeof(int)))) {
                int errno_value = errno; /* Always cache errno before potential side-effect */
                AWS_LOGF_WARN(
                    AWS_LS_IO_SOCKET,
                    "id=%p fd=%d: setsockopt() for enabling SO_KEEPALIVE failed with errno %d.",
                    (void *)socket,
                    socket->io_handle.data.fd,
                    errno_value);
            }
        }

#if !defined(__OpenBSD__) /* OpenBSD lacks TCP_KEEPIDLE/TCP_KEEPINTVL/TCP_KEEPCNT */
        /* NOTE(review): TCP_KEEPIDLE (idle time before first probe) is set from
         * keep_alive_interval_sec and TCP_KEEPINTVL (gap between probes) from
         * keep_alive_timeout_sec — the mapping looks swapped relative to the option
         * names; confirm against the aws_socket_options documentation before changing. */
        if (socket->options.keep_alive_interval_sec && socket->options.keep_alive_timeout_sec) {
            int ival_in_secs = socket->options.keep_alive_interval_sec;
            if (AWS_UNLIKELY(setsockopt(
                    socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPIDLE, &ival_in_secs, sizeof(ival_in_secs)))) {
                int errno_value = errno; /* Always cache errno before potential side-effect */
                AWS_LOGF_WARN(
                    AWS_LS_IO_SOCKET,
                    "id=%p fd=%d: setsockopt() for enabling TCP_KEEPIDLE for TCP failed with errno %d.",
                    (void *)socket,
                    socket->io_handle.data.fd,
                    errno_value);
            }

            ival_in_secs = socket->options.keep_alive_timeout_sec;
            if (AWS_UNLIKELY(setsockopt(
                    socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPINTVL, &ival_in_secs, sizeof(ival_in_secs)))) {
                int errno_value = errno; /* Always cache errno before potential side-effect */
                AWS_LOGF_WARN(
                    AWS_LS_IO_SOCKET,
                    "id=%p fd=%d: setsockopt() for enabling TCP_KEEPINTVL for TCP failed with errno %d.",
                    (void *)socket,
                    socket->io_handle.data.fd,
                    errno_value);
            }
        }

        if (socket->options.keep_alive_max_failed_probes) {
            int max_probes = socket->options.keep_alive_max_failed_probes;
            if (AWS_UNLIKELY(
                    setsockopt(socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPCNT, &max_probes, sizeof(max_probes)))) {
                int errno_value = errno; /* Always cache errno before potential side-effect */
                AWS_LOGF_WARN(
                    AWS_LS_IO_SOCKET,
                    "id=%p fd=%d: setsockopt() for enabling TCP_KEEPCNT for TCP failed with errno %d.",
                    (void *)socket,
                    socket->io_handle.data.fd,
                    errno_value);
            }
        }
#endif /* __OpenBSD__ */
    }

    return AWS_OP_SUCCESS;
}
/* A single queued outbound write. Sits in the posix_socket write_queue until
 * fully sent, then moves to the written_queue until its callback fires. */
struct socket_write_request {
    struct aws_byte_cursor cursor_cpy;            /* advances as bytes are sent; .len is what remains */
    aws_socket_on_write_completed_fn *written_fn; /* user completion callback */
    void *write_user_data;                        /* opaque pointer handed back to written_fn */
    struct aws_linked_list_node node;             /* intrusive linkage for the write/written queues */
    size_t original_buffer_len;                   /* original_buffer_len - cursor_cpy.len == bytes written */
    int error_code;                               /* result reported to written_fn */
};
/* Args for marshalling an aws_socket_close() call onto the socket's event-loop
 * thread while the calling thread blocks until it completes. */
struct posix_socket_close_args {
    struct aws_mutex mutex;                           /* guards invoked/ret_code */
    struct aws_condition_variable condition_variable; /* signaled once the close task has run */
    struct aws_socket *socket;                        /* socket being closed */
    bool invoked;                                     /* set true when the task has executed */
    int ret_code;                                     /* aws error code from the task, or AWS_OP_SUCCESS */
};
  1171. static bool s_close_predicate(void *arg) {
  1172. struct posix_socket_close_args *close_args = arg;
  1173. return close_args->invoked;
  1174. }
  1175. static void s_close_task(struct aws_task *task, void *arg, enum aws_task_status status) {
  1176. (void)task;
  1177. (void)status;
  1178. struct posix_socket_close_args *close_args = arg;
  1179. aws_mutex_lock(&close_args->mutex);
  1180. close_args->ret_code = AWS_OP_SUCCESS;
  1181. if (aws_socket_close(close_args->socket)) {
  1182. close_args->ret_code = aws_last_error();
  1183. }
  1184. close_args->invoked = true;
  1185. aws_condition_variable_notify_one(&close_args->condition_variable);
  1186. aws_mutex_unlock(&close_args->mutex);
  1187. }
/**
 * Closes the socket: unsubscribes from its event-loop, closes the fd, and
 * delivers completion callbacks (in order) for every pending write.
 * When called from a thread other than the socket's event-loop thread (only
 * legal for LISTENING sockets), the close is marshalled onto that thread and
 * this call blocks until it finishes.
 */
int aws_socket_close(struct aws_socket *socket) {
    struct posix_socket *socket_impl = socket->impl;
    AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: closing", (void *)socket, socket->io_handle.data.fd);
    /* cache the loop: socket->event_loop is NULLed below but the written task
     * still needs to be cancelled against the original loop */
    struct aws_event_loop *event_loop = socket->event_loop;
    if (socket->event_loop) {
        /* don't freak out on me, this almost never happens, and never occurs inside a channel
         * it only gets hit from a listening socket shutting down or from a unit test. */
        if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
            AWS_LOGF_INFO(
                AWS_LS_IO_SOCKET,
                "id=%p fd=%d: closing from a different thread than "
                "the socket is running from. Blocking until it closes down.",
                (void *)socket,
                socket->io_handle.data.fd);
            /* the only time we allow this kind of thing is when you're a listener.*/
            if (socket->state != LISTENING) {
                return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
            }
            /* stack-allocated args are safe: we block on the condition variable
             * below until the task has run */
            struct posix_socket_close_args args = {
                .mutex = AWS_MUTEX_INIT,
                .condition_variable = AWS_CONDITION_VARIABLE_INIT,
                .socket = socket,
                .ret_code = AWS_OP_SUCCESS,
                .invoked = false,
            };
            struct aws_task close_task = {
                .fn = s_close_task,
                .arg = &args,
            };
            int fd_for_logging = socket->io_handle.data.fd; /* socket's fd gets reset before final log */
            (void)fd_for_logging;
            aws_mutex_lock(&args.mutex);
            aws_event_loop_schedule_task_now(socket->event_loop, &close_task);
            aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_close_predicate, &args);
            aws_mutex_unlock(&args.mutex);
            AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: close task completed.", (void *)socket, fd_for_logging);
            if (args.ret_code) {
                return aws_raise_error(args.ret_code);
            }
            return AWS_OP_SUCCESS;
        }

        if (socket_impl->currently_subscribed) {
            if (socket->state & LISTENING) {
                /* listening sockets go through stop_accept so accept state is torn down too */
                aws_socket_stop_accept(socket);
            } else {
                int err_code = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle);

                if (err_code) {
                    return AWS_OP_ERR;
                }
            }
            socket_impl->currently_subscribed = false;
            socket->event_loop = NULL;
        }
    }

    /* tell any in-flight accept-event iteration (see s_socket_accept_event) that
     * the socket was closed out from under it */
    if (socket_impl->close_happened) {
        *socket_impl->close_happened = true;
    }

    /* detach a pending connect so its timeout task won't touch this socket */
    if (socket_impl->connect_args) {
        socket_impl->connect_args->socket = NULL;
        socket_impl->connect_args = NULL;
    }

    if (aws_socket_is_open(socket)) {
        close(socket->io_handle.data.fd);
        socket->io_handle.data.fd = -1;
        socket->state = CLOSED;

        /* ensure callbacks for pending writes fire (in order) before this close function returns */

        /* already-completed writes: deliver their recorded error codes */
        if (socket_impl->written_task_scheduled) {
            aws_event_loop_cancel_task(event_loop, &socket_impl->written_task);
        }

        while (!aws_linked_list_empty(&socket_impl->written_queue)) {
            struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->written_queue);
            struct socket_write_request *write_request = AWS_CONTAINER_OF(node, struct socket_write_request, node);
            size_t bytes_written = write_request->original_buffer_len - write_request->cursor_cpy.len;
            write_request->written_fn(socket, write_request->error_code, bytes_written, write_request->write_user_data);
            aws_mem_release(socket->allocator, write_request);
        }

        /* writes that never completed: fail them with AWS_IO_SOCKET_CLOSED */
        while (!aws_linked_list_empty(&socket_impl->write_queue)) {
            struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue);
            struct socket_write_request *write_request = AWS_CONTAINER_OF(node, struct socket_write_request, node);
            size_t bytes_written = write_request->original_buffer_len - write_request->cursor_cpy.len;
            write_request->written_fn(socket, AWS_IO_SOCKET_CLOSED, bytes_written, write_request->write_user_data);
            aws_mem_release(socket->allocator, write_request);
        }
    }

    return AWS_OP_SUCCESS;
}
  1274. int aws_socket_shutdown_dir(struct aws_socket *socket, enum aws_channel_direction dir) {
  1275. int how = dir == AWS_CHANNEL_DIR_READ ? 0 : 1;
  1276. AWS_LOGF_DEBUG(
  1277. AWS_LS_IO_SOCKET, "id=%p fd=%d: shutting down in direction %d", (void *)socket, socket->io_handle.data.fd, dir);
  1278. if (shutdown(socket->io_handle.data.fd, how)) {
  1279. int errno_value = errno; /* Always cache errno before potential side-effect */
  1280. int aws_error = s_determine_socket_error(errno_value);
  1281. return aws_raise_error(aws_error);
  1282. }
  1283. if (dir == AWS_CHANNEL_DIR_READ) {
  1284. socket->state &= ~CONNECTED_READ;
  1285. } else {
  1286. socket->state &= ~CONNECTED_WRITE;
  1287. }
  1288. return AWS_OP_SUCCESS;
  1289. }
/**
 * Event-loop task that delivers completion callbacks for writes that have
 * fully flushed (moved to written_queue by the write path).
 * Only drains entries present when the task starts, so callbacks that enqueue
 * new writes cannot starve other sockets on the same loop.
 */
static void s_written_task(struct aws_task *task, void *arg, enum aws_task_status status) {
    (void)task;
    (void)status;

    struct aws_socket *socket = arg;
    struct posix_socket *socket_impl = socket->impl;

    /* allow the write path to schedule this task again */
    socket_impl->written_task_scheduled = false;

    /* this is to handle a race condition when a callback kicks off a cleanup, or the user decides
     * to close the socket based on something they read (SSL validation failed for example).
     * if clean_up happens when internal_refcount > 0, socket_impl is kept dangling */
    aws_ref_count_acquire(&socket_impl->internal_refcount);

    /* Notes about weird loop:
     * 1) Only process the initial contents of queue when this task is run,
     *    ignoring any writes queued during delivery.
     *    If we simply looped until the queue was empty, we could get into a
     *    synchronous loop of completing and writing and completing and writing...
     *    and it would be tough for multiple sockets to share an event-loop fairly.
     * 2) Check if queue is empty with each iteration.
     *    If user calls close() from the callback, close() will process all
     *    nodes in the written_queue, and the queue will be empty when the
     *    callstack gets back to here. */
    if (!aws_linked_list_empty(&socket_impl->written_queue)) {
        struct aws_linked_list_node *stop_after = aws_linked_list_back(&socket_impl->written_queue);
        do {
            struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->written_queue);
            struct socket_write_request *write_request = AWS_CONTAINER_OF(node, struct socket_write_request, node);
            /* bytes actually sent = original length minus what's left in the cursor */
            size_t bytes_written = write_request->original_buffer_len - write_request->cursor_cpy.len;
            write_request->written_fn(socket, write_request->error_code, bytes_written, write_request->write_user_data);
            aws_mem_release(socket_impl->allocator, write_request);
            if (node == stop_after) {
                break;
            }
        } while (!aws_linked_list_empty(&socket_impl->written_queue));
    }

    aws_ref_count_release(&socket_impl->internal_refcount);
}
/* this gets called in two scenarios.
 * 1st scenario, someone called aws_socket_write() and we want to try writing now, so an error can be returned
 * immediately if something bad has happened to the socket. In this case, `parent_request` is set.
 * 2nd scenario, the event loop notified us that the socket went writable. In this case `parent_request` is NULL */
static int s_process_socket_write_requests(struct aws_socket *socket, struct socket_write_request *parent_request) {
    struct posix_socket *socket_impl = socket->impl;

    if (parent_request) {
        AWS_LOGF_TRACE(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: processing write requests, called from aws_socket_write",
            (void *)socket,
            socket->io_handle.data.fd);
    } else {
        AWS_LOGF_TRACE(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: processing write requests, invoked by the event-loop",
            (void *)socket,
            socket->io_handle.data.fd);
    }

    /* purge: a fatal send() error occurred; fail all queued requests below. */
    bool purge = false;
    int aws_error = AWS_OP_SUCCESS;
    /* true only when `parent_request` itself is among the purged requests; controls the return value. */
    bool parent_request_failed = false;
    bool pushed_to_written_queue = false;

    /* if a close call happens in the middle, this queue will have been cleaned out from under us. */
    while (!aws_linked_list_empty(&socket_impl->write_queue)) {
        /* Peek (not pop): a partially-written request stays at the front so the
         * next pass resumes it from its advanced cursor. */
        struct aws_linked_list_node *node = aws_linked_list_front(&socket_impl->write_queue);
        struct socket_write_request *write_request = AWS_CONTAINER_OF(node, struct socket_write_request, node);

        AWS_LOGF_TRACE(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: dequeued write request of size %llu, remaining to write %llu",
            (void *)socket,
            socket->io_handle.data.fd,
            (unsigned long long)write_request->original_buffer_len,
            (unsigned long long)write_request->cursor_cpy.len);

        ssize_t written = send(
            socket->io_handle.data.fd, write_request->cursor_cpy.ptr, write_request->cursor_cpy.len, NO_SIGNAL_SEND);
        int errno_value = errno; /* Always cache errno before potential side-effect */

        AWS_LOGF_TRACE(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: send written size %d",
            (void *)socket,
            socket->io_handle.data.fd,
            (int)written);

        if (written < 0) {
            if (errno_value == EAGAIN) {
                /* Kernel send buffer is full; stop and wait for the event loop's
                 * next writable notification (which re-invokes this function). */
                AWS_LOGF_TRACE(
                    AWS_LS_IO_SOCKET, "id=%p fd=%d: returned would block", (void *)socket, socket->io_handle.data.fd);
                break;
            }

            if (errno_value == EPIPE) {
                /* Peer closed the connection; fail everything still queued. */
                AWS_LOGF_DEBUG(
                    AWS_LS_IO_SOCKET,
                    "id=%p fd=%d: already closed before write",
                    (void *)socket,
                    socket->io_handle.data.fd);
                aws_error = AWS_IO_SOCKET_CLOSED;
                aws_raise_error(aws_error);
                purge = true;
                break;
            }

            /* Any other send() failure is fatal for all pending writes. */
            purge = true;
            AWS_LOGF_DEBUG(
                AWS_LS_IO_SOCKET,
                "id=%p fd=%d: write error with error code %d",
                (void *)socket,
                socket->io_handle.data.fd,
                errno_value);
            aws_error = s_determine_socket_error(errno_value);
            aws_raise_error(aws_error);
            break;
        }

        /* Partial writes are normal: advance the cursor; the request remains
         * at the queue front until every byte has been sent. */
        size_t remaining_to_write = write_request->cursor_cpy.len;
        aws_byte_cursor_advance(&write_request->cursor_cpy, (size_t)written);

        AWS_LOGF_TRACE(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: remaining write request to write %llu",
            (void *)socket,
            socket->io_handle.data.fd,
            (unsigned long long)write_request->cursor_cpy.len);

        if ((size_t)written == remaining_to_write) {
            AWS_LOGF_TRACE(
                AWS_LS_IO_SOCKET, "id=%p fd=%d: write request completed", (void *)socket, socket->io_handle.data.fd);
            /* Fully written: move to written_queue. Completion callbacks are NOT
             * invoked here; they're deferred to s_written_task (scheduled below)
             * so user callbacks never run re-entrantly inside a write call. */
            aws_linked_list_remove(node);
            write_request->error_code = AWS_ERROR_SUCCESS;
            aws_linked_list_push_back(&socket_impl->written_queue, node);
            pushed_to_written_queue = true;
        }
    }

    if (purge) {
        /* Fail every remaining queued request with the error captured above. */
        while (!aws_linked_list_empty(&socket_impl->write_queue)) {
            struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue);
            struct socket_write_request *write_request = AWS_CONTAINER_OF(node, struct socket_write_request, node);

            /* If this fn was invoked directly from aws_socket_write(), don't invoke the error callback
             * as the user will be able to rely on the return value from aws_socket_write() */
            if (write_request == parent_request) {
                parent_request_failed = true;
                aws_mem_release(socket->allocator, write_request);
            } else {
                write_request->error_code = aws_error;
                aws_linked_list_push_back(&socket_impl->written_queue, node);
                pushed_to_written_queue = true;
            }
        }
    }

    /* Schedule the deferred-completion task once; the flag prevents scheduling
     * it again while a previous run is still pending. */
    if (pushed_to_written_queue && !socket_impl->written_task_scheduled) {
        socket_impl->written_task_scheduled = true;
        aws_task_init(&socket_impl->written_task, s_written_task, socket, "socket_written_task");
        aws_event_loop_schedule_task_now(socket->event_loop, &socket_impl->written_task);
    }

    /* Only report error if aws_socket_write() invoked this function and its write_request failed */
    if (!parent_request_failed) {
        return AWS_OP_SUCCESS;
    }

    aws_raise_error(aws_error);
    return AWS_OP_ERR;
}
/* Event-loop callback for this socket's fd: dispatches hang-up/closed, error,
 * readable, and writable events, in that priority order. The first two are
 * terminal (goto end_check) and surface through readable_fn with an error code. */
static void s_on_socket_io_event(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    void *user_data) {
    (void)event_loop;
    (void)handle;
    struct aws_socket *socket = user_data;
    struct posix_socket *socket_impl = socket->impl;

    /* this is to handle a race condition when an error kicks off a cleanup, or the user decides
     * to close the socket based on something they read (SSL validation failed for example).
     * if clean_up happens when internal_refcount > 0, socket_impl is kept dangling but currently
     * subscribed is set to false. */
    aws_ref_count_acquire(&socket_impl->internal_refcount);

    if (events & AWS_IO_EVENT_TYPE_REMOTE_HANG_UP || events & AWS_IO_EVENT_TYPE_CLOSED) {
        aws_raise_error(AWS_IO_SOCKET_CLOSED);
        AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: closed remotely", (void *)socket, socket->io_handle.data.fd);
        if (socket->readable_fn) {
            socket->readable_fn(socket, AWS_IO_SOCKET_CLOSED, socket->readable_user_data);
        }
        goto end_check;
    }

    if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_ERROR) {
        /* Translate the pending SO_ERROR into an aws error code and report it. */
        int aws_error = aws_socket_get_error(socket);
        aws_raise_error(aws_error);
        AWS_LOGF_TRACE(
            AWS_LS_IO_SOCKET, "id=%p fd=%d: error event occurred", (void *)socket, socket->io_handle.data.fd);
        if (socket->readable_fn) {
            socket->readable_fn(socket, aws_error, socket->readable_user_data);
        }
        goto end_check;
    }

    if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) {
        AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd);
        if (socket->readable_fn) {
            /* AWS_OP_SUCCESS (0) doubles as "no error" for the callback's error-code parameter. */
            socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data);
        }
    }

    /* if socket closed in between these branches, the currently_subscribed will be false and socket_impl will not
     * have been cleaned up, so this next branch is safe. */
    if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) {
        AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd);
        s_process_socket_write_requests(socket, NULL);
    }

end_check:
    aws_ref_count_release(&socket_impl->internal_refcount);
}
  1488. int aws_socket_assign_to_event_loop(struct aws_socket *socket, struct aws_event_loop *event_loop) {
  1489. if (!socket->event_loop) {
  1490. AWS_LOGF_DEBUG(
  1491. AWS_LS_IO_SOCKET,
  1492. "id=%p fd=%d: assigning to event loop %p",
  1493. (void *)socket,
  1494. socket->io_handle.data.fd,
  1495. (void *)event_loop);
  1496. socket->event_loop = event_loop;
  1497. struct posix_socket *socket_impl = socket->impl;
  1498. socket_impl->currently_subscribed = true;
  1499. if (aws_event_loop_subscribe_to_io_events(
  1500. event_loop,
  1501. &socket->io_handle,
  1502. AWS_IO_EVENT_TYPE_WRITABLE | AWS_IO_EVENT_TYPE_READABLE,
  1503. s_on_socket_io_event,
  1504. socket)) {
  1505. AWS_LOGF_ERROR(
  1506. AWS_LS_IO_SOCKET,
  1507. "id=%p fd=%d: assigning to event loop %p failed with error %d",
  1508. (void *)socket,
  1509. socket->io_handle.data.fd,
  1510. (void *)event_loop,
  1511. aws_last_error());
  1512. socket_impl->currently_subscribed = false;
  1513. socket->event_loop = NULL;
  1514. return AWS_OP_ERR;
  1515. }
  1516. return AWS_OP_SUCCESS;
  1517. }
  1518. return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
  1519. }
/* Returns the event loop this socket is assigned to, or NULL if
 * aws_socket_assign_to_event_loop() has not (successfully) been called. */
struct aws_event_loop *aws_socket_get_event_loop(struct aws_socket *socket) {
    return socket->event_loop;
}
  1523. int aws_socket_subscribe_to_readable_events(
  1524. struct aws_socket *socket,
  1525. aws_socket_on_readable_fn *on_readable,
  1526. void *user_data) {
  1527. AWS_LOGF_TRACE(
  1528. AWS_LS_IO_SOCKET, " id=%p fd=%d: subscribing to readable events", (void *)socket, socket->io_handle.data.fd);
  1529. if (!(socket->state & CONNECTED_READ)) {
  1530. AWS_LOGF_ERROR(
  1531. AWS_LS_IO_SOCKET,
  1532. "id=%p fd=%d: can't subscribe to readable events since the socket is not connected",
  1533. (void *)socket,
  1534. socket->io_handle.data.fd);
  1535. return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED);
  1536. }
  1537. if (socket->readable_fn) {
  1538. AWS_LOGF_ERROR(
  1539. AWS_LS_IO_SOCKET,
  1540. "id=%p fd=%d: can't subscribe to readable events since it is already subscribed",
  1541. (void *)socket,
  1542. socket->io_handle.data.fd);
  1543. return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED);
  1544. }
  1545. AWS_ASSERT(on_readable);
  1546. socket->readable_user_data = user_data;
  1547. socket->readable_fn = on_readable;
  1548. return AWS_OP_SUCCESS;
  1549. }
/* Reads from the socket into `buffer`, appending at buffer->len and using the
 * buffer's remaining capacity as the read size. Must be called from the
 * socket's assigned event-loop thread. On success, *amount_read holds the
 * byte count and buffer->len is advanced. EOF is reported as
 * AWS_IO_SOCKET_CLOSED; a would-block condition as AWS_IO_READ_WOULD_BLOCK. */
int aws_socket_read(struct aws_socket *socket, struct aws_byte_buf *buffer, size_t *amount_read) {
    AWS_ASSERT(amount_read);

    if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: cannot read from a different thread than event loop %p",
            (void *)socket,
            socket->io_handle.data.fd,
            (void *)socket->event_loop);
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    if (!(socket->state & CONNECTED_READ)) {
        AWS_LOGF_ERROR(
            AWS_LS_IO_SOCKET,
            "id=%p fd=%d: cannot read because it is not connected",
            (void *)socket,
            socket->io_handle.data.fd);
        return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED);
    }

    ssize_t read_val = read(socket->io_handle.data.fd, buffer->buffer + buffer->len, buffer->capacity - buffer->len);
    int errno_value = errno; /* Always cache errno before potential side-effect */

    AWS_LOGF_TRACE(
        AWS_LS_IO_SOCKET, "id=%p fd=%d: read of %d", (void *)socket, socket->io_handle.data.fd, (int)read_val);

    if (read_val > 0) {
        *amount_read = (size_t)read_val;
        buffer->len += *amount_read;
        return AWS_OP_SUCCESS;
    }

    /* read_val of 0 means EOF which we'll treat as AWS_IO_SOCKET_CLOSED */
    if (read_val == 0) {
        AWS_LOGF_INFO(
            AWS_LS_IO_SOCKET, "id=%p fd=%d: zero read, socket is closed", (void *)socket, socket->io_handle.data.fd);
        *amount_read = 0;
        /* Only a zero-byte read into a buffer with space left is real EOF; if
         * the buffer was already full, a 0 return doesn't indicate closure. */
        if (buffer->capacity - buffer->len > 0) {
            return aws_raise_error(AWS_IO_SOCKET_CLOSED);
        }
        return AWS_OP_SUCCESS;
    }

/* Some platforms define EWOULDBLOCK as a distinct value from EAGAIN; check both when available. */
#if defined(EWOULDBLOCK)
    if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
#else
    if (errno_value == EAGAIN) {
#endif
        AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: read would block", (void *)socket, socket->io_handle.data.fd);
        return aws_raise_error(AWS_IO_READ_WOULD_BLOCK);
    }

    if (errno_value == EPIPE || errno_value == ECONNRESET) {
        AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket is closed.", (void *)socket, socket->io_handle.data.fd);
        return aws_raise_error(AWS_IO_SOCKET_CLOSED);
    }

    if (errno_value == ETIMEDOUT) {
        AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket timed out.", (void *)socket, socket->io_handle.data.fd);
        return aws_raise_error(AWS_IO_SOCKET_TIMEOUT);
    }

    /* Fall-through: map any other errno to a generic aws socket error. */
    AWS_LOGF_ERROR(
        AWS_LS_IO_SOCKET,
        "id=%p fd=%d: read failed with error: %s",
        (void *)socket,
        socket->io_handle.data.fd,
        strerror(errno_value));
    return aws_raise_error(s_determine_socket_error(errno_value));
}
  1612. int aws_socket_write(
  1613. struct aws_socket *socket,
  1614. const struct aws_byte_cursor *cursor,
  1615. aws_socket_on_write_completed_fn *written_fn,
  1616. void *user_data) {
  1617. if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
  1618. return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
  1619. }
  1620. if (!(socket->state & CONNECTED_WRITE)) {
  1621. AWS_LOGF_ERROR(
  1622. AWS_LS_IO_SOCKET,
  1623. "id=%p fd=%d: cannot write to because it is not connected",
  1624. (void *)socket,
  1625. socket->io_handle.data.fd);
  1626. return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED);
  1627. }
  1628. AWS_ASSERT(written_fn);
  1629. struct posix_socket *socket_impl = socket->impl;
  1630. struct socket_write_request *write_request =
  1631. aws_mem_calloc(socket->allocator, 1, sizeof(struct socket_write_request));
  1632. if (!write_request) {
  1633. return AWS_OP_ERR;
  1634. }
  1635. write_request->original_buffer_len = cursor->len;
  1636. write_request->written_fn = written_fn;
  1637. write_request->write_user_data = user_data;
  1638. write_request->cursor_cpy = *cursor;
  1639. aws_linked_list_push_back(&socket_impl->write_queue, &write_request->node);
  1640. return s_process_socket_write_requests(socket, write_request);
  1641. }
  1642. int aws_socket_get_error(struct aws_socket *socket) {
  1643. int connect_result;
  1644. socklen_t result_length = sizeof(connect_result);
  1645. if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) {
  1646. return AWS_OP_ERR;
  1647. }
  1648. if (connect_result) {
  1649. return s_determine_socket_error(connect_result);
  1650. }
  1651. return AWS_OP_SUCCESS;
  1652. }
  1653. bool aws_socket_is_open(struct aws_socket *socket) {
  1654. return socket->io_handle.data.fd >= 0;
  1655. }