event_stream_rpc_server.c 48 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168
  1. /*
  2. * Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License").
  5. * You may not use this file except in compliance with the License.
  6. * A copy of the License is located at
  7. *
  8. * http://aws.amazon.com/apache2.0
  9. *
  10. * or in the "license" file accompanying this file. This file is distributed
  11. * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  12. * express or implied. See the License for the specific language governing
  13. * permissions and limitations under the License.
  14. */
  15. #include <aws/event-stream/event_stream_channel_handler.h>
  16. #include <aws/event-stream/event_stream_rpc_server.h>
  17. #include <aws/event-stream/private/event_stream_rpc_priv.h>
  18. #include <aws/common/atomics.h>
  19. #include <aws/common/hash_table.h>
  20. #include <aws/io/channel.h>
  21. #include <aws/io/channel_bootstrap.h>
  22. #include <aws/io/socket.h>
  23. #include <inttypes.h>
  24. #if defined(_MSC_VER)
  25. /* allow non-constant aggregate initializer */
  26. # pragma warning(disable : 4204)
  27. /* allow passing a pointer to an automatically allocated variable around, cause I'm smarter than the compiler. */
  28. # pragma warning(disable : 4221)
  29. #endif
  30. static const struct aws_byte_cursor s_missing_operation_name_error = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(
  31. "{ \"message\": \"The first message for on a non-zero :stream-id must contain an operation header value.\"; }");
  32. struct aws_event_stream_rpc_server_listener {
  33. struct aws_allocator *allocator;
  34. struct aws_socket *listener;
  35. struct aws_server_bootstrap *bootstrap;
  36. struct aws_atomic_var ref_count;
  37. aws_event_stream_rpc_server_on_new_connection_fn *on_new_connection;
  38. aws_event_stream_rpc_server_on_connection_shutdown_fn *on_connection_shutdown;
  39. aws_event_stream_rpc_server_on_listener_destroy_fn *on_destroy_callback;
  40. size_t initial_window_size;
  41. bool enable_read_backpressure;
  42. bool initialized;
  43. void *user_data;
  44. };
  45. struct aws_event_stream_rpc_server_connection {
  46. struct aws_allocator *allocator;
  47. struct aws_hash_table continuation_table;
  48. struct aws_event_stream_rpc_server_listener *server;
  49. struct aws_atomic_var ref_count;
  50. aws_event_stream_rpc_server_on_incoming_stream_fn *on_incoming_stream;
  51. aws_event_stream_rpc_server_connection_protocol_message_fn *on_connection_protocol_message;
  52. struct aws_channel *channel;
  53. struct aws_channel_handler *event_stream_handler;
  54. uint32_t latest_stream_id;
  55. void *user_data;
  56. struct aws_atomic_var is_open;
  57. struct aws_atomic_var handshake_state;
  58. bool bootstrap_owned;
  59. };
  60. struct aws_event_stream_rpc_server_continuation_token {
  61. uint32_t stream_id;
  62. struct aws_event_stream_rpc_server_connection *connection;
  63. aws_event_stream_rpc_server_stream_continuation_fn *continuation_fn;
  64. aws_event_stream_rpc_server_stream_continuation_closed_fn *closed_fn;
  65. void *user_data;
  66. struct aws_atomic_var ref_count;
  67. struct aws_atomic_var is_closed;
  68. };
  69. /** This is the destructor callback invoked by the connections continuation table when a continuation is removed
  70. * from the hash table.
  71. */
  72. void s_continuation_destroy(void *value) {
  73. struct aws_event_stream_rpc_server_continuation_token *continuation = value;
  74. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: destroying continuation", (void *)continuation);
  75. /*
  76. * When creating a stream, we end up putting the continuation in the table before we finish initializing it.
  77. * If an error occurs in the on incoming stream callback, we end up with a continuation with no user data or
  78. * callbacks. This means we have to check closed_fn for validity even though the success path does a fatal assert
  79. * on validity.
  80. */
  81. if (continuation->closed_fn != NULL) {
  82. continuation->closed_fn(continuation, continuation->user_data);
  83. }
  84. aws_event_stream_rpc_server_continuation_release(continuation);
  85. }
  86. static void s_on_message_received(struct aws_event_stream_message *message, int error_code, void *user_data);
  87. /* We have two paths for creating a connection on a channel. The first is an incoming connection on the server listener.
  88. * The second is adding a connection to an already existing channel. This is the code common to both cases. */
  89. static struct aws_event_stream_rpc_server_connection *s_create_connection_on_channel(
  90. struct aws_event_stream_rpc_server_listener *server,
  91. struct aws_channel *channel) {
  92. AWS_LOGF_TRACE(
  93. AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: creating connection on channel %p", (void *)server, (void *)channel);
  94. struct aws_event_stream_rpc_server_connection *connection =
  95. aws_mem_calloc(server->allocator, 1, sizeof(struct aws_event_stream_rpc_server_connection));
  96. struct aws_channel_handler *event_stream_handler = NULL;
  97. struct aws_channel_slot *slot = NULL;
  98. if (!connection) {
  99. AWS_LOGF_ERROR(
  100. AWS_LS_EVENT_STREAM_RPC_SERVER,
  101. "id=%p: allocation failed for connection with error %s",
  102. (void *)server,
  103. aws_error_debug_str(aws_last_error()));
  104. return NULL;
  105. }
  106. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: new connection is %p", (void *)server, (void *)connection);
  107. aws_atomic_init_int(&connection->ref_count, 1u);
  108. aws_atomic_init_int(&connection->is_open, 1u);
  109. /* handshake step 1 is a connect message being received. Handshake 2 is the connect ack being sent.
  110. * no messages other than connect and connect ack are allowed until this count reaches 2. */
  111. aws_atomic_init_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_INITIALIZED);
  112. connection->allocator = server->allocator;
  113. if (aws_hash_table_init(
  114. &connection->continuation_table,
  115. server->allocator,
  116. 64,
  117. aws_event_stream_rpc_hash_streamid,
  118. aws_event_stream_rpc_streamid_eq,
  119. NULL,
  120. s_continuation_destroy)) {
  121. AWS_LOGF_ERROR(
  122. AWS_LS_EVENT_STREAM_RPC_SERVER,
  123. "id=%p: initialization of connection stream table failed with error %s",
  124. (void *)connection,
  125. aws_error_debug_str(aws_last_error()));
  126. goto error;
  127. }
  128. struct aws_event_stream_channel_handler_options handler_options = {
  129. .on_message_received = s_on_message_received,
  130. .user_data = connection,
  131. .initial_window_size = server->initial_window_size,
  132. .manual_window_management = server->enable_read_backpressure,
  133. };
  134. event_stream_handler = aws_event_stream_channel_handler_new(server->allocator, &handler_options);
  135. if (!event_stream_handler) {
  136. AWS_LOGF_ERROR(
  137. AWS_LS_EVENT_STREAM_RPC_SERVER,
  138. "id=%p: initialization of event-stream handler failed with error %s",
  139. (void *)connection,
  140. aws_error_debug_str(aws_last_error()));
  141. goto error;
  142. }
  143. slot = aws_channel_slot_new(channel);
  144. if (!slot) {
  145. AWS_LOGF_ERROR(
  146. AWS_LS_EVENT_STREAM_RPC_SERVER,
  147. "id=%p: initialization of channel slot failed with error %s",
  148. (void *)connection,
  149. aws_error_debug_str(aws_last_error()));
  150. goto error;
  151. }
  152. aws_channel_slot_insert_end(channel, slot);
  153. if (aws_channel_slot_set_handler(slot, event_stream_handler)) {
  154. AWS_LOGF_ERROR(
  155. AWS_LS_EVENT_STREAM_RPC_SERVER,
  156. "id=%p: setting the handler on the slot failed with error %s",
  157. (void *)connection,
  158. aws_error_debug_str(aws_last_error()));
  159. goto error;
  160. }
  161. aws_event_stream_rpc_server_listener_acquire(server);
  162. connection->server = server;
  163. connection->event_stream_handler = event_stream_handler;
  164. connection->channel = channel;
  165. aws_channel_acquire_hold(channel);
  166. return connection;
  167. error:
  168. if (!slot && event_stream_handler) {
  169. aws_channel_handler_destroy(event_stream_handler);
  170. }
  171. if (connection) {
  172. aws_event_stream_rpc_server_connection_release(connection);
  173. }
  174. return NULL;
  175. }
  176. struct aws_event_stream_rpc_server_connection *aws_event_stream_rpc_server_connection_from_existing_channel(
  177. struct aws_event_stream_rpc_server_listener *server,
  178. struct aws_channel *channel,
  179. const struct aws_event_stream_rpc_connection_options *connection_options) {
  180. AWS_FATAL_ASSERT(
  181. connection_options->on_connection_protocol_message && "on_connection_protocol_message must be specified!");
  182. AWS_FATAL_ASSERT(connection_options->on_incoming_stream && "on_incoming_stream must be specified");
  183. struct aws_event_stream_rpc_server_connection *connection = s_create_connection_on_channel(server, channel);
  184. if (!connection) {
  185. return NULL;
  186. }
  187. connection->on_incoming_stream = connection_options->on_incoming_stream;
  188. connection->on_connection_protocol_message = connection_options->on_connection_protocol_message;
  189. connection->user_data = connection_options->user_data;
  190. aws_event_stream_rpc_server_connection_acquire(connection);
  191. return connection;
  192. }
  193. void aws_event_stream_rpc_server_connection_acquire(struct aws_event_stream_rpc_server_connection *connection) {
  194. size_t current_count = aws_atomic_fetch_add_explicit(&connection->ref_count, 1, aws_memory_order_relaxed);
  195. AWS_LOGF_TRACE(
  196. AWS_LS_EVENT_STREAM_RPC_SERVER,
  197. "id=%p: connection acquired, new ref count is %zu.",
  198. (void *)connection,
  199. current_count + 1);
  200. }
  201. void aws_event_stream_rpc_server_connection_release(struct aws_event_stream_rpc_server_connection *connection) {
  202. if (!connection) {
  203. return;
  204. }
  205. size_t value = aws_atomic_fetch_sub_explicit(&connection->ref_count, 1, aws_memory_order_seq_cst);
  206. AWS_LOGF_TRACE(
  207. AWS_LS_EVENT_STREAM_RPC_SERVER,
  208. "id=%p: connection released, new ref count is %zu.",
  209. (void *)connection,
  210. value - 1);
  211. if (value == 1) {
  212. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: destroying connection.", (void *)connection);
  213. aws_channel_release_hold(connection->channel);
  214. aws_hash_table_clean_up(&connection->continuation_table);
  215. aws_event_stream_rpc_server_listener_release(connection->server);
  216. aws_mem_release(connection->allocator, connection);
  217. }
  218. }
  219. /* incoming from a socket on this listener. */
  220. static void s_on_accept_channel_setup(
  221. struct aws_server_bootstrap *bootstrap,
  222. int error_code,
  223. struct aws_channel *channel,
  224. void *user_data) {
  225. (void)bootstrap;
  226. struct aws_event_stream_rpc_server_listener *server = user_data;
  227. if (!error_code) {
  228. AWS_LOGF_INFO(
  229. AWS_LS_EVENT_STREAM_RPC_SERVER,
  230. "id=%p: incoming connection with channel %p.",
  231. (void *)server,
  232. (void *)channel);
  233. AWS_FATAL_ASSERT(channel && "Channel should never be null with a 0 error code.");
  234. struct aws_event_stream_rpc_server_connection *connection = s_create_connection_on_channel(server, channel);
  235. if (!connection) {
  236. int error = aws_last_error();
  237. server->on_new_connection(NULL, error, NULL, server->user_data);
  238. aws_channel_shutdown(channel, error);
  239. }
  240. struct aws_event_stream_rpc_connection_options connection_options;
  241. AWS_ZERO_STRUCT(connection_options);
  242. aws_event_stream_rpc_server_connection_acquire(connection);
  243. AWS_LOGF_TRACE(
  244. AWS_LS_EVENT_STREAM_RPC_SERVER,
  245. "id=%p: invoking on_new_connection with connection %p.",
  246. (void *)server,
  247. (void *)connection);
  248. if (server->on_new_connection(connection, AWS_ERROR_SUCCESS, &connection_options, server->user_data)) {
  249. aws_channel_shutdown(channel, aws_last_error());
  250. aws_event_stream_rpc_server_connection_release(connection);
  251. return;
  252. }
  253. AWS_FATAL_ASSERT(
  254. connection_options.on_connection_protocol_message && "on_connection_protocol_message must be specified!");
  255. AWS_FATAL_ASSERT(connection_options.on_incoming_stream && "on_incoming_stream must be specified");
  256. connection->on_incoming_stream = connection_options.on_incoming_stream;
  257. connection->on_connection_protocol_message = connection_options.on_connection_protocol_message;
  258. connection->user_data = connection_options.user_data;
  259. connection->bootstrap_owned = true;
  260. aws_event_stream_rpc_server_connection_release(connection);
  261. } else {
  262. AWS_LOGF_ERROR(
  263. AWS_LS_EVENT_STREAM_RPC_SERVER,
  264. "id=%p: invoking on_new_connection with error %s",
  265. (void *)server,
  266. aws_error_debug_str(error_code));
  267. server->on_new_connection(NULL, error_code, NULL, server->user_data);
  268. }
  269. }
  270. /* this is just to get the connection object off of the channel. */
  271. static inline struct aws_event_stream_rpc_server_connection *s_rpc_connection_from_channel(
  272. struct aws_channel *channel) {
  273. struct aws_channel_slot *our_slot = NULL;
  274. struct aws_channel_slot *current_slot = aws_channel_get_first_slot(channel);
  275. AWS_FATAL_ASSERT(
  276. current_slot &&
  277. "It should be logically impossible to have a channel in this callback that doesn't have a slot in it");
  278. while (current_slot->adj_right) {
  279. current_slot = current_slot->adj_right;
  280. }
  281. our_slot = current_slot;
  282. struct aws_channel_handler *our_handler = our_slot->handler;
  283. return aws_event_stream_channel_handler_get_user_data(our_handler);
  284. }
  285. static void s_on_accept_channel_shutdown(
  286. struct aws_server_bootstrap *bootstrap,
  287. int error_code,
  288. struct aws_channel *channel,
  289. void *user_data) {
  290. (void)bootstrap;
  291. struct aws_event_stream_rpc_server_listener *server = user_data;
  292. struct aws_event_stream_rpc_server_connection *connection = s_rpc_connection_from_channel(channel);
  293. AWS_LOGF_DEBUG(
  294. AWS_LS_EVENT_STREAM_RPC_SERVER,
  295. "id=%p: channel %p and connection %p shutdown occurred with error %s",
  296. (void *)server,
  297. (void *)channel,
  298. (void *)connection,
  299. aws_error_debug_str(error_code));
  300. aws_atomic_store_int(&connection->is_open, 0U);
  301. aws_hash_table_clear(&connection->continuation_table);
  302. aws_event_stream_rpc_server_connection_acquire(connection);
  303. server->on_connection_shutdown(connection, error_code, server->user_data);
  304. aws_event_stream_rpc_server_connection_release(connection);
  305. aws_event_stream_rpc_server_connection_release(connection);
  306. }
  307. static void s_on_server_listener_destroy(struct aws_server_bootstrap *bootstrap, void *user_data) {
  308. (void)bootstrap;
  309. struct aws_event_stream_rpc_server_listener *listener = user_data;
  310. AWS_LOGF_INFO(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: destroying server", (void *)listener);
  311. /* server bootstrap invokes this callback regardless of if the listener was successfully created, so
  312. * just check that we successfully set it up before freeing anything. When that's fixed in aws-c-io, this
  313. * code will still be correct, so just leave it here for now. */
  314. if (listener->initialized) {
  315. if (listener->on_destroy_callback) {
  316. listener->on_destroy_callback(listener, listener->user_data);
  317. }
  318. aws_mem_release(listener->allocator, listener);
  319. }
  320. }
  321. struct aws_event_stream_rpc_server_listener *aws_event_stream_rpc_server_new_listener(
  322. struct aws_allocator *allocator,
  323. struct aws_event_stream_rpc_server_listener_options *options) {
  324. struct aws_event_stream_rpc_server_listener *server =
  325. aws_mem_calloc(allocator, 1, sizeof(struct aws_event_stream_rpc_server_listener));
  326. if (!server) {
  327. AWS_LOGF_ERROR(
  328. AWS_LS_EVENT_STREAM_RPC_SERVER,
  329. "static: failed to allocate new server with error %s",
  330. aws_error_debug_str(aws_last_error()));
  331. return NULL;
  332. }
  333. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_SERVER, "static: new server is %p", (void *)server);
  334. aws_atomic_init_int(&server->ref_count, 1);
  335. struct aws_server_socket_channel_bootstrap_options bootstrap_options = {
  336. .bootstrap = options->bootstrap,
  337. .socket_options = options->socket_options,
  338. .tls_options = options->tls_options,
  339. .enable_read_back_pressure = false,
  340. .host_name = options->host_name,
  341. .port = options->port,
  342. .incoming_callback = s_on_accept_channel_setup,
  343. .shutdown_callback = s_on_accept_channel_shutdown,
  344. .destroy_callback = s_on_server_listener_destroy,
  345. .user_data = server,
  346. };
  347. server->bootstrap = options->bootstrap;
  348. server->allocator = allocator;
  349. server->on_destroy_callback = options->on_destroy_callback;
  350. server->on_new_connection = options->on_new_connection;
  351. server->on_connection_shutdown = options->on_connection_shutdown;
  352. server->user_data = options->user_data;
  353. server->listener = aws_server_bootstrap_new_socket_listener(&bootstrap_options);
  354. if (!server->listener) {
  355. AWS_LOGF_ERROR(
  356. AWS_LS_EVENT_STREAM_RPC_SERVER,
  357. "static: failed to allocate new socket listener with error %s",
  358. aws_error_debug_str(aws_last_error()));
  359. goto error;
  360. }
  361. server->initialized = true;
  362. return server;
  363. error:
  364. if (server->listener) {
  365. aws_server_bootstrap_destroy_socket_listener(options->bootstrap, server->listener);
  366. }
  367. aws_mem_release(server->allocator, server);
  368. return NULL;
  369. }
  370. uint16_t aws_event_stream_rpc_server_listener_get_bound_port(
  371. const struct aws_event_stream_rpc_server_listener *server) {
  372. struct aws_socket_endpoint address;
  373. AWS_ZERO_STRUCT(address);
  374. /* not checking error code because it can't fail when called on a listening socket */
  375. aws_socket_get_bound_address(server->listener, &address);
  376. return address.port;
  377. }
  378. void aws_event_stream_rpc_server_listener_acquire(struct aws_event_stream_rpc_server_listener *server) {
  379. size_t current_count = aws_atomic_fetch_add_explicit(&server->ref_count, 1, aws_memory_order_relaxed);
  380. AWS_LOGF_TRACE(
  381. AWS_LS_EVENT_STREAM_RPC_SERVER,
  382. "id=%p: server acquired, new ref count is %zu.",
  383. (void *)server,
  384. current_count + 1);
  385. }
  386. static void s_destroy_server(struct aws_event_stream_rpc_server_listener *server) {
  387. if (server) {
  388. AWS_LOGF_INFO(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: destroying server", (void *)server);
  389. /* the memory for this is cleaned up in the listener shutdown complete callback. */
  390. aws_server_bootstrap_destroy_socket_listener(server->bootstrap, server->listener);
  391. }
  392. }
  393. void aws_event_stream_rpc_server_listener_release(struct aws_event_stream_rpc_server_listener *server) {
  394. if (!server) {
  395. return;
  396. }
  397. size_t ref_count = aws_atomic_fetch_sub_explicit(&server->ref_count, 1, aws_memory_order_seq_cst);
  398. AWS_LOGF_TRACE(
  399. AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: server released, new ref count is %zu.", (void *)server, ref_count - 1);
  400. if (ref_count == 1) {
  401. s_destroy_server(server);
  402. }
  403. }
  404. struct event_stream_connection_send_message_args {
  405. struct aws_allocator *allocator;
  406. struct aws_event_stream_message message;
  407. enum aws_event_stream_rpc_message_type message_type;
  408. struct aws_event_stream_rpc_server_connection *connection;
  409. struct aws_event_stream_rpc_server_continuation_token *continuation;
  410. aws_event_stream_rpc_server_message_flush_fn *flush_fn;
  411. void *user_data;
  412. bool end_stream;
  413. bool terminate_connection;
  414. };
  415. static void s_on_protocol_message_written_fn(
  416. struct aws_event_stream_message *message,
  417. int error_code,
  418. void *user_data) {
  419. (void)message;
  420. struct event_stream_connection_send_message_args *message_args = user_data;
  421. AWS_LOGF_TRACE(
  422. AWS_LS_EVENT_STREAM_RPC_SERVER,
  423. "id=%p: message flushed to channel with error %s",
  424. (void *)message_args->connection,
  425. aws_error_debug_str(error_code));
  426. if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
  427. AWS_LOGF_TRACE(
  428. AWS_LS_EVENT_STREAM_RPC_SERVER,
  429. "id=%p: connect ack message flushed to wire",
  430. (void *)message_args->connection);
  431. }
  432. if (message_args->end_stream) {
  433. AWS_FATAL_ASSERT(message_args->continuation && "end stream flag was set but it wasn't on a continuation");
  434. AWS_LOGF_DEBUG(
  435. AWS_LS_EVENT_STREAM_RPC_SERVER,
  436. "id=%p: end_stream flag for continuation %p was set, closing",
  437. (void *)message_args->connection,
  438. (void *)message_args->continuation);
  439. aws_atomic_store_int(&message_args->continuation->is_closed, 1U);
  440. aws_hash_table_remove(
  441. &message_args->connection->continuation_table, &message_args->continuation->stream_id, NULL, NULL);
  442. }
  443. message_args->flush_fn(error_code, message_args->user_data);
  444. if (message_args->terminate_connection) {
  445. AWS_LOGF_INFO(
  446. AWS_LS_EVENT_STREAM_RPC_SERVER,
  447. "id=%p: terminate connection flag was set. closing",
  448. (void *)message_args->connection);
  449. aws_event_stream_rpc_server_connection_close(message_args->connection, AWS_ERROR_SUCCESS);
  450. }
  451. aws_event_stream_rpc_server_connection_release(message_args->connection);
  452. if (message_args->continuation) {
  453. aws_event_stream_rpc_server_continuation_release(message_args->continuation);
  454. }
  455. aws_event_stream_message_clean_up(&message_args->message);
  456. aws_mem_release(message_args->allocator, message_args);
  457. }
  458. static int s_send_protocol_message(
  459. struct aws_event_stream_rpc_server_connection *connection,
  460. struct aws_event_stream_rpc_server_continuation_token *continuation,
  461. const struct aws_event_stream_rpc_message_args *message_args,
  462. int32_t stream_id,
  463. aws_event_stream_rpc_server_message_flush_fn *flush_fn,
  464. void *user_data) {
  465. size_t connect_handshake_state = aws_atomic_load_int(&connection->handshake_state);
  466. AWS_LOGF_TRACE(
  467. AWS_LS_EVENT_STREAM_RPC_SERVER,
  468. "id=%p: connect handshake state %zu",
  469. (void *)connection,
  470. connect_handshake_state);
  471. /* handshake step 1 is a connect message being received. Handshake 2 is the connect ack being sent.
  472. * no messages other than connect and connect ack are allowed until this count reaches 2. */
  473. if (connect_handshake_state != CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED &&
  474. message_args->message_type < AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
  475. AWS_LOGF_TRACE(
  476. AWS_LS_EVENT_STREAM_RPC_SERVER,
  477. "id=%p: invalid state, a message was received prior to connect handshake completion",
  478. (void *)connection);
  479. return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  480. }
  481. struct event_stream_connection_send_message_args *args =
  482. aws_mem_calloc(connection->allocator, 1, sizeof(struct event_stream_connection_send_message_args));
  483. if (!message_args) {
  484. AWS_LOGF_ERROR(
  485. AWS_LS_EVENT_STREAM_RPC_SERVER,
  486. "id=%p: allocation of callback args failed with error %s",
  487. (void *)connection,
  488. aws_error_debug_str(aws_last_error()));
  489. return AWS_OP_ERR;
  490. }
  491. args->allocator = connection->allocator;
  492. args->user_data = user_data;
  493. args->message_type = message_args->message_type;
  494. args->connection = connection;
  495. args->flush_fn = flush_fn;
  496. if (continuation) {
  497. args->continuation = continuation;
  498. aws_event_stream_rpc_server_continuation_acquire(continuation);
  499. if (message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
  500. AWS_LOGF_DEBUG(
  501. AWS_LS_EVENT_STREAM_RPC_SERVER,
  502. "id=%p: continuation with terminate stream flag was specified closing",
  503. (void *)continuation);
  504. args->end_stream = true;
  505. }
  506. }
  507. if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
  508. AWS_LOGF_INFO(
  509. AWS_LS_EVENT_STREAM_RPC_SERVER,
  510. "id=%p: sending connect ack message, the connect handshake is completed",
  511. (void *)connection);
  512. aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED);
  513. if (!(message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_CONNECTION_ACCEPTED)) {
  514. AWS_LOGF_DEBUG(
  515. AWS_LS_EVENT_STREAM_RPC_SERVER,
  516. "id=%p: connection ack was rejected closing connection",
  517. (void *)connection);
  518. args->terminate_connection = true;
  519. }
  520. }
  521. args->flush_fn = flush_fn;
  522. size_t headers_count = 0;
  523. if (aws_add_size_checked(message_args->headers_count, 3, &headers_count)) {
  524. AWS_LOGF_ERROR(
  525. AWS_LS_EVENT_STREAM_RPC_SERVER,
  526. "id=%p: integer overflow detected when using headers_count %zu",
  527. (void *)connection,
  528. message_args->headers_count);
  529. goto args_allocated_before_failure;
  530. }
  531. struct aws_array_list headers_list;
  532. AWS_ZERO_STRUCT(headers_list);
  533. if (aws_array_list_init_dynamic(
  534. &headers_list, connection->allocator, headers_count, sizeof(struct aws_event_stream_header_value_pair))) {
  535. AWS_LOGF_ERROR(
  536. AWS_LS_EVENT_STREAM_RPC_SERVER,
  537. "id=%p: allocation of headers failed with error %s",
  538. (void *)connection,
  539. aws_error_debug_str(aws_last_error()));
  540. goto args_allocated_before_failure;
  541. }
  542. /* since we preallocated the space for the headers, these can't fail, but we'll go ahead an assert on them just in
  543. * case */
  544. for (size_t i = 0; i < message_args->headers_count; ++i) {
  545. AWS_FATAL_ASSERT(!aws_array_list_push_back(&headers_list, &message_args->headers[i]));
  546. }
  547. AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
  548. &headers_list,
  549. (const char *)aws_event_stream_rpc_message_type_name.ptr,
  550. (uint8_t)aws_event_stream_rpc_message_type_name.len,
  551. message_args->message_type));
  552. AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
  553. &headers_list,
  554. (const char *)aws_event_stream_rpc_message_flags_name.ptr,
  555. (uint8_t)aws_event_stream_rpc_message_flags_name.len,
  556. message_args->message_flags));
  557. AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
  558. &headers_list,
  559. (const char *)aws_event_stream_rpc_stream_id_name.ptr,
  560. (uint8_t)aws_event_stream_rpc_stream_id_name.len,
  561. stream_id));
  562. int message_init_err_code =
  563. aws_event_stream_message_init(&args->message, connection->allocator, &headers_list, message_args->payload);
  564. aws_array_list_clean_up(&headers_list);
  565. if (message_init_err_code) {
  566. AWS_LOGF_ERROR(
  567. AWS_LS_EVENT_STREAM_RPC_SERVER,
  568. "id=%p: initialization of message failed with error %s",
  569. (void *)connection,
  570. aws_error_debug_str(aws_last_error()));
  571. goto args_allocated_before_failure;
  572. }
  573. aws_event_stream_rpc_server_connection_acquire(connection);
  574. if (aws_event_stream_channel_handler_write_message(
  575. connection->event_stream_handler, &args->message, s_on_protocol_message_written_fn, args)) {
  576. AWS_LOGF_ERROR(
  577. AWS_LS_EVENT_STREAM_RPC_SERVER,
  578. "id=%p: message send failed with error %s",
  579. (void *)connection,
  580. aws_error_debug_str(aws_last_error()));
  581. goto message_initialized_before_failure;
  582. }
  583. return AWS_OP_SUCCESS;
  584. message_initialized_before_failure:
  585. aws_event_stream_message_clean_up(&args->message);
  586. args_allocated_before_failure:
  587. aws_mem_release(args->allocator, args);
  588. aws_event_stream_rpc_server_connection_release(connection);
  589. return AWS_OP_ERR;
  590. }
  591. int aws_event_stream_rpc_server_connection_send_protocol_message(
  592. struct aws_event_stream_rpc_server_connection *connection,
  593. const struct aws_event_stream_rpc_message_args *message_args,
  594. aws_event_stream_rpc_server_message_flush_fn *flush_fn,
  595. void *user_data) {
  596. if (!aws_event_stream_rpc_server_connection_is_open(connection)) {
  597. return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_CONNECTION_CLOSED);
  598. }
  599. return s_send_protocol_message(connection, NULL, message_args, 0, flush_fn, user_data);
  600. }
  601. void *aws_event_stream_rpc_server_connection_get_user_data(struct aws_event_stream_rpc_server_connection *connection) {
  602. return connection->user_data;
  603. }
  604. AWS_EVENT_STREAM_API void aws_event_stream_rpc_server_override_last_stream_id(
  605. struct aws_event_stream_rpc_server_connection *connection,
  606. int32_t value) {
  607. connection->latest_stream_id = value;
  608. }
  609. void aws_event_stream_rpc_server_connection_close(
  610. struct aws_event_stream_rpc_server_connection *connection,
  611. int shutdown_error_code) {
  612. if (aws_event_stream_rpc_server_connection_is_open(connection)) {
  613. AWS_LOGF_DEBUG(
  614. AWS_LS_EVENT_STREAM_RPC_SERVER,
  615. "id=%p: closing connection with error %s",
  616. (void *)connection,
  617. aws_error_debug_str(shutdown_error_code));
  618. aws_atomic_store_int(&connection->is_open, 0U);
  619. aws_channel_shutdown(connection->channel, shutdown_error_code);
  620. if (!connection->bootstrap_owned) {
  621. aws_hash_table_clear(&connection->continuation_table);
  622. aws_event_stream_rpc_server_connection_release(connection);
  623. }
  624. }
  625. }
  626. bool aws_event_stream_rpc_server_continuation_is_closed(
  627. struct aws_event_stream_rpc_server_continuation_token *continuation) {
  628. return aws_atomic_load_int(&continuation->is_closed) == 1U;
  629. }
  630. bool aws_event_stream_rpc_server_connection_is_open(struct aws_event_stream_rpc_server_connection *connection) {
  631. return aws_atomic_load_int(&connection->is_open) == 1U;
  632. }
  633. void aws_event_stream_rpc_server_continuation_acquire(
  634. struct aws_event_stream_rpc_server_continuation_token *continuation) {
  635. size_t current_count = aws_atomic_fetch_add_explicit(&continuation->ref_count, 1, aws_memory_order_relaxed);
  636. AWS_LOGF_TRACE(
  637. AWS_LS_EVENT_STREAM_RPC_SERVER,
  638. "id=%p: continuation acquired, new ref count is %zu.",
  639. (void *)continuation,
  640. current_count + 1);
  641. }
  642. void aws_event_stream_rpc_server_continuation_release(
  643. struct aws_event_stream_rpc_server_continuation_token *continuation) {
  644. size_t value = aws_atomic_fetch_sub_explicit(&continuation->ref_count, 1, aws_memory_order_seq_cst);
  645. AWS_LOGF_TRACE(
  646. AWS_LS_EVENT_STREAM_RPC_SERVER,
  647. "id=%p: continuation released, new ref count is %zu.",
  648. (void *)continuation,
  649. value - 1);
  650. if (value == 1) {
  651. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: destroying continuation.", (void *)continuation);
  652. struct aws_allocator *allocator = continuation->connection->allocator;
  653. aws_event_stream_rpc_server_connection_release(continuation->connection);
  654. aws_mem_release(allocator, continuation);
  655. }
  656. }
  657. int aws_event_stream_rpc_server_continuation_send_message(
  658. struct aws_event_stream_rpc_server_continuation_token *continuation,
  659. const struct aws_event_stream_rpc_message_args *message_args,
  660. aws_event_stream_rpc_server_message_flush_fn *flush_fn,
  661. void *user_data) {
  662. AWS_FATAL_PRECONDITION(continuation->continuation_fn);
  663. AWS_FATAL_PRECONDITION(continuation->closed_fn);
  664. if (aws_event_stream_rpc_server_continuation_is_closed(continuation)) {
  665. return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_STREAM_CLOSED);
  666. }
  667. return s_send_protocol_message(
  668. continuation->connection, continuation, message_args, continuation->stream_id, flush_fn, user_data);
  669. }
  670. static void s_connection_error_message_flush_fn(int error_code, void *user_data) {
  671. (void)error_code;
  672. struct aws_event_stream_rpc_server_connection *connection = user_data;
  673. aws_event_stream_rpc_server_connection_close(connection, AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  674. }
  675. static void s_send_connection_level_error(
  676. struct aws_event_stream_rpc_server_connection *connection,
  677. uint32_t message_type,
  678. uint32_t message_flags,
  679. const struct aws_byte_cursor *message) {
  680. struct aws_byte_buf payload_buf = aws_byte_buf_from_array(message->ptr, message->len);
  681. AWS_LOGF_DEBUG(
  682. AWS_LS_EVENT_STREAM_RPC_SERVER,
  683. "id=%p: sending connection-level error\n" PRInSTR,
  684. (void *)connection,
  685. AWS_BYTE_BUF_PRI(payload_buf));
  686. struct aws_event_stream_header_value_pair content_type_header =
  687. aws_event_stream_create_string_header(s_json_content_type_name, s_json_content_type_value);
  688. struct aws_event_stream_header_value_pair headers[] = {
  689. content_type_header,
  690. };
  691. struct aws_event_stream_rpc_message_args message_args = {
  692. .message_type = message_type,
  693. .message_flags = message_flags,
  694. .payload = &payload_buf,
  695. .headers_count = 1,
  696. .headers = headers,
  697. };
  698. aws_event_stream_rpc_server_connection_send_protocol_message(
  699. connection, &message_args, s_connection_error_message_flush_fn, connection);
  700. }
  701. /* TODO: come back and make this a proper state pattern. For now it's branches all over the place until we nail
  702. * down the spec. */
  703. static void s_route_message_by_type(
  704. struct aws_event_stream_rpc_server_connection *connection,
  705. struct aws_event_stream_message *message,
  706. struct aws_array_list *headers_list,
  707. uint32_t stream_id,
  708. uint32_t message_type,
  709. uint32_t message_flags,
  710. struct aws_byte_cursor operation_name) {
  711. struct aws_byte_buf payload_buf = aws_byte_buf_from_array(
  712. aws_event_stream_message_payload(message), aws_event_stream_message_payload_len(message));
  713. struct aws_event_stream_rpc_message_args message_args = {
  714. .headers = headers_list->data,
  715. .headers_count = aws_array_list_length(headers_list),
  716. .payload = &payload_buf,
  717. .message_flags = message_flags,
  718. .message_type = message_type,
  719. };
  720. size_t handshake_state = aws_atomic_load_int(&connection->handshake_state);
  721. /* make sure if this is not a CONNECT message being received, the handshake has been completed. */
  722. if (handshake_state < CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED &&
  723. message_type != AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
  724. AWS_LOGF_ERROR(
  725. AWS_LS_EVENT_STREAM_RPC_SERVER,
  726. "id=%p: a message was received on this connection prior to the "
  727. "connect handshake completing",
  728. (void *)connection);
  729. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  730. s_send_connection_level_error(
  731. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
  732. return;
  733. }
  734. /* stream_id being non zero ALWAYS indicates APPLICATION_DATA or APPLICATION_ERROR. */
  735. if (stream_id > 0) {
  736. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: stream id %" PRIu32, (void *)connection, stream_id);
  737. struct aws_event_stream_rpc_server_continuation_token *continuation = NULL;
  738. if (message_type > AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR) {
  739. AWS_LOGF_ERROR(
  740. AWS_LS_EVENT_STREAM_RPC_SERVER,
  741. "id=%p: only application messages can be sent on a stream id, "
  742. "but this message is the incorrect type",
  743. (void *)connection);
  744. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  745. s_send_connection_level_error(
  746. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_stream_id_error);
  747. return;
  748. }
  749. /* INT32_MAX is the max stream id. */
  750. if (stream_id > INT32_MAX) {
  751. AWS_LOGF_ERROR(
  752. AWS_LS_EVENT_STREAM_RPC_SERVER,
  753. "id=%p: stream_id is larger than the max acceptable value",
  754. (void *)connection);
  755. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  756. s_send_connection_level_error(
  757. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_stream_id_error);
  758. return;
  759. }
  760. /* if the stream is in the past, look it up from the continuation table. If it's not there, that's an error.
  761. * if it is, find it and notify the user a message arrived */
  762. if (stream_id <= connection->latest_stream_id) {
  763. AWS_LOGF_ERROR(
  764. AWS_LS_EVENT_STREAM_RPC_SERVER,
  765. "id=%p: stream_id is an already seen stream_id, looking for existing continuation",
  766. (void *)connection);
  767. struct aws_hash_element *continuation_element = NULL;
  768. if (aws_hash_table_find(&connection->continuation_table, &stream_id, &continuation_element) ||
  769. !continuation_element) {
  770. AWS_LOGF_ERROR(
  771. AWS_LS_EVENT_STREAM_RPC_SERVER,
  772. "id=%p: stream_id does not have a corresponding continuation",
  773. (void *)connection);
  774. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  775. s_send_connection_level_error(
  776. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_client_stream_id_error);
  777. return;
  778. }
  779. continuation = continuation_element->value;
  780. AWS_LOGF_TRACE(
  781. AWS_LS_EVENT_STREAM_RPC_SERVER,
  782. "id=%p: stream_id corresponds to continuation %p",
  783. (void *)connection,
  784. (void *)continuation);
  785. /*
  786. * I don't think it's possible for the continuation_fn to be NULL at this point, but given the
  787. * multiple partially-initialized object crashes we've had, let's be safe.
  788. */
  789. if (continuation->continuation_fn != NULL) {
  790. aws_event_stream_rpc_server_continuation_acquire(continuation);
  791. continuation->continuation_fn(continuation, &message_args, continuation->user_data);
  792. aws_event_stream_rpc_server_continuation_release(continuation);
  793. }
  794. /* now these are potentially new streams. Make sure they're in bounds, create a new continuation
  795. * and notify the user the stream has been created, then send them the message. */
  796. } else {
  797. AWS_LOGF_TRACE(
  798. AWS_LS_EVENT_STREAM_RPC_SERVER,
  799. "id=%p: stream_id is unknown, attempting to create a continuation for it",
  800. (void *)connection);
  801. if (stream_id != connection->latest_stream_id + 1) {
  802. AWS_LOGF_ERROR(
  803. AWS_LS_EVENT_STREAM_RPC_SERVER,
  804. "id=%p: stream_id is invalid because it's not sequentially increasing",
  805. (void *)connection);
  806. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  807. s_send_connection_level_error(
  808. connection,
  809. AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR,
  810. 0,
  811. &s_invalid_new_client_stream_id_error);
  812. return;
  813. }
  814. /* new streams must always have an operation name. */
  815. if (operation_name.len == 0) {
  816. AWS_LOGF_ERROR(
  817. AWS_LS_EVENT_STREAM_RPC_SERVER,
  818. "id=%p: new stream_id encountered, but an operation name was not received",
  819. (void *)connection);
  820. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  821. s_send_connection_level_error(
  822. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_missing_operation_name_error);
  823. return;
  824. }
  825. AWS_LOGF_DEBUG(
  826. AWS_LS_EVENT_STREAM_RPC_SERVER,
  827. "id=%p: stream_id is a valid new stream. Creating continuation",
  828. (void *)connection);
  829. continuation =
  830. aws_mem_calloc(connection->allocator, 1, sizeof(struct aws_event_stream_rpc_server_continuation_token));
  831. if (!continuation) {
  832. AWS_LOGF_ERROR(
  833. AWS_LS_EVENT_STREAM_RPC_SERVER,
  834. "id=%p: continuation allocation failed with error %s",
  835. (void *)connection,
  836. aws_error_debug_str(aws_last_error()));
  837. s_send_connection_level_error(
  838. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
  839. return;
  840. }
  841. AWS_LOGF_DEBUG(
  842. AWS_LS_EVENT_STREAM_RPC_SERVER,
  843. "id=%p: new continuation is %p",
  844. (void *)connection,
  845. (void *)continuation);
  846. continuation->stream_id = stream_id;
  847. continuation->connection = connection;
  848. aws_event_stream_rpc_server_connection_acquire(continuation->connection);
  849. aws_atomic_init_int(&continuation->ref_count, 1);
  850. if (aws_hash_table_put(&connection->continuation_table, &continuation->stream_id, continuation, NULL)) {
  851. AWS_LOGF_ERROR(
  852. AWS_LS_EVENT_STREAM_RPC_SERVER,
  853. "id=%p: continuation table update failed with error %s",
  854. (void *)connection,
  855. aws_error_debug_str(aws_last_error()));
  856. /* continuation release will drop the connection reference as well */
  857. aws_event_stream_rpc_server_continuation_release(continuation);
  858. s_send_connection_level_error(
  859. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
  860. return;
  861. }
  862. struct aws_event_stream_rpc_server_stream_continuation_options options;
  863. AWS_ZERO_STRUCT(options);
  864. aws_event_stream_rpc_server_continuation_acquire(continuation);
  865. AWS_LOGF_TRACE(
  866. AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: invoking on_incoming_stream callback", (void *)connection);
  867. /*
  868. * This callback must only keep a ref to the continuation on a success path. On a failure, it must
  869. * leave the ref count alone so that the release + removal destroys the continuation
  870. */
  871. if (connection->on_incoming_stream == NULL ||
  872. connection->on_incoming_stream(
  873. continuation->connection, continuation, operation_name, &options, connection->user_data)) {
  874. AWS_FATAL_ASSERT(aws_atomic_load_int(&continuation->ref_count) == 2);
  875. /* undo the continuation acquire that was done a few lines above */
  876. aws_event_stream_rpc_server_continuation_release(continuation);
  877. /* removing the continuation from the table will do the final decref on the continuation */
  878. aws_hash_table_remove(&connection->continuation_table, &continuation->stream_id, NULL, NULL);
  879. s_send_connection_level_error(
  880. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
  881. return;
  882. }
  883. AWS_FATAL_ASSERT(options.on_continuation);
  884. AWS_FATAL_ASSERT(options.on_continuation_closed);
  885. continuation->continuation_fn = options.on_continuation;
  886. continuation->closed_fn = options.on_continuation_closed;
  887. continuation->user_data = options.user_data;
  888. connection->latest_stream_id = stream_id;
  889. continuation->continuation_fn(continuation, &message_args, continuation->user_data);
  890. /* undo the acquire made before the on_incoming_stream callback invocation */
  891. aws_event_stream_rpc_server_continuation_release(continuation);
  892. }
  893. /* if it was a terminal stream message purge it from the hash table. The delete will decref the continuation. */
  894. if (message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
  895. AWS_LOGF_DEBUG(
  896. AWS_LS_EVENT_STREAM_RPC_SERVER,
  897. "id=%p: the terminate_stream flag was received for continuation %p, closing",
  898. (void *)connection,
  899. (void *)continuation);
  900. aws_atomic_store_int(&continuation->is_closed, 1U);
  901. aws_hash_table_remove(&connection->continuation_table, &stream_id, NULL, NULL);
  902. }
  903. } else {
  904. if (message_type <= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR ||
  905. message_type >= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_COUNT) {
  906. AWS_LOGF_ERROR(
  907. AWS_LS_EVENT_STREAM_RPC_SERVER,
  908. "id=%p: a zero stream id was received with an invalid message-type %" PRIu32,
  909. (void *)connection,
  910. message_type);
  911. s_send_connection_level_error(
  912. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_message_type_error);
  913. return;
  914. }
  915. if (message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
  916. if (handshake_state) {
  917. AWS_LOGF_ERROR(
  918. AWS_LS_EVENT_STREAM_RPC_SERVER,
  919. "id=%p: connect received but the handshake is already completed. Only one is allowed.",
  920. (void *)connection);
  921. /* only one connect is allowed. This would be a duplicate. */
  922. s_send_connection_level_error(
  923. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
  924. return;
  925. }
  926. aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_PROCESSED);
  927. AWS_LOGF_INFO(
  928. AWS_LS_EVENT_STREAM_RPC_SERVER,
  929. "id=%p: connect received, connection handshake completion pending the server sending an ack.",
  930. (void *)connection);
  931. }
  932. if (connection->on_connection_protocol_message != NULL) {
  933. connection->on_connection_protocol_message(connection, &message_args, connection->user_data);
  934. }
  935. }
  936. }
  937. /* invoked by the event stream channel handler when a complete message has been read from the channel. */
  938. static void s_on_message_received(struct aws_event_stream_message *message, int error_code, void *user_data) {
  939. if (!error_code) {
  940. struct aws_event_stream_rpc_server_connection *connection = user_data;
  941. AWS_LOGF_TRACE(
  942. AWS_LS_EVENT_STREAM_RPC_SERVER,
  943. "id=%p: message received on connection of length %" PRIu32,
  944. (void *)connection,
  945. aws_event_stream_message_total_length(message));
  946. struct aws_array_list headers;
  947. if (aws_array_list_init_dynamic(
  948. &headers, connection->allocator, 8, sizeof(struct aws_event_stream_header_value_pair))) {
  949. AWS_LOGF_ERROR(
  950. AWS_LS_EVENT_STREAM_RPC_SERVER,
  951. "id=%p: error initializing headers %s",
  952. (void *)connection,
  953. aws_error_debug_str(aws_last_error()));
  954. s_send_connection_level_error(
  955. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
  956. return;
  957. }
  958. if (aws_event_stream_message_headers(message, &headers)) {
  959. AWS_LOGF_ERROR(
  960. AWS_LS_EVENT_STREAM_RPC_SERVER,
  961. "id=%p: error fetching headers %s",
  962. (void *)connection,
  963. aws_error_debug_str(aws_last_error()));
  964. s_send_connection_level_error(
  965. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
  966. goto clean_up;
  967. }
  968. int32_t stream_id = -1;
  969. int32_t message_type = -1;
  970. int32_t message_flags = -1;
  971. struct aws_byte_buf operation_name_buf;
  972. AWS_ZERO_STRUCT(operation_name_buf);
  973. if (aws_event_stream_rpc_extract_message_metadata(
  974. &headers, &stream_id, &message_type, &message_flags, &operation_name_buf)) {
  975. AWS_LOGF_ERROR(
  976. AWS_LS_EVENT_STREAM_RPC_SERVER,
  977. "id=%p: invalid protocol message with error %s",
  978. (void *)connection,
  979. aws_error_debug_str(aws_last_error()));
  980. s_send_connection_level_error(
  981. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_message_error);
  982. goto clean_up;
  983. }
  984. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: routing message", (void *)connection);
  985. s_route_message_by_type(
  986. connection,
  987. message,
  988. &headers,
  989. stream_id,
  990. message_type,
  991. message_flags,
  992. aws_byte_cursor_from_buf(&operation_name_buf));
  993. clean_up:
  994. aws_event_stream_headers_list_cleanup(&headers);
  995. }
  996. }