- /**
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- * SPDX-License-Identifier: Apache-2.0.
- */
- #include <aws/io/socket.h>
- #include <aws/common/clock.h>
- #include <aws/common/condition_variable.h>
- #include <aws/common/mutex.h>
- #include <aws/common/string.h>
- #include <aws/io/event_loop.h>
- #include <aws/io/logging.h>
- #include <arpa/inet.h>
- #include <aws/io/io.h>
- #include <errno.h>
- #include <fcntl.h>
- #include <inttypes.h>
- #include <limits.h>
- #include <netinet/tcp.h>
- #include <stdlib.h>
- #include <sys/socket.h>
- #include <sys/types.h>
- #include <sys/un.h>
- #include <unistd.h>
- /*
- * On OsX, suppress SIGPIPE via the SO_NOSIGPIPE option passed to setsockopt().
- * On Linux, suppress SIGPIPE via the MSG_NOSIGNAL flag passed to send().
- */
- #if defined(__MACH__)
- # define NO_SIGNAL_SOCK_OPT SO_NOSIGPIPE
- # define NO_SIGNAL_SEND 0
- # define TCP_KEEPIDLE TCP_KEEPALIVE
- #else
- # define NO_SIGNAL_SEND MSG_NOSIGNAL
- #endif
- /* This isn't defined on ancient linux distros (breaking those builds).
- * However, for a prebuilt binary we purposely build on an ancient system while still wanting the
- * kernel calls to match a modern build, since that's likely the target of the application calling
- * this code. Just define it if it isn't there already; glibc and the kernel don't really care how
- * the flag gets passed, as long as it does.
- */
- #ifndef O_CLOEXEC
- # define O_CLOEXEC 02000000
- #endif
- #ifdef USE_VSOCK
- # if defined(__linux__) && defined(AF_VSOCK)
- # include <linux/vm_sockets.h>
- # else
- # error "USE_VSOCK not supported on current platform"
- # endif
- #endif
- /* other than CONNECTED_READ | CONNECTED_WRITE
- * a socket is only in one of these states at a time. */
- enum socket_state {
- INIT = 0x01,
- CONNECTING = 0x02,
- CONNECTED_READ = 0x04,
- CONNECTED_WRITE = 0x08,
- BOUND = 0x10,
- LISTENING = 0x20,
- TIMEDOUT = 0x40,
- ERROR = 0x80,
- CLOSED,
- };
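- /* Map an aws_socket_domain onto the platform's AF_* constant (s_convert_type below does the same for SOCK_*). */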
- static int s_convert_domain(enum aws_socket_domain domain) {
- switch (domain) {
- case AWS_SOCKET_IPV4:
- return AF_INET;
- case AWS_SOCKET_IPV6:
- return AF_INET6;
- case AWS_SOCKET_LOCAL:
- return AF_UNIX;
- #ifdef USE_VSOCK
- case AWS_SOCKET_VSOCK:
- return AF_VSOCK;
- #endif
- default:
- AWS_ASSERT(0);
- return AF_INET;
- }
- }
- static int s_convert_type(enum aws_socket_type type) {
- switch (type) {
- case AWS_SOCKET_STREAM:
- return SOCK_STREAM;
- case AWS_SOCKET_DGRAM:
- return SOCK_DGRAM;
- default:
- AWS_ASSERT(0);
- return SOCK_STREAM;
- }
- }
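- /* Translate an errno value from a failed socket call into the closest aws-c-io error code.
- * Anything unrecognized is reported as AWS_IO_SOCKET_NOT_CONNECTED. */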
- static int s_determine_socket_error(int error) {
- switch (error) {
- case ECONNREFUSED:
- return AWS_IO_SOCKET_CONNECTION_REFUSED;
- case ECONNRESET:
- return AWS_IO_SOCKET_CLOSED;
- case ETIMEDOUT:
- return AWS_IO_SOCKET_TIMEOUT;
- case EHOSTUNREACH:
- case ENETUNREACH:
- return AWS_IO_SOCKET_NO_ROUTE_TO_HOST;
- case EADDRNOTAVAIL:
- return AWS_IO_SOCKET_INVALID_ADDRESS;
- case ENETDOWN:
- return AWS_IO_SOCKET_NETWORK_DOWN;
- case ECONNABORTED:
- return AWS_IO_SOCKET_CONNECT_ABORTED;
- case EADDRINUSE:
- return AWS_IO_SOCKET_ADDRESS_IN_USE;
- case ENOBUFS:
- case ENOMEM:
- return AWS_ERROR_OOM;
- case EAGAIN:
- return AWS_IO_READ_WOULD_BLOCK;
- case EMFILE:
- case ENFILE:
- return AWS_ERROR_MAX_FDS_EXCEEDED;
- case ENOENT:
- case EINVAL:
- return AWS_ERROR_FILE_INVALID_PATH;
- case EAFNOSUPPORT:
- return AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY;
- case EACCES:
- return AWS_ERROR_NO_PERMISSION;
- default:
- return AWS_IO_SOCKET_NOT_CONNECTED;
- }
- }
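- /* Create the underlying fd, flag it non-blocking and close-on-exec, then apply the caller's socket options. */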
- static int s_create_socket(struct aws_socket *sock, const struct aws_socket_options *options) {
- int fd = socket(s_convert_domain(options->domain), s_convert_type(options->type), 0);
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_DEBUG(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: initializing with domain %d and type %d",
- (void *)sock,
- fd,
- options->domain,
- options->type);
- if (fd != -1) {
- int flags = fcntl(fd, F_GETFL, 0);
- flags |= O_NONBLOCK | O_CLOEXEC;
- int success = fcntl(fd, F_SETFL, flags);
- (void)success;
- sock->io_handle.data.fd = fd;
- sock->io_handle.additional_data = NULL;
- return aws_socket_set_options(sock, options);
- }
- int aws_error = s_determine_socket_error(errno_value);
- return aws_raise_error(aws_error);
- }
- struct posix_socket_connect_args {
- struct aws_task task;
- struct aws_allocator *allocator;
- struct aws_socket *socket;
- };
- struct posix_socket {
- struct aws_linked_list write_queue;
- struct aws_linked_list written_queue;
- struct aws_task written_task;
- struct posix_socket_connect_args *connect_args;
- /* Note that only the posix_socket impl part is refcounted.
- * The public aws_socket can be a stack variable and cleaned up synchronously
- * (by blocking until the event-loop cleans up the impl part).
- * In hindsight, aws_socket should have been heap-allocated and refcounted, but alas */
- struct aws_ref_count internal_refcount;
- struct aws_allocator *allocator;
- bool written_task_scheduled;
- bool currently_subscribed;
- bool continue_accept;
- bool *close_happened;
- };
- static void s_socket_destroy_impl(void *user_data) {
- struct posix_socket *socket_impl = user_data;
- aws_mem_release(socket_impl->allocator, socket_impl);
- }
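- /* Shared initializer: allocates the posix_socket impl and either creates a fresh fd or adopts
- * existing_socket_fd (as done for accepted connections). */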
- static int s_socket_init(
- struct aws_socket *socket,
- struct aws_allocator *alloc,
- const struct aws_socket_options *options,
- int existing_socket_fd) {
- AWS_ASSERT(options);
- AWS_ZERO_STRUCT(*socket);
- struct posix_socket *posix_socket = aws_mem_calloc(alloc, 1, sizeof(struct posix_socket));
- if (!posix_socket) {
- socket->impl = NULL;
- return AWS_OP_ERR;
- }
- socket->allocator = alloc;
- socket->io_handle.data.fd = -1;
- socket->state = INIT;
- socket->options = *options;
- if (existing_socket_fd < 0) {
- int err = s_create_socket(socket, options);
- if (err) {
- aws_mem_release(alloc, posix_socket);
- socket->impl = NULL;
- return AWS_OP_ERR;
- }
- } else {
- socket->io_handle = (struct aws_io_handle){
- .data = {.fd = existing_socket_fd},
- .additional_data = NULL,
- };
- aws_socket_set_options(socket, options);
- }
- aws_linked_list_init(&posix_socket->write_queue);
- aws_linked_list_init(&posix_socket->written_queue);
- posix_socket->currently_subscribed = false;
- posix_socket->continue_accept = false;
- aws_ref_count_init(&posix_socket->internal_refcount, posix_socket, s_socket_destroy_impl);
- posix_socket->allocator = alloc;
- posix_socket->connect_args = NULL;
- posix_socket->close_happened = NULL;
- socket->impl = posix_socket;
- return AWS_OP_SUCCESS;
- }
- int aws_socket_init(struct aws_socket *socket, struct aws_allocator *alloc, const struct aws_socket_options *options) {
- AWS_ASSERT(options);
- return s_socket_init(socket, alloc, options, -1);
- }
- void aws_socket_clean_up(struct aws_socket *socket) {
- if (!socket->impl) {
- /* protect from double clean */
- return;
- }
- int fd_for_logging = socket->io_handle.data.fd; /* socket's fd gets reset before final log */
- (void)fd_for_logging;
- if (aws_socket_is_open(socket)) {
- AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: is still open, closing...", (void *)socket, fd_for_logging);
- aws_socket_close(socket);
- }
- struct posix_socket *socket_impl = socket->impl;
- if (aws_ref_count_release(&socket_impl->internal_refcount) != 0) {
- AWS_LOGF_DEBUG(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: is still pending io letting it dangle and cleaning up later.",
- (void *)socket,
- fd_for_logging);
- }
- AWS_ZERO_STRUCT(*socket);
- socket->io_handle.data.fd = -1;
- }
- /* Update socket->local_endpoint based on the results of getsockname() */
- static int s_update_local_endpoint(struct aws_socket *socket) {
- struct aws_socket_endpoint tmp_endpoint;
- AWS_ZERO_STRUCT(tmp_endpoint);
- struct sockaddr_storage address;
- AWS_ZERO_STRUCT(address);
- socklen_t address_size = sizeof(address);
- if (getsockname(socket->io_handle.data.fd, (struct sockaddr *)&address, &address_size) != 0) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: getsockname() failed with error %d",
- (void *)socket,
- socket->io_handle.data.fd,
- errno_value);
- int aws_error = s_determine_socket_error(errno_value);
- return aws_raise_error(aws_error);
- }
- if (address.ss_family == AF_INET) {
- struct sockaddr_in *s = (struct sockaddr_in *)&address;
- tmp_endpoint.port = ntohs(s->sin_port);
- if (inet_ntop(AF_INET, &s->sin_addr, tmp_endpoint.address, sizeof(tmp_endpoint.address)) == NULL) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: inet_ntop() failed with error %d",
- (void *)socket,
- socket->io_handle.data.fd,
- errno_value);
- int aws_error = s_determine_socket_error(errno_value);
- return aws_raise_error(aws_error);
- }
- } else if (address.ss_family == AF_INET6) {
- struct sockaddr_in6 *s = (struct sockaddr_in6 *)&address;
- tmp_endpoint.port = ntohs(s->sin6_port);
- if (inet_ntop(AF_INET6, &s->sin6_addr, tmp_endpoint.address, sizeof(tmp_endpoint.address)) == NULL) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: inet_ntop() failed with error %d",
- (void *)socket,
- socket->io_handle.data.fd,
- errno_value);
- int aws_error = s_determine_socket_error(errno_value);
- return aws_raise_error(aws_error);
- }
- } else if (address.ss_family == AF_UNIX) {
- struct sockaddr_un *s = (struct sockaddr_un *)&address;
- /* Ensure there's a null-terminator.
- * On some platforms it may be missing when the path gets very long. See:
- * https://man7.org/linux/man-pages/man7/unix.7.html#BUGS
- * But let's keep it simple, and not deal with that madness until someone demands it. */
- size_t sun_len;
- if (aws_secure_strlen(s->sun_path, sizeof(tmp_endpoint.address), &sun_len)) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: UNIX domain socket name is too long",
- (void *)socket,
- socket->io_handle.data.fd);
- return aws_raise_error(AWS_IO_SOCKET_INVALID_ADDRESS);
- }
- memcpy(tmp_endpoint.address, s->sun_path, sun_len);
- #ifdef USE_VSOCK
- } else if (address.ss_family == AF_VSOCK) {
- struct sockaddr_vm *s = (struct sockaddr_vm *)&address;
- /* VSOCK port is 32bit, but aws_socket_endpoint.port is only 16bit.
- * Hopefully this isn't an issue, since users can only pass in 16bit values.
- * But if it becomes an issue, we'll need to make aws_socket_endpoint more flexible */
- if (s->svm_port > UINT16_MAX) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: aws_socket_endpoint can't deal with VSOCK port > UINT16_MAX",
- (void *)socket,
- socket->io_handle.data.fd);
- return aws_raise_error(AWS_IO_SOCKET_INVALID_ADDRESS);
- }
- tmp_endpoint.port = (uint16_t)s->svm_port;
- snprintf(tmp_endpoint.address, sizeof(tmp_endpoint.address), "%" PRIu32, s->svm_cid);
- /* fall through so socket->local_endpoint gets assigned below, as in the other address families */
- #endif /* USE_VSOCK */
- } else {
- AWS_ASSERT(0);
- return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY);
- }
- socket->local_endpoint = tmp_endpoint;
- return AWS_OP_SUCCESS;
- }
- static void s_on_connection_error(struct aws_socket *socket, int error);
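- /* Runs on the connecting socket's event loop once the fd flips writable: unsubscribes, checks SO_ERROR,
- * records the local endpoint, and assigns the socket back to the event loop before invoking the user's
- * connection callback. */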
- static int s_on_connection_success(struct aws_socket *socket) {
- struct aws_event_loop *event_loop = socket->event_loop;
- struct posix_socket *socket_impl = socket->impl;
- if (socket_impl->currently_subscribed) {
- aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle);
- socket_impl->currently_subscribed = false;
- }
- socket->event_loop = NULL;
- int connect_result;
- socklen_t result_length = sizeof(connect_result);
- if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: failed to determine connection error %d",
- (void *)socket,
- socket->io_handle.data.fd,
- errno_value);
- int aws_error = s_determine_socket_error(errno_value);
- aws_raise_error(aws_error);
- s_on_connection_error(socket, aws_error);
- return AWS_OP_ERR;
- }
- if (connect_result) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: connection error %d",
- (void *)socket,
- socket->io_handle.data.fd,
- connect_result);
- int aws_error = s_determine_socket_error(connect_result);
- aws_raise_error(aws_error);
- s_on_connection_error(socket, aws_error);
- return AWS_OP_ERR;
- }
- AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection success", (void *)socket, socket->io_handle.data.fd);
- if (s_update_local_endpoint(socket)) {
- s_on_connection_error(socket, aws_last_error());
- return AWS_OP_ERR;
- }
- socket->state = CONNECTED_WRITE | CONNECTED_READ;
- if (aws_socket_assign_to_event_loop(socket, event_loop)) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: assignment to event loop %p failed with error %d",
- (void *)socket,
- socket->io_handle.data.fd,
- (void *)event_loop,
- aws_last_error());
- s_on_connection_error(socket, aws_last_error());
- return AWS_OP_ERR;
- }
- socket->connection_result_fn(socket, AWS_ERROR_SUCCESS, socket->connect_accept_user_data);
- return AWS_OP_SUCCESS;
- }
- static void s_on_connection_error(struct aws_socket *socket, int error) {
- socket->state = ERROR;
- AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: connection failure", (void *)socket, socket->io_handle.data.fd);
- if (socket->connection_result_fn) {
- socket->connection_result_fn(socket, error, socket->connect_accept_user_data);
- } else if (socket->accept_result_fn) {
- socket->accept_result_fn(socket, error, NULL, socket->connect_accept_user_data);
- }
- }
- /* the next two callbacks compete based on which one runs first. if s_socket_connect_event
- * comes back first, then we set socket_args->socket = NULL and continue on with the connection.
- * if s_handle_socket_timeout() runs after that, it sees socket_args->socket is NULL and just cleans up its memory.
- * s_handle_socket_timeout() will always run so the memory for socket_connect_args is always cleaned up there. */
- static void s_socket_connect_event(
- struct aws_event_loop *event_loop,
- struct aws_io_handle *handle,
- int events,
- void *user_data) {
- (void)event_loop;
- (void)handle;
- struct posix_socket_connect_args *socket_args = (struct posix_socket_connect_args *)user_data;
- AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "fd=%d: connection activity handler triggered ", handle->data.fd);
- if (socket_args->socket) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: has not timed out yet proceeding with connection.",
- (void *)socket_args->socket,
- handle->data.fd);
- struct posix_socket *socket_impl = socket_args->socket->impl;
- if (!(events & AWS_IO_EVENT_TYPE_ERROR || events & AWS_IO_EVENT_TYPE_CLOSED) &&
- (events & AWS_IO_EVENT_TYPE_READABLE || events & AWS_IO_EVENT_TYPE_WRITABLE)) {
- struct aws_socket *socket = socket_args->socket;
- socket_args->socket = NULL;
- socket_impl->connect_args = NULL;
- s_on_connection_success(socket);
- return;
- }
- int aws_error = aws_socket_get_error(socket_args->socket);
- /* we'll get another notification. */
- if (aws_error == AWS_IO_READ_WOULD_BLOCK) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: spurious event, waiting for another notification.",
- (void *)socket_args->socket,
- handle->data.fd);
- return;
- }
- struct aws_socket *socket = socket_args->socket;
- socket_args->socket = NULL;
- socket_impl->connect_args = NULL;
- aws_raise_error(aws_error);
- s_on_connection_error(socket, aws_error);
- }
- }
- static void s_handle_socket_timeout(struct aws_task *task, void *args, enum aws_task_status status) {
- (void)task;
- (void)status;
- struct posix_socket_connect_args *socket_args = args;
- AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "task_id=%p: timeout task triggered, evaluating timeouts.", (void *)task);
- /* successful connection will have nulled out connect_args->socket */
- if (socket_args->socket) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: timed out, shutting down.",
- (void *)socket_args->socket,
- socket_args->socket->io_handle.data.fd);
- socket_args->socket->state = TIMEDOUT;
- int error_code = AWS_IO_SOCKET_TIMEOUT;
- if (status == AWS_TASK_STATUS_RUN_READY) {
- aws_event_loop_unsubscribe_from_io_events(socket_args->socket->event_loop, &socket_args->socket->io_handle);
- } else {
- error_code = AWS_IO_EVENT_LOOP_SHUTDOWN;
- aws_event_loop_free_io_event_resources(socket_args->socket->event_loop, &socket_args->socket->io_handle);
- }
- socket_args->socket->event_loop = NULL;
- struct posix_socket *socket_impl = socket_args->socket->impl;
- socket_impl->currently_subscribed = false;
- aws_raise_error(error_code);
- struct aws_socket *socket = socket_args->socket;
- /*socket close sets socket_args->socket to NULL and
- * socket_impl->connect_args to NULL. */
- aws_socket_close(socket);
- s_on_connection_error(socket, error_code);
- }
- aws_mem_release(socket_args->allocator, socket_args);
- }
- /* this is used simply for moving a connect_success callback, when the connect finished immediately
- * (like for unix domain sockets), into the event loop's thread. Note that in that case no timeout
- * task was scheduled, so the socket_args are cleaned up here instead. */
- static void s_run_connect_success(struct aws_task *task, void *arg, enum aws_task_status status) {
- (void)task;
- struct posix_socket_connect_args *socket_args = arg;
- if (socket_args->socket) {
- struct posix_socket *socket_impl = socket_args->socket->impl;
- if (status == AWS_TASK_STATUS_RUN_READY) {
- s_on_connection_success(socket_args->socket);
- } else {
- aws_raise_error(AWS_IO_SOCKET_CONNECT_ABORTED);
- socket_args->socket->event_loop = NULL;
- s_on_connection_error(socket_args->socket, AWS_IO_SOCKET_CONNECT_ABORTED);
- }
- socket_impl->connect_args = NULL;
- }
- aws_mem_release(socket_args->allocator, socket_args);
- }
- static inline int s_convert_pton_error(int pton_code, int errno_value) {
- if (pton_code == 0) {
- return AWS_IO_SOCKET_INVALID_ADDRESS;
- }
- return s_determine_socket_error(errno_value);
- }
- struct socket_address {
- union sock_addr_types {
- struct sockaddr_in addr_in;
- struct sockaddr_in6 addr_in6;
- struct sockaddr_un un_addr;
- #ifdef USE_VSOCK
- struct sockaddr_vm vm_addr;
- #endif
- } sock_addr_types;
- };
- #ifdef USE_VSOCK
- /** Convert a string to a VSOCK CID. Respects the calling convention of inet_pton:
- * 0 on error, 1 on success. */
- static int parse_cid(const char *cid_str, unsigned int *value) {
- if (cid_str == NULL || value == NULL) {
- errno = EINVAL;
- return 0;
- }
- /* strtoll returns 0 both on error and for a legitimate "0" input, so clear errno first */
- errno = 0;
- /* parse into a long long so the -1 sentinel and out-of-range values can be handled explicitly */
- long long cid = strtoll(cid_str, NULL, 10);
- if (errno != 0) {
- return 0;
- }
- /* a cid of -1 means "any" (VMADDR_CID_ANY), so it's a valid value, but it needs to be
- * converted to unsigned int explicitly. */
- if (cid == -1) {
- *value = VMADDR_CID_ANY;
- return 1;
- }
- if (cid < 0 || cid > UINT_MAX) {
- errno = ERANGE;
- return 0;
- }
- /* cast is safe here, edge cases already checked */
- *value = (unsigned int)cid;
- return 1;
- }
- #endif
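- /* Begin an asynchronous connect. If connect() completes immediately (common for unix domain sockets),
- * the success callback is delivered via a scheduled task; otherwise the fd is subscribed for writable
- * events and a timeout task is scheduled, and whichever fires first decides the outcome. */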
- int aws_socket_connect(
- struct aws_socket *socket,
- const struct aws_socket_endpoint *remote_endpoint,
- struct aws_event_loop *event_loop,
- aws_socket_on_connection_result_fn *on_connection_result,
- void *user_data) {
- AWS_ASSERT(event_loop);
- AWS_ASSERT(!socket->event_loop);
- AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: beginning connect.", (void *)socket, socket->io_handle.data.fd);
- if (socket->event_loop) {
- return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
- }
- if (socket->options.type != AWS_SOCKET_DGRAM) {
- AWS_ASSERT(on_connection_result);
- if (socket->state != INIT) {
- return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
- }
- } else { /* UDP socket */
- /* UDP sockets jump to CONNECTED_READ if bind is called first */
- if (socket->state != CONNECTED_READ && socket->state != INIT) {
- return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
- }
- }
- size_t address_strlen;
- if (aws_secure_strlen(remote_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) {
- return AWS_OP_ERR;
- }
- struct socket_address address;
- AWS_ZERO_STRUCT(address);
- socklen_t sock_size = 0;
- int pton_err = 1;
- if (socket->options.domain == AWS_SOCKET_IPV4) {
- pton_err = inet_pton(AF_INET, remote_endpoint->address, &address.sock_addr_types.addr_in.sin_addr);
- address.sock_addr_types.addr_in.sin_port = htons(remote_endpoint->port);
- address.sock_addr_types.addr_in.sin_family = AF_INET;
- sock_size = sizeof(address.sock_addr_types.addr_in);
- } else if (socket->options.domain == AWS_SOCKET_IPV6) {
- pton_err = inet_pton(AF_INET6, remote_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr);
- address.sock_addr_types.addr_in6.sin6_port = htons(remote_endpoint->port);
- address.sock_addr_types.addr_in6.sin6_family = AF_INET6;
- sock_size = sizeof(address.sock_addr_types.addr_in6);
- } else if (socket->options.domain == AWS_SOCKET_LOCAL) {
- address.sock_addr_types.un_addr.sun_family = AF_UNIX;
- strncpy(address.sock_addr_types.un_addr.sun_path, remote_endpoint->address, AWS_ADDRESS_MAX_LEN);
- sock_size = sizeof(address.sock_addr_types.un_addr);
- #ifdef USE_VSOCK
- } else if (socket->options.domain == AWS_SOCKET_VSOCK) {
- pton_err = parse_cid(remote_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid);
- address.sock_addr_types.vm_addr.svm_family = AF_VSOCK;
- address.sock_addr_types.vm_addr.svm_port = (unsigned int)remote_endpoint->port;
- sock_size = sizeof(address.sock_addr_types.vm_addr);
- #endif
- } else {
- AWS_ASSERT(0);
- return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY);
- }
- if (pton_err != 1) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: failed to parse address %s:%d.",
- (void *)socket,
- socket->io_handle.data.fd,
- remote_endpoint->address,
- (int)remote_endpoint->port);
- return aws_raise_error(s_convert_pton_error(pton_err, errno_value));
- }
- AWS_LOGF_DEBUG(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: connecting to endpoint %s:%d.",
- (void *)socket,
- socket->io_handle.data.fd,
- remote_endpoint->address,
- (int)remote_endpoint->port);
- socket->state = CONNECTING;
- socket->remote_endpoint = *remote_endpoint;
- socket->connect_accept_user_data = user_data;
- socket->connection_result_fn = on_connection_result;
- struct posix_socket *socket_impl = socket->impl;
- socket_impl->connect_args = aws_mem_calloc(socket->allocator, 1, sizeof(struct posix_socket_connect_args));
- if (!socket_impl->connect_args) {
- return AWS_OP_ERR;
- }
- socket_impl->connect_args->socket = socket;
- socket_impl->connect_args->allocator = socket->allocator;
- socket_impl->connect_args->task.fn = s_handle_socket_timeout;
- socket_impl->connect_args->task.arg = socket_impl->connect_args;
- int error_code = connect(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size);
- socket->event_loop = event_loop;
- if (!error_code) {
- AWS_LOGF_INFO(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: connected immediately, not scheduling timeout.",
- (void *)socket,
- socket->io_handle.data.fd);
- socket_impl->connect_args->task.fn = s_run_connect_success;
- /* the subscription for IO will happen once we set up the connection in the task. Since we already
- * know the connection succeeded, we don't need to register for events yet. */
- aws_event_loop_schedule_task_now(event_loop, &socket_impl->connect_args->task);
- }
- if (error_code) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- if (errno_value == EINPROGRESS || errno_value == EALREADY) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: connection pending waiting on event-loop notification or timeout.",
- (void *)socket,
- socket->io_handle.data.fd);
- /* cache the timeout task; it is possible for the IO subscription to come back virtually immediately
- * and null out the connect args */
- struct aws_task *timeout_task = &socket_impl->connect_args->task;
- socket_impl->currently_subscribed = true;
- /* This event is for when the connection finishes. (the fd will flip writable). */
- if (aws_event_loop_subscribe_to_io_events(
- event_loop,
- &socket->io_handle,
- AWS_IO_EVENT_TYPE_WRITABLE,
- s_socket_connect_event,
- socket_impl->connect_args)) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: failed to register with event-loop %p.",
- (void *)socket,
- socket->io_handle.data.fd,
- (void *)event_loop);
- socket_impl->currently_subscribed = false;
- socket->event_loop = NULL;
- goto err_clean_up;
- }
- /* schedule a task to run at the connect timeout interval; if this task runs before the connect
- * happens, we consider that a timeout. */
- uint64_t timeout = 0;
- aws_event_loop_current_clock_time(event_loop, &timeout);
- timeout += aws_timestamp_convert(
- socket->options.connect_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: scheduling timeout task for %llu.",
- (void *)socket,
- socket->io_handle.data.fd,
- (unsigned long long)timeout);
- aws_event_loop_schedule_task_future(event_loop, timeout_task, timeout);
- } else {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: connect failed with error code %d.",
- (void *)socket,
- socket->io_handle.data.fd,
- errno_value);
- int aws_error = s_determine_socket_error(errno_value);
- aws_raise_error(aws_error);
- socket->event_loop = NULL;
- socket_impl->currently_subscribed = false;
- goto err_clean_up;
- }
- }
- return AWS_OP_SUCCESS;
- err_clean_up:
- aws_mem_release(socket->allocator, socket_impl->connect_args);
- socket_impl->connect_args = NULL;
- return AWS_OP_ERR;
- }
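- /* Bind the socket to local_endpoint. Stream sockets move to BOUND; datagram sockets become readable immediately. */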
- int aws_socket_bind(struct aws_socket *socket, const struct aws_socket_endpoint *local_endpoint) {
- if (socket->state != INIT) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: invalid state for bind operation.",
- (void *)socket,
- socket->io_handle.data.fd);
- return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
- }
- size_t address_strlen;
- if (aws_secure_strlen(local_endpoint->address, AWS_ADDRESS_MAX_LEN, &address_strlen)) {
- return AWS_OP_ERR;
- }
- AWS_LOGF_INFO(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: binding to %s:%d.",
- (void *)socket,
- socket->io_handle.data.fd,
- local_endpoint->address,
- (int)local_endpoint->port);
- struct socket_address address;
- AWS_ZERO_STRUCT(address);
- socklen_t sock_size = 0;
- int pton_err = 1;
- if (socket->options.domain == AWS_SOCKET_IPV4) {
- pton_err = inet_pton(AF_INET, local_endpoint->address, &address.sock_addr_types.addr_in.sin_addr);
- address.sock_addr_types.addr_in.sin_port = htons(local_endpoint->port);
- address.sock_addr_types.addr_in.sin_family = AF_INET;
- sock_size = sizeof(address.sock_addr_types.addr_in);
- } else if (socket->options.domain == AWS_SOCKET_IPV6) {
- pton_err = inet_pton(AF_INET6, local_endpoint->address, &address.sock_addr_types.addr_in6.sin6_addr);
- address.sock_addr_types.addr_in6.sin6_port = htons(local_endpoint->port);
- address.sock_addr_types.addr_in6.sin6_family = AF_INET6;
- sock_size = sizeof(address.sock_addr_types.addr_in6);
- } else if (socket->options.domain == AWS_SOCKET_LOCAL) {
- address.sock_addr_types.un_addr.sun_family = AF_UNIX;
- strncpy(address.sock_addr_types.un_addr.sun_path, local_endpoint->address, AWS_ADDRESS_MAX_LEN);
- sock_size = sizeof(address.sock_addr_types.un_addr);
- #ifdef USE_VSOCK
- } else if (socket->options.domain == AWS_SOCKET_VSOCK) {
- pton_err = parse_cid(local_endpoint->address, &address.sock_addr_types.vm_addr.svm_cid);
- address.sock_addr_types.vm_addr.svm_family = AF_VSOCK;
- address.sock_addr_types.vm_addr.svm_port = (unsigned int)local_endpoint->port;
- sock_size = sizeof(address.sock_addr_types.vm_addr);
- #endif
- } else {
- AWS_ASSERT(0);
- return aws_raise_error(AWS_IO_SOCKET_UNSUPPORTED_ADDRESS_FAMILY);
- }
- if (pton_err != 1) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: failed to parse address %s:%d.",
- (void *)socket,
- socket->io_handle.data.fd,
- local_endpoint->address,
- (int)local_endpoint->port);
- return aws_raise_error(s_convert_pton_error(pton_err, errno_value));
- }
- if (bind(socket->io_handle.data.fd, (struct sockaddr *)&address.sock_addr_types, sock_size) != 0) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: bind failed with error code %d",
- (void *)socket,
- socket->io_handle.data.fd,
- errno_value);
- aws_raise_error(s_determine_socket_error(errno_value));
- goto error;
- }
- if (s_update_local_endpoint(socket)) {
- goto error;
- }
- if (socket->options.type == AWS_SOCKET_STREAM) {
- socket->state = BOUND;
- } else {
- /* e.g. UDP is now readable */
- socket->state = CONNECTED_READ;
- }
- AWS_LOGF_DEBUG(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: successfully bound to %s:%d",
- (void *)socket,
- socket->io_handle.data.fd,
- socket->local_endpoint.address,
- socket->local_endpoint.port);
- return AWS_OP_SUCCESS;
- error:
- socket->state = ERROR;
- return AWS_OP_ERR;
- }
- int aws_socket_get_bound_address(const struct aws_socket *socket, struct aws_socket_endpoint *out_address) {
- if (socket->local_endpoint.address[0] == 0) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: Socket has no local address. Socket must be bound first.",
- (void *)socket,
- socket->io_handle.data.fd);
- return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
- }
- *out_address = socket->local_endpoint;
- return AWS_OP_SUCCESS;
- }
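- /* Put a bound stream socket into the LISTENING state; backlog_size is passed straight to listen(). */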
- int aws_socket_listen(struct aws_socket *socket, int backlog_size) {
- if (socket->state != BOUND) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: invalid state for listen operation. You must call bind first.",
- (void *)socket,
- socket->io_handle.data.fd);
- return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
- }
- int error_code = listen(socket->io_handle.data.fd, backlog_size);
- if (!error_code) {
- AWS_LOGF_INFO(
- AWS_LS_IO_SOCKET, "id=%p fd=%d: successfully listening", (void *)socket, socket->io_handle.data.fd);
- socket->state = LISTENING;
- return AWS_OP_SUCCESS;
- }
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: listen failed with error code %d",
- (void *)socket,
- socket->io_handle.data.fd,
- errno_value);
- socket->state = ERROR;
- return aws_raise_error(s_determine_socket_error(errno_value));
- }
- /* this is called by the event loop handler that was installed in start_accept(). It runs once the FD goes readable,
- * accepts as many as it can and then returns control to the event loop. */
- static void s_socket_accept_event(
- struct aws_event_loop *event_loop,
- struct aws_io_handle *handle,
- int events,
- void *user_data) {
- (void)event_loop;
- struct aws_socket *socket = user_data;
- struct posix_socket *socket_impl = socket->impl;
- AWS_LOGF_DEBUG(
- AWS_LS_IO_SOCKET, "id=%p fd=%d: listening event received", (void *)socket, socket->io_handle.data.fd);
- if (socket_impl->continue_accept && events & AWS_IO_EVENT_TYPE_READABLE) {
- int in_fd = 0;
- while (socket_impl->continue_accept && in_fd != -1) {
- struct sockaddr_storage in_addr;
- socklen_t in_len = sizeof(struct sockaddr_storage);
- in_fd = accept(handle->data.fd, (struct sockaddr *)&in_addr, &in_len);
- if (in_fd == -1) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
- break;
- }
- int aws_error = aws_socket_get_error(socket);
- aws_raise_error(aws_error);
- s_on_connection_error(socket, aws_error);
- break;
- }
- AWS_LOGF_DEBUG(
- AWS_LS_IO_SOCKET, "id=%p fd=%d: incoming connection", (void *)socket, socket->io_handle.data.fd);
- struct aws_socket *new_sock = aws_mem_acquire(socket->allocator, sizeof(struct aws_socket));
- if (!new_sock) {
- close(in_fd);
- s_on_connection_error(socket, aws_last_error());
- continue;
- }
- if (s_socket_init(new_sock, socket->allocator, &socket->options, in_fd)) {
- aws_mem_release(socket->allocator, new_sock);
- s_on_connection_error(socket, aws_last_error());
- continue;
- }
- new_sock->local_endpoint = socket->local_endpoint;
- new_sock->state = CONNECTED_READ | CONNECTED_WRITE;
- uint16_t port = 0;
- /* get the info on the incoming socket's address */
- if (in_addr.ss_family == AF_INET) {
- struct sockaddr_in *s = (struct sockaddr_in *)&in_addr;
- port = ntohs(s->sin_port);
- /* this came from the kernel: a) it shouldn't fail, and b) even if it does,
- * it's not fatal; we just log a warning below. */
- if (!inet_ntop(
- AF_INET,
- &s->sin_addr,
- new_sock->remote_endpoint.address,
- sizeof(new_sock->remote_endpoint.address))) {
- AWS_LOGF_WARN(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d:. Failed to determine remote address.",
- (void *)socket,
- socket->io_handle.data.fd);
- }
- new_sock->options.domain = AWS_SOCKET_IPV4;
- } else if (in_addr.ss_family == AF_INET6) {
- /* this came from the kernel: a) it shouldn't fail, and b) even if it does,
- * it's not fatal; we just log a warning below. */
- struct sockaddr_in6 *s = (struct sockaddr_in6 *)&in_addr;
- port = ntohs(s->sin6_port);
- if (!inet_ntop(
- AF_INET6,
- &s->sin6_addr,
- new_sock->remote_endpoint.address,
- sizeof(new_sock->remote_endpoint.address))) {
- AWS_LOGF_WARN(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d:. Failed to determine remote address.",
- (void *)socket,
- socket->io_handle.data.fd);
- }
- new_sock->options.domain = AWS_SOCKET_IPV6;
- } else if (in_addr.ss_family == AF_UNIX) {
- new_sock->remote_endpoint = socket->local_endpoint;
- new_sock->options.domain = AWS_SOCKET_LOCAL;
- }
- new_sock->remote_endpoint.port = port;
- AWS_LOGF_INFO(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: connected to %s:%d, incoming fd %d",
- (void *)socket,
- socket->io_handle.data.fd,
- new_sock->remote_endpoint.address,
- new_sock->remote_endpoint.port,
- in_fd);
- int flags = fcntl(in_fd, F_GETFL, 0);
- flags |= O_NONBLOCK | O_CLOEXEC;
- fcntl(in_fd, F_SETFL, flags);
- bool close_occurred = false;
- socket_impl->close_happened = &close_occurred;
- socket->accept_result_fn(socket, AWS_ERROR_SUCCESS, new_sock, socket->connect_accept_user_data);
- if (close_occurred) {
- return;
- }
- socket_impl->close_happened = NULL;
- }
- }
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: finished processing incoming connections, "
- "waiting on event-loop notification",
- (void *)socket,
- socket->io_handle.data.fd);
- }
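- /* Subscribe the listening socket to readable events on accept_loop; each incoming connection is
- * accepted in s_socket_accept_event and handed to on_accept_result as a new aws_socket. */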
- int aws_socket_start_accept(
- struct aws_socket *socket,
- struct aws_event_loop *accept_loop,
- aws_socket_on_accept_result_fn *on_accept_result,
- void *user_data) {
- AWS_ASSERT(on_accept_result);
- AWS_ASSERT(accept_loop);
- if (socket->event_loop) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: is already assigned to event-loop %p.",
- (void *)socket,
- socket->io_handle.data.fd,
- (void *)socket->event_loop);
- return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
- }
- if (socket->state != LISTENING) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: invalid state for start_accept operation. You must call listen first.",
- (void *)socket,
- socket->io_handle.data.fd);
- return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
- }
- socket->accept_result_fn = on_accept_result;
- socket->connect_accept_user_data = user_data;
- socket->event_loop = accept_loop;
- struct posix_socket *socket_impl = socket->impl;
- socket_impl->continue_accept = true;
- socket_impl->currently_subscribed = true;
- if (aws_event_loop_subscribe_to_io_events(
- socket->event_loop, &socket->io_handle, AWS_IO_EVENT_TYPE_READABLE, s_socket_accept_event, socket)) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: failed to subscribe to event-loop %p.",
- (void *)socket,
- socket->io_handle.data.fd,
- (void *)socket->event_loop);
- socket_impl->continue_accept = false;
- socket_impl->currently_subscribed = false;
- socket->event_loop = NULL;
- return AWS_OP_ERR;
- }
- return AWS_OP_SUCCESS;
- }
- struct stop_accept_args {
- struct aws_task task;
- struct aws_mutex mutex;
- struct aws_condition_variable condition_variable;
- struct aws_socket *socket;
- int ret_code;
- bool invoked;
- };
- static bool s_stop_accept_pred(void *arg) {
- struct stop_accept_args *stop_accept_args = arg;
- return stop_accept_args->invoked;
- }
- static void s_stop_accept_task(struct aws_task *task, void *arg, enum aws_task_status status) {
- (void)task;
- (void)status;
- struct stop_accept_args *stop_accept_args = arg;
- aws_mutex_lock(&stop_accept_args->mutex);
- stop_accept_args->ret_code = AWS_OP_SUCCESS;
- if (aws_socket_stop_accept(stop_accept_args->socket)) {
- stop_accept_args->ret_code = aws_last_error();
- }
- stop_accept_args->invoked = true;
- aws_condition_variable_notify_one(&stop_accept_args->condition_variable);
- aws_mutex_unlock(&stop_accept_args->mutex);
- }
- int aws_socket_stop_accept(struct aws_socket *socket) {
- if (socket->state != LISTENING) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: is not in a listening state, can't stop_accept.",
- (void *)socket,
- socket->io_handle.data.fd);
- return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
- }
- AWS_LOGF_INFO(
- AWS_LS_IO_SOCKET, "id=%p fd=%d: stopping accepting new connections", (void *)socket, socket->io_handle.data.fd);
- if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
- struct stop_accept_args args = {
- .mutex = AWS_MUTEX_INIT,
- .condition_variable = AWS_CONDITION_VARIABLE_INIT,
- .invoked = false,
- .socket = socket,
- .ret_code = AWS_OP_SUCCESS,
- .task = {.fn = s_stop_accept_task},
- };
- AWS_LOGF_INFO(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: stopping accepting new connections from a different thread than "
- "the socket is running from. Blocking until it shuts down.",
- (void *)socket,
- socket->io_handle.data.fd);
- /* 'args' lives on this stack frame, but we wait on the completion below before
- * 'args' goes out of scope, so handing its address to the task is safe.
- * NOLINTNEXTLINE */
- args.task.arg = &args;
- aws_mutex_lock(&args.mutex);
- aws_event_loop_schedule_task_now(socket->event_loop, &args.task);
- aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_stop_accept_pred, &args);
- aws_mutex_unlock(&args.mutex);
- AWS_LOGF_INFO(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: stop accept task finished running.",
- (void *)socket,
- socket->io_handle.data.fd);
- if (args.ret_code) {
- return aws_raise_error(args.ret_code);
- }
- return AWS_OP_SUCCESS;
- }
- int ret_val = AWS_OP_SUCCESS;
- struct posix_socket *socket_impl = socket->impl;
- if (socket_impl->currently_subscribed) {
- ret_val = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle);
- socket_impl->currently_subscribed = false;
- socket_impl->continue_accept = false;
- socket->event_loop = NULL;
- }
- return ret_val;
- }
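- /* Apply socket options to the underlying fd: SIGPIPE suppression (where available via setsockopt),
- * SO_REUSEADDR, and TCP keep-alive tuning for stream sockets. setsockopt failures here are logged
- * as warnings rather than treated as errors. */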
- int aws_socket_set_options(struct aws_socket *socket, const struct aws_socket_options *options) {
- if (socket->options.domain != options->domain || socket->options.type != options->type) {
- return aws_raise_error(AWS_IO_SOCKET_INVALID_OPTIONS);
- }
- AWS_LOGF_DEBUG(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: setting socket options to: keep-alive %d, keep idle %d, keep-alive interval %d, keep-alive probe "
- "count %d.",
- (void *)socket,
- socket->io_handle.data.fd,
- (int)options->keepalive,
- (int)options->keep_alive_timeout_sec,
- (int)options->keep_alive_interval_sec,
- (int)options->keep_alive_max_failed_probes);
- socket->options = *options;
- #ifdef NO_SIGNAL_SOCK_OPT
- int option_value = 1;
- if (AWS_UNLIKELY(setsockopt(
- socket->io_handle.data.fd, SOL_SOCKET, NO_SIGNAL_SOCK_OPT, &option_value, sizeof(option_value)))) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_WARN(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: setsockopt() for NO_SIGNAL_SOCK_OPT failed with errno %d.",
- (void *)socket,
- socket->io_handle.data.fd,
- errno_value);
- }
- #endif /* NO_SIGNAL_SOCK_OPT */
- int reuse = 1;
- if (AWS_UNLIKELY(setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(int)))) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_WARN(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: setsockopt() for SO_REUSEADDR failed with errno %d.",
- (void *)socket,
- socket->io_handle.data.fd,
- errno_value);
- }
- if (options->type == AWS_SOCKET_STREAM && options->domain != AWS_SOCKET_LOCAL) {
- if (socket->options.keepalive) {
- int keep_alive = 1;
- if (AWS_UNLIKELY(
- setsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_KEEPALIVE, &keep_alive, sizeof(int)))) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_WARN(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: setsockopt() for enabling SO_KEEPALIVE failed with errno %d.",
- (void *)socket,
- socket->io_handle.data.fd,
- errno_value);
- }
- }
- #if !defined(__OpenBSD__)
- if (socket->options.keep_alive_interval_sec && socket->options.keep_alive_timeout_sec) {
- int ival_in_secs = socket->options.keep_alive_interval_sec;
- if (AWS_UNLIKELY(setsockopt(
- socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPIDLE, &ival_in_secs, sizeof(ival_in_secs)))) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_WARN(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: setsockopt() for enabling TCP_KEEPIDLE for TCP failed with errno %d.",
- (void *)socket,
- socket->io_handle.data.fd,
- errno_value);
- }
- ival_in_secs = socket->options.keep_alive_timeout_sec;
- if (AWS_UNLIKELY(setsockopt(
- socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPINTVL, &ival_in_secs, sizeof(ival_in_secs)))) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_WARN(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: setsockopt() for enabling TCP_KEEPINTVL for TCP failed with errno %d.",
- (void *)socket,
- socket->io_handle.data.fd,
- errno_value);
- }
- }
- if (socket->options.keep_alive_max_failed_probes) {
- int max_probes = socket->options.keep_alive_max_failed_probes;
- if (AWS_UNLIKELY(
- setsockopt(socket->io_handle.data.fd, IPPROTO_TCP, TCP_KEEPCNT, &max_probes, sizeof(max_probes)))) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_WARN(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: setsockopt() for enabling TCP_KEEPCNT for TCP failed with errno %d.",
- (void *)socket,
- socket->io_handle.data.fd,
- errno_value);
- }
- }
- #endif /* __OpenBSD__ */
- }
- return AWS_OP_SUCCESS;
- }
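- /* A single queued write: the remaining cursor, the original length (for reporting bytes written),
- * and the completion callback to invoke once it's fully sent or the socket errors out. */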
- struct socket_write_request {
- struct aws_byte_cursor cursor_cpy;
- aws_socket_on_write_completed_fn *written_fn;
- void *write_user_data;
- struct aws_linked_list_node node;
- size_t original_buffer_len;
- int error_code;
- };
- struct posix_socket_close_args {
- struct aws_mutex mutex;
- struct aws_condition_variable condition_variable;
- struct aws_socket *socket;
- bool invoked;
- int ret_code;
- };
- static bool s_close_predicate(void *arg) {
- struct posix_socket_close_args *close_args = arg;
- return close_args->invoked;
- }
- static void s_close_task(struct aws_task *task, void *arg, enum aws_task_status status) {
- (void)task;
- (void)status;
- struct posix_socket_close_args *close_args = arg;
- aws_mutex_lock(&close_args->mutex);
- close_args->ret_code = AWS_OP_SUCCESS;
- if (aws_socket_close(close_args->socket)) {
- close_args->ret_code = aws_last_error();
- }
- close_args->invoked = true;
- aws_condition_variable_notify_one(&close_args->condition_variable);
- aws_mutex_unlock(&close_args->mutex);
- }
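- /* Close the socket. If called off the socket's event-loop thread (only allowed for listeners),
- * the close is marshalled onto that thread and this call blocks until it completes. Callbacks for
- * pending and completed write requests are fired before returning. */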
- int aws_socket_close(struct aws_socket *socket) {
- struct posix_socket *socket_impl = socket->impl;
- AWS_LOGF_DEBUG(AWS_LS_IO_SOCKET, "id=%p fd=%d: closing", (void *)socket, socket->io_handle.data.fd);
- struct aws_event_loop *event_loop = socket->event_loop;
- if (socket->event_loop) {
- /* don't freak out on me: this almost never happens, and never occurs inside a channel;
- * it only gets hit from a listening socket shutting down or from a unit test. */
- if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
- AWS_LOGF_INFO(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: closing from a different thread than "
- "the socket is running from. Blocking until it closes down.",
- (void *)socket,
- socket->io_handle.data.fd);
- /* the only time we allow this kind of thing is when you're a listener.*/
- if (socket->state != LISTENING) {
- return aws_raise_error(AWS_IO_SOCKET_ILLEGAL_OPERATION_FOR_STATE);
- }
- struct posix_socket_close_args args = {
- .mutex = AWS_MUTEX_INIT,
- .condition_variable = AWS_CONDITION_VARIABLE_INIT,
- .socket = socket,
- .ret_code = AWS_OP_SUCCESS,
- .invoked = false,
- };
- struct aws_task close_task = {
- .fn = s_close_task,
- .arg = &args,
- };
- int fd_for_logging = socket->io_handle.data.fd; /* socket's fd gets reset before final log */
- (void)fd_for_logging;
- aws_mutex_lock(&args.mutex);
- aws_event_loop_schedule_task_now(socket->event_loop, &close_task);
- aws_condition_variable_wait_pred(&args.condition_variable, &args.mutex, s_close_predicate, &args);
- aws_mutex_unlock(&args.mutex);
- AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: close task completed.", (void *)socket, fd_for_logging);
- if (args.ret_code) {
- return aws_raise_error(args.ret_code);
- }
- return AWS_OP_SUCCESS;
- }
- if (socket_impl->currently_subscribed) {
- if (socket->state & LISTENING) {
- aws_socket_stop_accept(socket);
- } else {
- int err_code = aws_event_loop_unsubscribe_from_io_events(socket->event_loop, &socket->io_handle);
- if (err_code) {
- return AWS_OP_ERR;
- }
- }
- socket_impl->currently_subscribed = false;
- socket->event_loop = NULL;
- }
- }
- if (socket_impl->close_happened) {
- *socket_impl->close_happened = true;
- }
- if (socket_impl->connect_args) {
- socket_impl->connect_args->socket = NULL;
- socket_impl->connect_args = NULL;
- }
- if (aws_socket_is_open(socket)) {
- close(socket->io_handle.data.fd);
- socket->io_handle.data.fd = -1;
- socket->state = CLOSED;
- /* ensure callbacks for pending writes fire (in order) before this close function returns */
- if (socket_impl->written_task_scheduled) {
- aws_event_loop_cancel_task(event_loop, &socket_impl->written_task);
- }
- while (!aws_linked_list_empty(&socket_impl->written_queue)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->written_queue);
- struct socket_write_request *write_request = AWS_CONTAINER_OF(node, struct socket_write_request, node);
- size_t bytes_written = write_request->original_buffer_len - write_request->cursor_cpy.len;
- write_request->written_fn(socket, write_request->error_code, bytes_written, write_request->write_user_data);
- aws_mem_release(socket->allocator, write_request);
- }
- while (!aws_linked_list_empty(&socket_impl->write_queue)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue);
- struct socket_write_request *write_request = AWS_CONTAINER_OF(node, struct socket_write_request, node);
- size_t bytes_written = write_request->original_buffer_len - write_request->cursor_cpy.len;
- write_request->written_fn(socket, AWS_IO_SOCKET_CLOSED, bytes_written, write_request->write_user_data);
- aws_mem_release(socket->allocator, write_request);
- }
- }
- return AWS_OP_SUCCESS;
- }
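- /* Half-close the connection in the given direction via shutdown(), and clear the corresponding
- * CONNECTED_READ/CONNECTED_WRITE bit from the socket state. */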
- int aws_socket_shutdown_dir(struct aws_socket *socket, enum aws_channel_direction dir) {
- int how = dir == AWS_CHANNEL_DIR_READ ? SHUT_RD : SHUT_WR;
- AWS_LOGF_DEBUG(
- AWS_LS_IO_SOCKET, "id=%p fd=%d: shutting down in direction %d", (void *)socket, socket->io_handle.data.fd, dir);
- if (shutdown(socket->io_handle.data.fd, how)) {
- int errno_value = errno; /* Always cache errno before potential side-effect */
- int aws_error = s_determine_socket_error(errno_value);
- return aws_raise_error(aws_error);
- }
- if (dir == AWS_CHANNEL_DIR_READ) {
- socket->state &= ~CONNECTED_READ;
- } else {
- socket->state &= ~CONNECTED_WRITE;
- }
- return AWS_OP_SUCCESS;
- }
- static void s_written_task(struct aws_task *task, void *arg, enum aws_task_status status) {
- (void)task;
- (void)status;
- struct aws_socket *socket = arg;
- struct posix_socket *socket_impl = socket->impl;
- socket_impl->written_task_scheduled = false;
- /* this is to handle a race condition when a callback kicks off a cleanup, or the user decides
- * to close the socket based on something they read (SSL validation failed, for example).
- * if clean_up happens while internal_refcount > 0, socket_impl is left dangling and is only
- * freed once the refcount drops to zero. */
- aws_ref_count_acquire(&socket_impl->internal_refcount);
- /* Notes about weird loop:
- * 1) Only process the initial contents of queue when this task is run,
- * ignoring any writes queued during delivery.
- * If we simply looped until the queue was empty, we could get into a
- * synchronous loop of completing and writing and completing and writing...
- * and it would be tough for multiple sockets to share an event-loop fairly.
- * 2) Check if queue is empty with each iteration.
- * If user calls close() from the callback, close() will process all
- * nodes in the written_queue, and the queue will be empty when the
- * callstack gets back to here. */
- if (!aws_linked_list_empty(&socket_impl->written_queue)) {
- struct aws_linked_list_node *stop_after = aws_linked_list_back(&socket_impl->written_queue);
- do {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->written_queue);
- struct socket_write_request *write_request = AWS_CONTAINER_OF(node, struct socket_write_request, node);
- size_t bytes_written = write_request->original_buffer_len - write_request->cursor_cpy.len;
- write_request->written_fn(socket, write_request->error_code, bytes_written, write_request->write_user_data);
- aws_mem_release(socket_impl->allocator, write_request);
- if (node == stop_after) {
- break;
- }
- } while (!aws_linked_list_empty(&socket_impl->written_queue));
- }
- aws_ref_count_release(&socket_impl->internal_refcount);
- }
- /* this gets called in two scenarios.
- * 1st scenario, someone called aws_socket_write() and we want to try writing now, so an error can be returned
- * immediately if something bad has happened to the socket. In this case, `parent_request` is set.
- * 2nd scenario, the event loop notified us that the socket went writable. In this case `parent_request` is NULL */
- static int s_process_socket_write_requests(struct aws_socket *socket, struct socket_write_request *parent_request) {
- struct posix_socket *socket_impl = socket->impl;
- if (parent_request) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: processing write requests, called from aws_socket_write",
- (void *)socket,
- socket->io_handle.data.fd);
- } else {
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: processing write requests, invoked by the event-loop",
- (void *)socket,
- socket->io_handle.data.fd);
- }
- bool purge = false;
- int aws_error = AWS_OP_SUCCESS;
- bool parent_request_failed = false;
- bool pushed_to_written_queue = false;
- /* if a close call happens in the middle, this queue will have been cleaned out from under us. */
- while (!aws_linked_list_empty(&socket_impl->write_queue)) {
- struct aws_linked_list_node *node = aws_linked_list_front(&socket_impl->write_queue);
- struct socket_write_request *write_request = AWS_CONTAINER_OF(node, struct socket_write_request, node);
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: dequeued write request of size %llu, remaining to write %llu",
- (void *)socket,
- socket->io_handle.data.fd,
- (unsigned long long)write_request->original_buffer_len,
- (unsigned long long)write_request->cursor_cpy.len);
- ssize_t written = send(
- socket->io_handle.data.fd, write_request->cursor_cpy.ptr, write_request->cursor_cpy.len, NO_SIGNAL_SEND);
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: send written size %d",
- (void *)socket,
- socket->io_handle.data.fd,
- (int)written);
- if (written < 0) {
- #if defined(EWOULDBLOCK)
- if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
- #else
- if (errno_value == EAGAIN) {
- #endif
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET, "id=%p fd=%d: returned would block", (void *)socket, socket->io_handle.data.fd);
- break;
- }
- if (errno_value == EPIPE) {
- AWS_LOGF_DEBUG(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: already closed before write",
- (void *)socket,
- socket->io_handle.data.fd);
- aws_error = AWS_IO_SOCKET_CLOSED;
- aws_raise_error(aws_error);
- purge = true;
- break;
- }
- purge = true;
- AWS_LOGF_DEBUG(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: write error with error code %d",
- (void *)socket,
- socket->io_handle.data.fd,
- errno_value);
- aws_error = s_determine_socket_error(errno_value);
- aws_raise_error(aws_error);
- break;
- }
- size_t remaining_to_write = write_request->cursor_cpy.len;
- aws_byte_cursor_advance(&write_request->cursor_cpy, (size_t)written);
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: remaining write request to write %llu",
- (void *)socket,
- socket->io_handle.data.fd,
- (unsigned long long)write_request->cursor_cpy.len);
- if ((size_t)written == remaining_to_write) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET, "id=%p fd=%d: write request completed", (void *)socket, socket->io_handle.data.fd);
- aws_linked_list_remove(node);
- write_request->error_code = AWS_ERROR_SUCCESS;
- aws_linked_list_push_back(&socket_impl->written_queue, node);
- pushed_to_written_queue = true;
- }
- }
- if (purge) {
- while (!aws_linked_list_empty(&socket_impl->write_queue)) {
- struct aws_linked_list_node *node = aws_linked_list_pop_front(&socket_impl->write_queue);
- struct socket_write_request *write_request = AWS_CONTAINER_OF(node, struct socket_write_request, node);
- /* If this fn was invoked directly from aws_socket_write(), don't invoke the error callback
- * as the user will be able to rely on the return value from aws_socket_write() */
- if (write_request == parent_request) {
- parent_request_failed = true;
- aws_mem_release(socket->allocator, write_request);
- } else {
- write_request->error_code = aws_error;
- aws_linked_list_push_back(&socket_impl->written_queue, node);
- pushed_to_written_queue = true;
- }
- }
- }
- if (pushed_to_written_queue && !socket_impl->written_task_scheduled) {
- socket_impl->written_task_scheduled = true;
- aws_task_init(&socket_impl->written_task, s_written_task, socket, "socket_written_task");
- aws_event_loop_schedule_task_now(socket->event_loop, &socket_impl->written_task);
- }
- /* Only report error if aws_socket_write() invoked this function and its write_request failed */
- if (!parent_request_failed) {
- return AWS_OP_SUCCESS;
- }
- aws_raise_error(aws_error);
- return AWS_OP_ERR;
- }
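- /* Editor's note: an illustrative sketch, not part of the original source. It shows the plain POSIX pattern
-  * the function above builds on: call send() on a non-blocking fd, step past partial writes, stop on
-  * EAGAIN/EWOULDBLOCK and wait for the next writable event, and treat everything else as a hard error. The
-  * helper name s_example_send_some is hypothetical. Returns the byte count sent, or -1 with errno set. */
- #include <errno.h>
- #include <stdint.h>
- #include <sys/socket.h>
- static ssize_t s_example_send_some(int fd, const uint8_t *buf, size_t len) {
-     size_t total = 0;
-     while (total < len) {
-         ssize_t written = send(fd, buf + total, len - total, 0);
-         if (written < 0) {
-             if (errno == EINTR) {
-                 continue; /* interrupted by a signal: retry */
-             }
-             if (errno == EAGAIN || errno == EWOULDBLOCK) {
-                 break; /* kernel send buffer is full: resume on the next writable event */
-             }
-             return -1; /* hard error (EPIPE, ECONNRESET, ...) */
-         }
-         total += (size_t)written;
-     }
-     return (ssize_t)total;
- }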
- static void s_on_socket_io_event(
- struct aws_event_loop *event_loop,
- struct aws_io_handle *handle,
- int events,
- void *user_data) {
- (void)event_loop;
- (void)handle;
- struct aws_socket *socket = user_data;
- struct posix_socket *socket_impl = socket->impl;
- /* This handles a race condition where an error kicks off a cleanup, or the user decides
- * to close the socket based on something they read (SSL validation failed, for example).
- * Holding a reference here means that if clean_up() runs while internal_refcount > 0,
- * socket_impl stays allocated, but currently_subscribed is set to false. */
- aws_ref_count_acquire(&socket_impl->internal_refcount);
- if (events & AWS_IO_EVENT_TYPE_REMOTE_HANG_UP || events & AWS_IO_EVENT_TYPE_CLOSED) {
- aws_raise_error(AWS_IO_SOCKET_CLOSED);
- AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: closed remotely", (void *)socket, socket->io_handle.data.fd);
- if (socket->readable_fn) {
- socket->readable_fn(socket, AWS_IO_SOCKET_CLOSED, socket->readable_user_data);
- }
- goto end_check;
- }
- if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_ERROR) {
- int aws_error = aws_socket_get_error(socket);
- aws_raise_error(aws_error);
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET, "id=%p fd=%d: error event occurred", (void *)socket, socket->io_handle.data.fd);
- if (socket->readable_fn) {
- socket->readable_fn(socket, aws_error, socket->readable_user_data);
- }
- goto end_check;
- }
- if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_READABLE) {
- AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is readable", (void *)socket, socket->io_handle.data.fd);
- if (socket->readable_fn) {
- socket->readable_fn(socket, AWS_OP_SUCCESS, socket->readable_user_data);
- }
- }
- /* If the socket was closed in between these branches, currently_subscribed will be false and socket_impl will
- * not yet have been cleaned up (we still hold a reference), so this next branch is safe. */
- if (socket_impl->currently_subscribed && events & AWS_IO_EVENT_TYPE_WRITABLE) {
- AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: is writable", (void *)socket, socket->io_handle.data.fd);
- s_process_socket_write_requests(socket, NULL);
- }
- end_check:
- aws_ref_count_release(&socket_impl->internal_refcount);
- }
- int aws_socket_assign_to_event_loop(struct aws_socket *socket, struct aws_event_loop *event_loop) {
- if (!socket->event_loop) {
- AWS_LOGF_DEBUG(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: assigning to event loop %p",
- (void *)socket,
- socket->io_handle.data.fd,
- (void *)event_loop);
- socket->event_loop = event_loop;
- struct posix_socket *socket_impl = socket->impl;
- socket_impl->currently_subscribed = true;
- if (aws_event_loop_subscribe_to_io_events(
- event_loop,
- &socket->io_handle,
- AWS_IO_EVENT_TYPE_WRITABLE | AWS_IO_EVENT_TYPE_READABLE,
- s_on_socket_io_event,
- socket)) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: assigning to event loop %p failed with error %d",
- (void *)socket,
- socket->io_handle.data.fd,
- (void *)event_loop,
- aws_last_error());
- socket_impl->currently_subscribed = false;
- socket->event_loop = NULL;
- return AWS_OP_ERR;
- }
- return AWS_OP_SUCCESS;
- }
- return aws_raise_error(AWS_IO_EVENT_LOOP_ALREADY_ASSIGNED);
- }
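- /* Editor's note: an illustrative sketch, not part of the original source. It shows the expected ordering for
-  * a socket handed to you outside of a connect flow (e.g. from an accept callback): pin it to an event loop
-  * first, then register the readable callback. The names prefixed s_example_ are hypothetical;
-  * s_example_on_readable is defined in the read sketch after aws_socket_read() below. */
- static void s_example_on_readable(struct aws_socket *socket, int error_code, void *user_data);
- static int s_example_adopt_socket(struct aws_socket *socket, struct aws_event_loop *loop, void *user_data) {
-     /* Subscribes the fd for readable/writable events on `loop`; fails if a loop is already assigned. */
-     if (aws_socket_assign_to_event_loop(socket, loop)) {
-         return AWS_OP_ERR;
-     }
-     /* From now on, s_example_on_readable fires on the loop's thread whenever data or an error arrives. */
-     return aws_socket_subscribe_to_readable_events(socket, s_example_on_readable, user_data);
- }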
- struct aws_event_loop *aws_socket_get_event_loop(struct aws_socket *socket) {
- return socket->event_loop;
- }
- int aws_socket_subscribe_to_readable_events(
- struct aws_socket *socket,
- aws_socket_on_readable_fn *on_readable,
- void *user_data) {
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET, " id=%p fd=%d: subscribing to readable events", (void *)socket, socket->io_handle.data.fd);
- if (!(socket->state & CONNECTED_READ)) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: can't subscribe to readable events since the socket is not connected",
- (void *)socket,
- socket->io_handle.data.fd);
- return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED);
- }
- if (socket->readable_fn) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: can't subscribe to readable events since it is already subscribed",
- (void *)socket,
- socket->io_handle.data.fd);
- return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED);
- }
- AWS_ASSERT(on_readable);
- socket->readable_user_data = user_data;
- socket->readable_fn = on_readable;
- return AWS_OP_SUCCESS;
- }
- int aws_socket_read(struct aws_socket *socket, struct aws_byte_buf *buffer, size_t *amount_read) {
- AWS_ASSERT(amount_read);
- if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: cannot read from a different thread than event loop %p",
- (void *)socket,
- socket->io_handle.data.fd,
- (void *)socket->event_loop);
- return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
- }
- if (!(socket->state & CONNECTED_READ)) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: cannot read because it is not connected",
- (void *)socket,
- socket->io_handle.data.fd);
- return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED);
- }
- ssize_t read_val = read(socket->io_handle.data.fd, buffer->buffer + buffer->len, buffer->capacity - buffer->len);
- int errno_value = errno; /* Always cache errno before potential side-effect */
- AWS_LOGF_TRACE(
- AWS_LS_IO_SOCKET, "id=%p fd=%d: read of %d", (void *)socket, socket->io_handle.data.fd, (int)read_val);
- if (read_val > 0) {
- *amount_read = (size_t)read_val;
- buffer->len += *amount_read;
- return AWS_OP_SUCCESS;
- }
- /* read_val of 0 means EOF, which we'll treat as AWS_IO_SOCKET_CLOSED */
- if (read_val == 0) {
- AWS_LOGF_INFO(
- AWS_LS_IO_SOCKET, "id=%p fd=%d: zero read, socket is closed", (void *)socket, socket->io_handle.data.fd);
- *amount_read = 0;
- /* A zero-byte read only means EOF if the caller's buffer actually had room; if the buffer was already
- * full, read() returning 0 tells us nothing about the connection, so report success instead. */
- if (buffer->capacity - buffer->len > 0) {
- return aws_raise_error(AWS_IO_SOCKET_CLOSED);
- }
- return AWS_OP_SUCCESS;
- }
- #if defined(EWOULDBLOCK)
- if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
- #else
- if (errno_value == EAGAIN) {
- #endif
- AWS_LOGF_TRACE(AWS_LS_IO_SOCKET, "id=%p fd=%d: read would block", (void *)socket, socket->io_handle.data.fd);
- return aws_raise_error(AWS_IO_READ_WOULD_BLOCK);
- }
- if (errno_value == EPIPE || errno_value == ECONNRESET) {
- AWS_LOGF_INFO(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket is closed.", (void *)socket, socket->io_handle.data.fd);
- return aws_raise_error(AWS_IO_SOCKET_CLOSED);
- }
- if (errno_value == ETIMEDOUT) {
- AWS_LOGF_ERROR(AWS_LS_IO_SOCKET, "id=%p fd=%d: socket timed out.", (void *)socket, socket->io_handle.data.fd);
- return aws_raise_error(AWS_IO_SOCKET_TIMEOUT);
- }
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: read failed with error: %s",
- (void *)socket,
- socket->io_handle.data.fd,
- strerror(errno_value));
- return aws_raise_error(s_determine_socket_error(errno_value));
- }
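- /* Editor's note: an illustrative sketch, not part of the original source. It shows how a readable callback
-  * typically drains the socket: read until aws_socket_read() raises AWS_IO_READ_WOULD_BLOCK, then wait for
-  * the next readable event; a raised AWS_IO_SOCKET_CLOSED means end of stream. The callback name matches the
-  * hypothetical forward declaration in the event-loop sketch above; the buffer size is arbitrary. */
- static void s_example_on_readable(struct aws_socket *socket, int error_code, void *user_data) {
-     (void)user_data;
-     if (error_code) {
-         return; /* e.g. AWS_IO_SOCKET_CLOSED or an error event: stop reading and start shutting down */
-     }
-     uint8_t storage[1024];
-     for (;;) {
-         struct aws_byte_buf chunk = aws_byte_buf_from_empty_array(storage, sizeof(storage));
-         size_t amount_read = 0;
-         if (aws_socket_read(socket, &chunk, &amount_read)) {
-             int err = aws_last_error();
-             if (err != AWS_IO_READ_WOULD_BLOCK) {
-                 /* AWS_IO_SOCKET_CLOSED or another hard error: real code would start shutdown here */
-             }
-             break; /* either way, stop reading until the next event */
-         }
-         /* process chunk.buffer[0 .. chunk.len) here */
-     }
- }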
- int aws_socket_write(
- struct aws_socket *socket,
- const struct aws_byte_cursor *cursor,
- aws_socket_on_write_completed_fn *written_fn,
- void *user_data) {
- if (!aws_event_loop_thread_is_callers_thread(socket->event_loop)) {
- return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
- }
- if (!(socket->state & CONNECTED_WRITE)) {
- AWS_LOGF_ERROR(
- AWS_LS_IO_SOCKET,
- "id=%p fd=%d: cannot write to because it is not connected",
- (void *)socket,
- socket->io_handle.data.fd);
- return aws_raise_error(AWS_IO_SOCKET_NOT_CONNECTED);
- }
- AWS_ASSERT(written_fn);
- struct posix_socket *socket_impl = socket->impl;
- struct socket_write_request *write_request =
- aws_mem_calloc(socket->allocator, 1, sizeof(struct socket_write_request));
- if (!write_request) {
- return AWS_OP_ERR;
- }
- write_request->original_buffer_len = cursor->len;
- write_request->written_fn = written_fn;
- write_request->write_user_data = user_data;
- write_request->cursor_cpy = *cursor;
- aws_linked_list_push_back(&socket_impl->write_queue, &write_request->node);
- return s_process_socket_write_requests(socket, write_request);
- }
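- /* Editor's note: an illustrative sketch, not part of the original source. It shows the shape of a write
-  * completion callback and a call to aws_socket_write(). The bytes the cursor points at must stay valid until
-  * the callback fires (only the cursor itself is copied), and bytes_written can be less than the original
-  * length when the request fails or the socket is closed mid-write, as the purge paths above show. Names
-  * prefixed s_example_ are hypothetical. */
- static void s_example_on_write_complete(
-     struct aws_socket *socket, int error_code, size_t bytes_written, void *user_data) {
-     (void)socket;
-     (void)user_data;
-     if (error_code) {
-         /* e.g. AWS_IO_SOCKET_CLOSED: only `bytes_written` bytes of the original buffer made it out */
-         return;
-     }
-     (void)bytes_written; /* success: bytes_written equals the length originally passed in */
- }
- static int s_example_send_greeting(struct aws_socket *socket) {
-     /* Must be called on the socket's event-loop thread, like aws_socket_write() itself. */
-     struct aws_byte_cursor greeting = aws_byte_cursor_from_c_str("hello\n");
-     return aws_socket_write(socket, &greeting, s_example_on_write_complete, NULL);
- }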
- int aws_socket_get_error(struct aws_socket *socket) {
- int connect_result;
- socklen_t result_length = sizeof(connect_result);
- if (getsockopt(socket->io_handle.data.fd, SOL_SOCKET, SO_ERROR, &connect_result, &result_length) < 0) {
- return AWS_OP_ERR;
- }
- if (connect_result) {
- return s_determine_socket_error(connect_result);
- }
- return AWS_OP_SUCCESS;
- }
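- /* Editor's note: an illustrative sketch, not part of the original source. It shows the plain
-  * getsockopt(SO_ERROR) idiom that aws_socket_get_error() wraps: after a non-blocking connect() the deferred
-  * result lands in SO_ERROR (0 on success, an errno value otherwise), and reading it clears it. The helper
-  * name s_example_pending_error is hypothetical. */
- #include <errno.h>
- #include <sys/socket.h>
- static int s_example_pending_error(int fd) {
-     int so_error = 0;
-     socklen_t len = sizeof(so_error);
-     if (getsockopt(fd, SOL_SOCKET, SO_ERROR, &so_error, &len) < 0) {
-         return errno; /* getsockopt itself failed */
-     }
-     return so_error; /* 0 means no pending error */
- }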
- bool aws_socket_is_open(struct aws_socket *socket) {
- return socket->io_handle.data.fd >= 0;
- }