event_stream_rpc_server.c 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115
  1. /*
  2. * Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License").
  5. * You may not use this file except in compliance with the License.
  6. * A copy of the License is located at
  7. *
  8. * http://aws.amazon.com/apache2.0
  9. *
  10. * or in the "license" file accompanying this file. This file is distributed
  11. * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  12. * express or implied. See the License for the specific language governing
  13. * permissions and limitations under the License.
  14. */
  15. #include <aws/event-stream/event_stream_channel_handler.h>
  16. #include <aws/event-stream/event_stream_rpc_server.h>
  17. #include <aws/event-stream/private/event_stream_rpc_priv.h>
  18. #include <aws/common/atomics.h>
  19. #include <aws/common/hash_table.h>
  20. #include <aws/io/channel.h>
  21. #include <aws/io/channel_bootstrap.h>
  22. #include <inttypes.h>
  23. #if defined(_MSC_VER)
  24. /* allow non-constant aggregate initializer */
  25. # pragma warning(disable : 4204)
  26. /* allow passing a pointer to an automatically allocated variable around, cause I'm smarter than the compiler. */
  27. # pragma warning(disable : 4221)
  28. #endif
  29. static const struct aws_byte_cursor s_missing_operation_name_error = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(
  30. "{ \"message\": \"The first message for on a non-zero :stream-id must contain an operation header value.\"; }");
  31. struct aws_event_stream_rpc_server_listener {
  32. struct aws_allocator *allocator;
  33. struct aws_socket *listener;
  34. struct aws_server_bootstrap *bootstrap;
  35. struct aws_atomic_var ref_count;
  36. aws_event_stream_rpc_server_on_new_connection_fn *on_new_connection;
  37. aws_event_stream_rpc_server_on_connection_shutdown_fn *on_connection_shutdown;
  38. aws_event_stream_rpc_server_on_listener_destroy_fn *on_destroy_callback;
  39. size_t initial_window_size;
  40. bool enable_read_backpressure;
  41. bool initialized;
  42. void *user_data;
  43. };
  44. struct aws_event_stream_rpc_server_connection {
  45. struct aws_allocator *allocator;
  46. struct aws_hash_table continuation_table;
  47. struct aws_event_stream_rpc_server_listener *server;
  48. struct aws_atomic_var ref_count;
  49. aws_event_stream_rpc_server_on_incoming_stream_fn *on_incoming_stream;
  50. aws_event_stream_rpc_server_connection_protocol_message_fn *on_connection_protocol_message;
  51. struct aws_channel *channel;
  52. struct aws_channel_handler *event_stream_handler;
  53. uint32_t latest_stream_id;
  54. void *user_data;
  55. struct aws_atomic_var is_open;
  56. struct aws_atomic_var handshake_state;
  57. bool bootstrap_owned;
  58. };
  59. struct aws_event_stream_rpc_server_continuation_token {
  60. uint32_t stream_id;
  61. struct aws_event_stream_rpc_server_connection *connection;
  62. aws_event_stream_rpc_server_stream_continuation_fn *continuation_fn;
  63. aws_event_stream_rpc_server_stream_continuation_closed_fn *closed_fn;
  64. void *user_data;
  65. struct aws_atomic_var ref_count;
  66. struct aws_atomic_var is_closed;
  67. };
  68. /** This is the destructor callback invoked by the connections continuation table when a continuation is removed
  69. * from the hash table.
  70. */
  71. void s_continuation_destroy(void *value) {
  72. struct aws_event_stream_rpc_server_continuation_token *continuation = value;
  73. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: destroying continuation", (void *)continuation);
  74. continuation->closed_fn(continuation, continuation->user_data);
  75. aws_event_stream_rpc_server_continuation_release(continuation);
  76. }
  77. static void s_on_message_received(struct aws_event_stream_message *message, int error_code, void *user_data);
  78. /* We have two paths for creating a connection on a channel. The first is an incoming connection on the server listener.
  79. * The second is adding a connection to an already existing channel. This is the code common to both cases. */
  80. static struct aws_event_stream_rpc_server_connection *s_create_connection_on_channel(
  81. struct aws_event_stream_rpc_server_listener *server,
  82. struct aws_channel *channel) {
  83. AWS_LOGF_TRACE(
  84. AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: creating connection on channel %p", (void *)server, (void *)channel);
  85. struct aws_event_stream_rpc_server_connection *connection =
  86. aws_mem_calloc(server->allocator, 1, sizeof(struct aws_event_stream_rpc_server_connection));
  87. struct aws_channel_handler *event_stream_handler = NULL;
  88. struct aws_channel_slot *slot = NULL;
  89. if (!connection) {
  90. AWS_LOGF_ERROR(
  91. AWS_LS_EVENT_STREAM_RPC_SERVER,
  92. "id=%p: allocation failed for connection with error %s",
  93. (void *)server,
  94. aws_error_debug_str(aws_last_error()));
  95. return NULL;
  96. }
  97. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: new connection is %p", (void *)server, (void *)connection);
  98. aws_atomic_init_int(&connection->ref_count, 1u);
  99. aws_atomic_init_int(&connection->is_open, 1u);
  100. /* handshake step 1 is a connect message being received. Handshake 2 is the connect ack being sent.
  101. * no messages other than connect and connect ack are allowed until this count reaches 2. */
  102. aws_atomic_init_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_INITIALIZED);
  103. connection->allocator = server->allocator;
  104. if (aws_hash_table_init(
  105. &connection->continuation_table,
  106. server->allocator,
  107. 64,
  108. aws_event_stream_rpc_hash_streamid,
  109. aws_event_stream_rpc_streamid_eq,
  110. NULL,
  111. s_continuation_destroy)) {
  112. AWS_LOGF_ERROR(
  113. AWS_LS_EVENT_STREAM_RPC_SERVER,
  114. "id=%p: initialization of connection stream table failed with error %s",
  115. (void *)connection,
  116. aws_error_debug_str(aws_last_error()));
  117. goto error;
  118. }
  119. struct aws_event_stream_channel_handler_options handler_options = {
  120. .on_message_received = s_on_message_received,
  121. .user_data = connection,
  122. .initial_window_size = server->initial_window_size,
  123. .manual_window_management = server->enable_read_backpressure,
  124. };
  125. event_stream_handler = aws_event_stream_channel_handler_new(server->allocator, &handler_options);
  126. if (!event_stream_handler) {
  127. AWS_LOGF_ERROR(
  128. AWS_LS_EVENT_STREAM_RPC_SERVER,
  129. "id=%p: initialization of event-stream handler failed with error %s",
  130. (void *)connection,
  131. aws_error_debug_str(aws_last_error()));
  132. goto error;
  133. }
  134. slot = aws_channel_slot_new(channel);
  135. if (!slot) {
  136. AWS_LOGF_ERROR(
  137. AWS_LS_EVENT_STREAM_RPC_SERVER,
  138. "id=%p: initialization of channel slot failed with error %s",
  139. (void *)connection,
  140. aws_error_debug_str(aws_last_error()));
  141. goto error;
  142. }
  143. aws_channel_slot_insert_end(channel, slot);
  144. if (aws_channel_slot_set_handler(slot, event_stream_handler)) {
  145. AWS_LOGF_ERROR(
  146. AWS_LS_EVENT_STREAM_RPC_SERVER,
  147. "id=%p: setting the handler on the slot failed with error %s",
  148. (void *)connection,
  149. aws_error_debug_str(aws_last_error()));
  150. goto error;
  151. }
  152. aws_event_stream_rpc_server_listener_acquire(server);
  153. connection->server = server;
  154. connection->event_stream_handler = event_stream_handler;
  155. connection->channel = channel;
  156. aws_channel_acquire_hold(channel);
  157. return connection;
  158. error:
  159. if (!slot && event_stream_handler) {
  160. aws_channel_handler_destroy(event_stream_handler);
  161. }
  162. if (connection) {
  163. aws_event_stream_rpc_server_connection_release(connection);
  164. }
  165. return NULL;
  166. }
  167. struct aws_event_stream_rpc_server_connection *aws_event_stream_rpc_server_connection_from_existing_channel(
  168. struct aws_event_stream_rpc_server_listener *server,
  169. struct aws_channel *channel,
  170. const struct aws_event_stream_rpc_connection_options *connection_options) {
  171. AWS_FATAL_ASSERT(
  172. connection_options->on_connection_protocol_message && "on_connection_protocol_message must be specified!");
  173. AWS_FATAL_ASSERT(connection_options->on_incoming_stream && "on_connection_protocol_message must be specified");
  174. struct aws_event_stream_rpc_server_connection *connection = s_create_connection_on_channel(server, channel);
  175. if (!connection) {
  176. return NULL;
  177. }
  178. connection->on_incoming_stream = connection_options->on_incoming_stream;
  179. connection->on_connection_protocol_message = connection_options->on_connection_protocol_message;
  180. connection->user_data = connection_options->user_data;
  181. aws_event_stream_rpc_server_connection_acquire(connection);
  182. return connection;
  183. }
  184. void aws_event_stream_rpc_server_connection_acquire(struct aws_event_stream_rpc_server_connection *connection) {
  185. size_t current_count = aws_atomic_fetch_add_explicit(&connection->ref_count, 1, aws_memory_order_relaxed);
  186. AWS_LOGF_TRACE(
  187. AWS_LS_EVENT_STREAM_RPC_SERVER,
  188. "id=%p: connection acquired, new ref count is %zu.",
  189. (void *)connection,
  190. current_count + 1);
  191. }
  192. void aws_event_stream_rpc_server_connection_release(struct aws_event_stream_rpc_server_connection *connection) {
  193. if (!connection) {
  194. return;
  195. }
  196. size_t value = aws_atomic_fetch_sub_explicit(&connection->ref_count, 1, aws_memory_order_seq_cst);
  197. AWS_LOGF_TRACE(
  198. AWS_LS_EVENT_STREAM_RPC_SERVER,
  199. "id=%p: connection released, new ref count is %zu.",
  200. (void *)connection,
  201. value - 1);
  202. if (value == 1) {
  203. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: destroying connection.", (void *)connection);
  204. aws_channel_release_hold(connection->channel);
  205. aws_hash_table_clean_up(&connection->continuation_table);
  206. aws_event_stream_rpc_server_listener_release(connection->server);
  207. aws_mem_release(connection->allocator, connection);
  208. }
  209. }
  210. /* incoming from a socket on this listener. */
  211. static void s_on_accept_channel_setup(
  212. struct aws_server_bootstrap *bootstrap,
  213. int error_code,
  214. struct aws_channel *channel,
  215. void *user_data) {
  216. (void)bootstrap;
  217. struct aws_event_stream_rpc_server_listener *server = user_data;
  218. if (!error_code) {
  219. AWS_LOGF_INFO(
  220. AWS_LS_EVENT_STREAM_RPC_SERVER,
  221. "id=%p: incoming connection with channel %p.",
  222. (void *)server,
  223. (void *)channel);
  224. AWS_FATAL_ASSERT(channel && "Channel should never be null with a 0 error code.");
  225. struct aws_event_stream_rpc_server_connection *connection = s_create_connection_on_channel(server, channel);
  226. if (!connection) {
  227. int error = aws_last_error();
  228. server->on_new_connection(NULL, error, NULL, server->user_data);
  229. aws_channel_shutdown(channel, error);
  230. }
  231. struct aws_event_stream_rpc_connection_options connection_options;
  232. AWS_ZERO_STRUCT(connection_options);
  233. aws_event_stream_rpc_server_connection_acquire(connection);
  234. AWS_LOGF_TRACE(
  235. AWS_LS_EVENT_STREAM_RPC_SERVER,
  236. "id=%p: invoking on_new_connection with connection %p.",
  237. (void *)server,
  238. (void *)connection);
  239. if (server->on_new_connection(connection, AWS_ERROR_SUCCESS, &connection_options, server->user_data)) {
  240. aws_channel_shutdown(channel, aws_last_error());
  241. aws_event_stream_rpc_server_connection_release(connection);
  242. return;
  243. }
  244. AWS_FATAL_ASSERT(
  245. connection_options.on_connection_protocol_message && "on_connection_protocol_message must be specified!");
  246. AWS_FATAL_ASSERT(connection_options.on_incoming_stream && "on_connection_protocol_message must be specified");
  247. connection->on_incoming_stream = connection_options.on_incoming_stream;
  248. connection->on_connection_protocol_message = connection_options.on_connection_protocol_message;
  249. connection->user_data = connection_options.user_data;
  250. connection->bootstrap_owned = true;
  251. aws_event_stream_rpc_server_connection_release(connection);
  252. } else {
  253. AWS_LOGF_ERROR(
  254. AWS_LS_EVENT_STREAM_RPC_SERVER,
  255. "id=%p: invoking on_new_connection with error %s",
  256. (void *)server,
  257. aws_error_debug_str(error_code));
  258. server->on_new_connection(NULL, error_code, NULL, server->user_data);
  259. }
  260. }
  261. /* this is just to get the connection object off of the channel. */
  262. static inline struct aws_event_stream_rpc_server_connection *s_rpc_connection_from_channel(
  263. struct aws_channel *channel) {
  264. struct aws_channel_slot *our_slot = NULL;
  265. struct aws_channel_slot *current_slot = aws_channel_get_first_slot(channel);
  266. AWS_FATAL_ASSERT(
  267. current_slot &&
  268. "It should be logically impossible to have a channel in this callback that doesn't have a slot in it");
  269. while (current_slot->adj_right) {
  270. current_slot = current_slot->adj_right;
  271. }
  272. our_slot = current_slot;
  273. struct aws_channel_handler *our_handler = our_slot->handler;
  274. return aws_event_stream_channel_handler_get_user_data(our_handler);
  275. }
  276. static void s_on_accept_channel_shutdown(
  277. struct aws_server_bootstrap *bootstrap,
  278. int error_code,
  279. struct aws_channel *channel,
  280. void *user_data) {
  281. (void)bootstrap;
  282. struct aws_event_stream_rpc_server_listener *server = user_data;
  283. struct aws_event_stream_rpc_server_connection *connection = s_rpc_connection_from_channel(channel);
  284. AWS_LOGF_DEBUG(
  285. AWS_LS_EVENT_STREAM_RPC_SERVER,
  286. "id=%p: channel %p and connection %p shutdown occurred with error %s",
  287. (void *)server,
  288. (void *)channel,
  289. (void *)connection,
  290. aws_error_debug_str(error_code));
  291. aws_atomic_store_int(&connection->is_open, 0U);
  292. aws_hash_table_clear(&connection->continuation_table);
  293. aws_event_stream_rpc_server_connection_acquire(connection);
  294. server->on_connection_shutdown(connection, error_code, server->user_data);
  295. aws_event_stream_rpc_server_connection_release(connection);
  296. aws_event_stream_rpc_server_connection_release(connection);
  297. }
  298. static void s_on_server_listener_destroy(struct aws_server_bootstrap *bootstrap, void *user_data) {
  299. (void)bootstrap;
  300. struct aws_event_stream_rpc_server_listener *listener = user_data;
  301. AWS_LOGF_INFO(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: destroying server", (void *)listener);
  302. /* server bootstrap invokes this callback regardless of if the listener was successfully created, so
  303. * just check that we successfully set it up before freeing anything. When that's fixed in aws-c-io, this
  304. * code will still be correct, so just leave it here for now. */
  305. if (listener->initialized) {
  306. if (listener->on_destroy_callback) {
  307. listener->on_destroy_callback(listener, listener->user_data);
  308. }
  309. aws_mem_release(listener->allocator, listener);
  310. }
  311. }
  312. struct aws_event_stream_rpc_server_listener *aws_event_stream_rpc_server_new_listener(
  313. struct aws_allocator *allocator,
  314. struct aws_event_stream_rpc_server_listener_options *options) {
  315. struct aws_event_stream_rpc_server_listener *server =
  316. aws_mem_calloc(allocator, 1, sizeof(struct aws_event_stream_rpc_server_listener));
  317. if (!server) {
  318. AWS_LOGF_ERROR(
  319. AWS_LS_EVENT_STREAM_RPC_SERVER,
  320. "static: failed to allocate new server with error %s",
  321. aws_error_debug_str(aws_last_error()));
  322. return NULL;
  323. }
  324. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_SERVER, "static: new server is %p", (void *)server);
  325. aws_atomic_init_int(&server->ref_count, 1);
  326. struct aws_server_socket_channel_bootstrap_options bootstrap_options = {
  327. .bootstrap = options->bootstrap,
  328. .socket_options = options->socket_options,
  329. .tls_options = options->tls_options,
  330. .enable_read_back_pressure = false,
  331. .host_name = options->host_name,
  332. .port = options->port,
  333. .incoming_callback = s_on_accept_channel_setup,
  334. .shutdown_callback = s_on_accept_channel_shutdown,
  335. .destroy_callback = s_on_server_listener_destroy,
  336. .user_data = server,
  337. };
  338. server->bootstrap = options->bootstrap;
  339. server->allocator = allocator;
  340. server->on_destroy_callback = options->on_destroy_callback;
  341. server->on_new_connection = options->on_new_connection;
  342. server->on_connection_shutdown = options->on_connection_shutdown;
  343. server->user_data = options->user_data;
  344. server->listener = aws_server_bootstrap_new_socket_listener(&bootstrap_options);
  345. if (!server->listener) {
  346. AWS_LOGF_ERROR(
  347. AWS_LS_EVENT_STREAM_RPC_SERVER,
  348. "static: failed to allocate new socket listener with error %s",
  349. aws_error_debug_str(aws_last_error()));
  350. goto error;
  351. }
  352. server->initialized = true;
  353. return server;
  354. error:
  355. if (server->listener) {
  356. aws_server_bootstrap_destroy_socket_listener(options->bootstrap, server->listener);
  357. }
  358. aws_mem_release(server->allocator, server);
  359. return NULL;
  360. }
  361. void aws_event_stream_rpc_server_listener_acquire(struct aws_event_stream_rpc_server_listener *server) {
  362. size_t current_count = aws_atomic_fetch_add_explicit(&server->ref_count, 1, aws_memory_order_relaxed);
  363. AWS_LOGF_TRACE(
  364. AWS_LS_EVENT_STREAM_RPC_SERVER,
  365. "id=%p: server acquired, new ref count is %zu.",
  366. (void *)server,
  367. current_count + 1);
  368. }
  369. static void s_destroy_server(struct aws_event_stream_rpc_server_listener *server) {
  370. if (server) {
  371. AWS_LOGF_INFO(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: destroying server", (void *)server);
  372. /* the memory for this is cleaned up in the listener shutdown complete callback. */
  373. aws_server_bootstrap_destroy_socket_listener(server->bootstrap, server->listener);
  374. }
  375. }
  376. void aws_event_stream_rpc_server_listener_release(struct aws_event_stream_rpc_server_listener *server) {
  377. if (!server) {
  378. return;
  379. }
  380. size_t ref_count = aws_atomic_fetch_sub_explicit(&server->ref_count, 1, aws_memory_order_seq_cst);
  381. AWS_LOGF_TRACE(
  382. AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: server released, new ref count is %zu.", (void *)server, ref_count - 1);
  383. if (ref_count == 1) {
  384. s_destroy_server(server);
  385. }
  386. }
  387. struct event_stream_connection_send_message_args {
  388. struct aws_allocator *allocator;
  389. struct aws_event_stream_message message;
  390. enum aws_event_stream_rpc_message_type message_type;
  391. struct aws_event_stream_rpc_server_connection *connection;
  392. struct aws_event_stream_rpc_server_continuation_token *continuation;
  393. aws_event_stream_rpc_server_message_flush_fn *flush_fn;
  394. void *user_data;
  395. bool end_stream;
  396. bool terminate_connection;
  397. };
  398. static void s_on_protocol_message_written_fn(
  399. struct aws_event_stream_message *message,
  400. int error_code,
  401. void *user_data) {
  402. (void)message;
  403. struct event_stream_connection_send_message_args *message_args = user_data;
  404. AWS_LOGF_TRACE(
  405. AWS_LS_EVENT_STREAM_RPC_SERVER,
  406. "id=%p: message flushed to channel with error %s",
  407. (void *)message_args->connection,
  408. aws_error_debug_str(error_code));
  409. if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
  410. AWS_LOGF_TRACE(
  411. AWS_LS_EVENT_STREAM_RPC_SERVER,
  412. "id=%p: connect ack message flushed to wire",
  413. (void *)message_args->connection);
  414. }
  415. if (message_args->end_stream) {
  416. AWS_FATAL_ASSERT(message_args->continuation && "end stream flag was set but it wasn't on a continuation");
  417. AWS_LOGF_DEBUG(
  418. AWS_LS_EVENT_STREAM_RPC_SERVER,
  419. "id=%p: end_stream flag for continuation %p was set, closing",
  420. (void *)message_args->connection,
  421. (void *)message_args->continuation);
  422. aws_atomic_store_int(&message_args->continuation->is_closed, 1U);
  423. aws_hash_table_remove(
  424. &message_args->connection->continuation_table, &message_args->continuation->stream_id, NULL, NULL);
  425. }
  426. message_args->flush_fn(error_code, message_args->user_data);
  427. if (message_args->terminate_connection) {
  428. AWS_LOGF_INFO(
  429. AWS_LS_EVENT_STREAM_RPC_SERVER,
  430. "id=%p: terminate connection flag was set. closing",
  431. (void *)message_args->connection);
  432. aws_event_stream_rpc_server_connection_close(message_args->connection, AWS_ERROR_SUCCESS);
  433. }
  434. aws_event_stream_rpc_server_connection_release(message_args->connection);
  435. if (message_args->continuation) {
  436. aws_event_stream_rpc_server_continuation_release(message_args->continuation);
  437. }
  438. aws_event_stream_message_clean_up(&message_args->message);
  439. aws_mem_release(message_args->allocator, message_args);
  440. }
  441. static int s_send_protocol_message(
  442. struct aws_event_stream_rpc_server_connection *connection,
  443. struct aws_event_stream_rpc_server_continuation_token *continuation,
  444. const struct aws_event_stream_rpc_message_args *message_args,
  445. int32_t stream_id,
  446. aws_event_stream_rpc_server_message_flush_fn *flush_fn,
  447. void *user_data) {
  448. size_t connect_handshake_state = aws_atomic_load_int(&connection->handshake_state);
  449. AWS_LOGF_TRACE(
  450. AWS_LS_EVENT_STREAM_RPC_SERVER,
  451. "id=%p: connect handshake state %zu",
  452. (void *)connection,
  453. connect_handshake_state);
  454. /* handshake step 1 is a connect message being received. Handshake 2 is the connect ack being sent.
  455. * no messages other than connect and connect ack are allowed until this count reaches 2. */
  456. if (connect_handshake_state != CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED &&
  457. message_args->message_type < AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
  458. AWS_LOGF_TRACE(
  459. AWS_LS_EVENT_STREAM_RPC_SERVER,
  460. "id=%p: invalid state, a message was received prior to connect handshake completion",
  461. (void *)connection);
  462. return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  463. }
  464. struct event_stream_connection_send_message_args *args =
  465. aws_mem_calloc(connection->allocator, 1, sizeof(struct event_stream_connection_send_message_args));
  466. if (!message_args) {
  467. AWS_LOGF_ERROR(
  468. AWS_LS_EVENT_STREAM_RPC_SERVER,
  469. "id=%p: allocation of callback args failed with error %s",
  470. (void *)connection,
  471. aws_error_debug_str(aws_last_error()));
  472. return AWS_OP_ERR;
  473. }
  474. args->allocator = connection->allocator;
  475. args->user_data = user_data;
  476. args->message_type = message_args->message_type;
  477. args->connection = connection;
  478. args->flush_fn = flush_fn;
  479. if (continuation) {
  480. args->continuation = continuation;
  481. aws_event_stream_rpc_server_continuation_acquire(continuation);
  482. if (message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
  483. AWS_LOGF_DEBUG(
  484. AWS_LS_EVENT_STREAM_RPC_SERVER,
  485. "id=%p: continuation with terminate stream flag was specified closing",
  486. (void *)continuation);
  487. args->end_stream = true;
  488. }
  489. }
  490. if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
  491. AWS_LOGF_INFO(
  492. AWS_LS_EVENT_STREAM_RPC_SERVER,
  493. "id=%p: sending connect ack message, the connect handshake is completed",
  494. (void *)connection);
  495. aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED);
  496. if (!(message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_CONNECTION_ACCEPTED)) {
  497. AWS_LOGF_DEBUG(
  498. AWS_LS_EVENT_STREAM_RPC_SERVER,
  499. "id=%p: connection ack was rejected closing connection",
  500. (void *)connection);
  501. args->terminate_connection = true;
  502. }
  503. }
  504. args->flush_fn = flush_fn;
  505. size_t headers_count = message_args->headers_count + 3;
  506. struct aws_array_list headers_list;
  507. AWS_ZERO_STRUCT(headers_list);
  508. if (aws_array_list_init_dynamic(
  509. &headers_list, connection->allocator, headers_count, sizeof(struct aws_event_stream_header_value_pair))) {
  510. AWS_LOGF_ERROR(
  511. AWS_LS_EVENT_STREAM_RPC_SERVER,
  512. "id=%p: allocation of headers failed with error %s",
  513. (void *)connection,
  514. aws_error_debug_str(aws_last_error()));
  515. goto args_allocated_before_failure;
  516. }
  517. /* since we preallocated the space for the headers, these can't fail, but we'll go ahead an assert on them just in
  518. * case */
  519. for (size_t i = 0; i < message_args->headers_count; ++i) {
  520. AWS_FATAL_ASSERT(!aws_array_list_push_back(&headers_list, &message_args->headers[i]));
  521. }
  522. AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
  523. &headers_list,
  524. (const char *)aws_event_stream_rpc_message_type_name.ptr,
  525. (uint8_t)aws_event_stream_rpc_message_type_name.len,
  526. message_args->message_type));
  527. AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
  528. &headers_list,
  529. (const char *)aws_event_stream_rpc_message_flags_name.ptr,
  530. (uint8_t)aws_event_stream_rpc_message_flags_name.len,
  531. message_args->message_flags));
  532. AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
  533. &headers_list,
  534. (const char *)aws_event_stream_rpc_stream_id_name.ptr,
  535. (uint8_t)aws_event_stream_rpc_stream_id_name.len,
  536. stream_id));
  537. int message_init_err_code =
  538. aws_event_stream_message_init(&args->message, connection->allocator, &headers_list, message_args->payload);
  539. aws_array_list_clean_up(&headers_list);
  540. if (message_init_err_code) {
  541. AWS_LOGF_ERROR(
  542. AWS_LS_EVENT_STREAM_RPC_SERVER,
  543. "id=%p: initialization of message failed with error %s",
  544. (void *)connection,
  545. aws_error_debug_str(aws_last_error()));
  546. goto args_allocated_before_failure;
  547. }
  548. aws_event_stream_rpc_server_connection_acquire(connection);
  549. if (aws_event_stream_channel_handler_write_message(
  550. connection->event_stream_handler, &args->message, s_on_protocol_message_written_fn, args)) {
  551. AWS_LOGF_ERROR(
  552. AWS_LS_EVENT_STREAM_RPC_SERVER,
  553. "id=%p: message send failed with error %s",
  554. (void *)connection,
  555. aws_error_debug_str(aws_last_error()));
  556. goto message_initialized_before_failure;
  557. }
  558. return AWS_OP_SUCCESS;
  559. message_initialized_before_failure:
  560. aws_event_stream_message_clean_up(&args->message);
  561. args_allocated_before_failure:
  562. aws_mem_release(args->allocator, args);
  563. aws_event_stream_rpc_server_connection_release(connection);
  564. return AWS_OP_ERR;
  565. }
  566. int aws_event_stream_rpc_server_connection_send_protocol_message(
  567. struct aws_event_stream_rpc_server_connection *connection,
  568. const struct aws_event_stream_rpc_message_args *message_args,
  569. aws_event_stream_rpc_server_message_flush_fn *flush_fn,
  570. void *user_data) {
  571. if (!aws_event_stream_rpc_server_connection_is_open(connection)) {
  572. return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_CONNECTION_CLOSED);
  573. }
  574. return s_send_protocol_message(connection, NULL, message_args, 0, flush_fn, user_data);
  575. }
  576. void *aws_event_stream_rpc_server_connection_get_user_data(struct aws_event_stream_rpc_server_connection *connection) {
  577. return connection->user_data;
  578. }
  579. AWS_EVENT_STREAM_API void aws_event_stream_rpc_server_override_last_stream_id(
  580. struct aws_event_stream_rpc_server_connection *connection,
  581. int32_t value) {
  582. connection->latest_stream_id = value;
  583. }
  584. void aws_event_stream_rpc_server_connection_close(
  585. struct aws_event_stream_rpc_server_connection *connection,
  586. int shutdown_error_code) {
  587. if (aws_event_stream_rpc_server_connection_is_open(connection)) {
  588. AWS_LOGF_DEBUG(
  589. AWS_LS_EVENT_STREAM_RPC_SERVER,
  590. "id=%p: closing connection with error %s",
  591. (void *)connection,
  592. aws_error_debug_str(shutdown_error_code));
  593. aws_atomic_store_int(&connection->is_open, 0U);
  594. aws_channel_shutdown(connection->channel, shutdown_error_code);
  595. if (!connection->bootstrap_owned) {
  596. aws_hash_table_clear(&connection->continuation_table);
  597. aws_event_stream_rpc_server_connection_release(connection);
  598. }
  599. }
  600. }
  601. bool aws_event_stream_rpc_server_continuation_is_closed(
  602. struct aws_event_stream_rpc_server_continuation_token *continuation) {
  603. return aws_atomic_load_int(&continuation->is_closed) == 1U;
  604. }
  605. bool aws_event_stream_rpc_server_connection_is_open(struct aws_event_stream_rpc_server_connection *connection) {
  606. return aws_atomic_load_int(&connection->is_open) == 1U;
  607. }
  608. void aws_event_stream_rpc_server_continuation_acquire(
  609. struct aws_event_stream_rpc_server_continuation_token *continuation) {
  610. size_t current_count = aws_atomic_fetch_add_explicit(&continuation->ref_count, 1, aws_memory_order_relaxed);
  611. AWS_LOGF_TRACE(
  612. AWS_LS_EVENT_STREAM_RPC_SERVER,
  613. "id=%p: continuation acquired, new ref count is %zu.",
  614. (void *)continuation,
  615. current_count + 1);
  616. }
  617. void aws_event_stream_rpc_server_continuation_release(
  618. struct aws_event_stream_rpc_server_continuation_token *continuation) {
  619. size_t value = aws_atomic_fetch_sub_explicit(&continuation->ref_count, 1, aws_memory_order_seq_cst);
  620. AWS_LOGF_TRACE(
  621. AWS_LS_EVENT_STREAM_RPC_SERVER,
  622. "id=%p: continuation released, new ref count is %zu.",
  623. (void *)continuation,
  624. value - 1);
  625. if (value == 1) {
  626. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: destroying continuation.", (void *)continuation);
  627. struct aws_allocator *allocator = continuation->connection->allocator;
  628. aws_event_stream_rpc_server_connection_release(continuation->connection);
  629. aws_mem_release(allocator, continuation);
  630. }
  631. }
  632. int aws_event_stream_rpc_server_continuation_send_message(
  633. struct aws_event_stream_rpc_server_continuation_token *continuation,
  634. const struct aws_event_stream_rpc_message_args *message_args,
  635. aws_event_stream_rpc_server_message_flush_fn *flush_fn,
  636. void *user_data) {
  637. AWS_FATAL_PRECONDITION(continuation->continuation_fn);
  638. AWS_FATAL_PRECONDITION(continuation->closed_fn);
  639. if (aws_event_stream_rpc_server_continuation_is_closed(continuation)) {
  640. return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_STREAM_CLOSED);
  641. }
  642. return s_send_protocol_message(
  643. continuation->connection, continuation, message_args, continuation->stream_id, flush_fn, user_data);
  644. }
  645. static void s_connection_error_message_flush_fn(int error_code, void *user_data) {
  646. (void)error_code;
  647. struct aws_event_stream_rpc_server_connection *connection = user_data;
  648. aws_event_stream_rpc_server_connection_close(connection, AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  649. }
  650. static void s_send_connection_level_error(
  651. struct aws_event_stream_rpc_server_connection *connection,
  652. uint32_t message_type,
  653. uint32_t message_flags,
  654. const struct aws_byte_cursor *message) {
  655. struct aws_byte_buf payload_buf = aws_byte_buf_from_array(message->ptr, message->len);
  656. AWS_LOGF_DEBUG(
  657. AWS_LS_EVENT_STREAM_RPC_SERVER,
  658. "id=%p: sending connection-level error\n" PRInSTR,
  659. (void *)connection,
  660. AWS_BYTE_BUF_PRI(payload_buf));
  661. struct aws_event_stream_header_value_pair content_type_header =
  662. aws_event_stream_create_string_header(s_json_content_type_name, s_json_content_type_value);
  663. struct aws_event_stream_header_value_pair headers[] = {
  664. content_type_header,
  665. };
  666. struct aws_event_stream_rpc_message_args message_args = {
  667. .message_type = message_type,
  668. .message_flags = message_flags,
  669. .payload = &payload_buf,
  670. .headers_count = 1,
  671. .headers = headers,
  672. };
  673. aws_event_stream_rpc_server_connection_send_protocol_message(
  674. connection, &message_args, s_connection_error_message_flush_fn, connection);
  675. }
  676. /* TODO: come back and make this a proper state pattern. For now it's branches all over the place until we nail
  677. * down the spec. */
  678. static void s_route_message_by_type(
  679. struct aws_event_stream_rpc_server_connection *connection,
  680. struct aws_event_stream_message *message,
  681. struct aws_array_list *headers_list,
  682. uint32_t stream_id,
  683. uint32_t message_type,
  684. uint32_t message_flags,
  685. struct aws_byte_cursor operation_name) {
  686. struct aws_byte_buf payload_buf = aws_byte_buf_from_array(
  687. aws_event_stream_message_payload(message), aws_event_stream_message_payload_len(message));
  688. struct aws_event_stream_rpc_message_args message_args = {
  689. .headers = headers_list->data,
  690. .headers_count = aws_array_list_length(headers_list),
  691. .payload = &payload_buf,
  692. .message_flags = message_flags,
  693. .message_type = message_type,
  694. };
  695. size_t handshake_state = aws_atomic_load_int(&connection->handshake_state);
  696. /* make sure if this is not a CONNECT message being received, the handshake has been completed. */
  697. if (handshake_state < CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED &&
  698. message_type != AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
  699. AWS_LOGF_ERROR(
  700. AWS_LS_EVENT_STREAM_RPC_SERVER,
  701. "id=%p: a message was received on this connection prior to the "
  702. "connect handshake completing",
  703. (void *)connection);
  704. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  705. s_send_connection_level_error(
  706. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
  707. return;
  708. }
  709. /* stream_id being non zero ALWAYS indicates APPLICATION_DATA or APPLICATION_ERROR. */
  710. if (stream_id > 0) {
  711. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: stream id %" PRIu32, (void *)connection, stream_id);
  712. struct aws_event_stream_rpc_server_continuation_token *continuation = NULL;
  713. if (message_type > AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR) {
  714. AWS_LOGF_ERROR(
  715. AWS_LS_EVENT_STREAM_RPC_SERVER,
  716. "id=%p: only application messages can be sent on a stream id, "
  717. "but this message is the incorrect type",
  718. (void *)connection);
  719. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  720. s_send_connection_level_error(
  721. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_stream_id_error);
  722. return;
  723. }
  724. /* INT32_MAX is the max stream id. */
  725. if (stream_id > INT32_MAX) {
  726. AWS_LOGF_ERROR(
  727. AWS_LS_EVENT_STREAM_RPC_SERVER,
  728. "id=%p: stream_id is larger than the max acceptable value",
  729. (void *)connection);
  730. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  731. s_send_connection_level_error(
  732. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_stream_id_error);
  733. return;
  734. }
  735. /* if the stream is is in the past, look it up from the continuation table. If it's not there, that's an error.
  736. * if it is, find it and notify the user a message arrived */
  737. if (stream_id <= connection->latest_stream_id) {
  738. AWS_LOGF_ERROR(
  739. AWS_LS_EVENT_STREAM_RPC_SERVER,
  740. "id=%p: stream_id is an already seen stream_id, looking for existing continuation",
  741. (void *)connection);
  742. struct aws_hash_element *continuation_element = NULL;
  743. if (aws_hash_table_find(&connection->continuation_table, &stream_id, &continuation_element) ||
  744. !continuation_element) {
  745. AWS_LOGF_ERROR(
  746. AWS_LS_EVENT_STREAM_RPC_SERVER,
  747. "id=%p: stream_id does not have a corresponding continuation",
  748. (void *)connection);
  749. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  750. s_send_connection_level_error(
  751. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_client_stream_id_error);
  752. return;
  753. }
  754. continuation = continuation_element->value;
  755. AWS_LOGF_TRACE(
  756. AWS_LS_EVENT_STREAM_RPC_SERVER,
  757. "id=%p: stream_id corresponds to continuation %p",
  758. (void *)connection,
  759. (void *)continuation);
  760. aws_event_stream_rpc_server_continuation_acquire(continuation);
  761. continuation->continuation_fn(continuation, &message_args, continuation->user_data);
  762. aws_event_stream_rpc_server_continuation_release(continuation);
  763. /* now these are potentially new streams. Make sure they're in bounds, create a new continuation
  764. * and notify the user the stream has been created, then send them the message. */
  765. } else {
  766. AWS_LOGF_TRACE(
  767. AWS_LS_EVENT_STREAM_RPC_SERVER,
  768. "id=%p: stream_id is unknown, attempting to create a continuation for it",
  769. (void *)connection);
  770. if (stream_id != connection->latest_stream_id + 1) {
  771. AWS_LOGF_ERROR(
  772. AWS_LS_EVENT_STREAM_RPC_SERVER,
  773. "id=%p: stream_id is invalid because it's not sequentially increasing",
  774. (void *)connection);
  775. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  776. s_send_connection_level_error(
  777. connection,
  778. AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR,
  779. 0,
  780. &s_invalid_new_client_stream_id_error);
  781. return;
  782. }
  783. /* new streams must always have an operation name. */
  784. if (operation_name.len == 0) {
  785. AWS_LOGF_ERROR(
  786. AWS_LS_EVENT_STREAM_RPC_SERVER,
  787. "id=%p: new stream_id encountered, but an operation name was not received",
  788. (void *)connection);
  789. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  790. s_send_connection_level_error(
  791. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_missing_operation_name_error);
  792. return;
  793. }
  794. AWS_LOGF_DEBUG(
  795. AWS_LS_EVENT_STREAM_RPC_SERVER,
  796. "id=%p: stream_id is a valid new stream. Creating continuation",
  797. (void *)connection);
  798. continuation =
  799. aws_mem_calloc(connection->allocator, 1, sizeof(struct aws_event_stream_rpc_server_continuation_token));
  800. if (!continuation) {
  801. AWS_LOGF_ERROR(
  802. AWS_LS_EVENT_STREAM_RPC_SERVER,
  803. "id=%p: continuation allocation failed with error %s",
  804. (void *)connection,
  805. aws_error_debug_str(aws_last_error()));
  806. s_send_connection_level_error(
  807. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
  808. return;
  809. }
  810. AWS_LOGF_DEBUG(
  811. AWS_LS_EVENT_STREAM_RPC_SERVER,
  812. "id=%p: new continuation is %p",
  813. (void *)connection,
  814. (void *)continuation);
  815. continuation->stream_id = stream_id;
  816. continuation->connection = connection;
  817. aws_event_stream_rpc_server_connection_acquire(continuation->connection);
  818. aws_atomic_init_int(&continuation->ref_count, 1);
  819. if (aws_hash_table_put(&connection->continuation_table, &continuation->stream_id, continuation, NULL)) {
  820. AWS_LOGF_ERROR(
  821. AWS_LS_EVENT_STREAM_RPC_SERVER,
  822. "id=%p: continuation table update failed with error %s",
  823. (void *)connection,
  824. aws_error_debug_str(aws_last_error()));
  825. /* continuation release will drop the connection reference as well */
  826. aws_event_stream_rpc_server_continuation_release(continuation);
  827. s_send_connection_level_error(
  828. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
  829. return;
  830. }
  831. struct aws_event_stream_rpc_server_stream_continuation_options options;
  832. AWS_ZERO_STRUCT(options);
  833. aws_event_stream_rpc_server_continuation_acquire(continuation);
  834. AWS_LOGF_TRACE(
  835. AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: invoking on_incoming_stream callback", (void *)connection);
  836. if (connection->on_incoming_stream(
  837. continuation->connection, continuation, operation_name, &options, connection->user_data)) {
  838. aws_event_stream_rpc_server_continuation_release(continuation);
  839. s_send_connection_level_error(
  840. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
  841. return;
  842. }
  843. AWS_FATAL_ASSERT(options.on_continuation);
  844. AWS_FATAL_ASSERT(options.on_continuation_closed);
  845. continuation->continuation_fn = options.on_continuation;
  846. continuation->closed_fn = options.on_continuation_closed;
  847. continuation->user_data = options.user_data;
  848. connection->latest_stream_id = stream_id;
  849. continuation->continuation_fn(continuation, &message_args, continuation->user_data);
  850. aws_event_stream_rpc_server_continuation_release(continuation);
  851. }
  852. /* if it was a terminal stream message purge it from the hash table. The delete will decref the continuation. */
  853. if (message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
  854. AWS_LOGF_DEBUG(
  855. AWS_LS_EVENT_STREAM_RPC_SERVER,
  856. "id=%p: the terminate_stream flag was received for continuation %p, closing",
  857. (void *)connection,
  858. (void *)continuation);
  859. aws_atomic_store_int(&continuation->is_closed, 1U);
  860. aws_hash_table_remove(&connection->continuation_table, &stream_id, NULL, NULL);
  861. }
  862. } else {
  863. if (message_type <= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR ||
  864. message_type >= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_COUNT) {
  865. AWS_LOGF_ERROR(
  866. AWS_LS_EVENT_STREAM_RPC_SERVER,
  867. "id=%p: a zero stream id was received with an invalid message-type %" PRIu32,
  868. (void *)connection,
  869. message_type);
  870. s_send_connection_level_error(
  871. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_message_type_error);
  872. return;
  873. }
  874. if (message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
  875. if (handshake_state) {
  876. AWS_LOGF_ERROR(
  877. AWS_LS_EVENT_STREAM_RPC_SERVER,
  878. "id=%p: connect received but the handshake is already completed. Only one is allowed.",
  879. (void *)connection);
  880. /* only one connect is allowed. This would be a duplicate. */
  881. s_send_connection_level_error(
  882. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
  883. return;
  884. }
  885. aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_PROCESSED);
  886. AWS_LOGF_INFO(
  887. AWS_LS_EVENT_STREAM_RPC_SERVER,
  888. "id=%p: connect received, connection handshake completion pending the server sending an ack.",
  889. (void *)connection);
  890. }
  891. connection->on_connection_protocol_message(connection, &message_args, connection->user_data);
  892. }
  893. }
  894. /* invoked by the event stream channel handler when a complete message has been read from the channel. */
  895. static void s_on_message_received(struct aws_event_stream_message *message, int error_code, void *user_data) {
  896. if (!error_code) {
  897. struct aws_event_stream_rpc_server_connection *connection = user_data;
  898. AWS_LOGF_TRACE(
  899. AWS_LS_EVENT_STREAM_RPC_SERVER,
  900. "id=%p: message received on connection of length %" PRIu32,
  901. (void *)connection,
  902. aws_event_stream_message_total_length(message));
  903. struct aws_array_list headers;
  904. if (aws_array_list_init_dynamic(
  905. &headers, connection->allocator, 8, sizeof(struct aws_event_stream_header_value_pair))) {
  906. AWS_LOGF_ERROR(
  907. AWS_LS_EVENT_STREAM_RPC_SERVER,
  908. "id=%p: error initializing headers %s",
  909. (void *)connection,
  910. aws_error_debug_str(aws_last_error()));
  911. s_send_connection_level_error(
  912. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
  913. return;
  914. }
  915. if (aws_event_stream_message_headers(message, &headers)) {
  916. AWS_LOGF_ERROR(
  917. AWS_LS_EVENT_STREAM_RPC_SERVER,
  918. "id=%p: error fetching headers %s",
  919. (void *)connection,
  920. aws_error_debug_str(aws_last_error()));
  921. s_send_connection_level_error(
  922. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
  923. goto clean_up;
  924. }
  925. int32_t stream_id = -1;
  926. int32_t message_type = -1;
  927. int32_t message_flags = -1;
  928. struct aws_byte_buf operation_name_buf;
  929. AWS_ZERO_STRUCT(operation_name_buf);
  930. if (aws_event_stream_rpc_extract_message_metadata(
  931. &headers, &stream_id, &message_type, &message_flags, &operation_name_buf)) {
  932. AWS_LOGF_ERROR(
  933. AWS_LS_EVENT_STREAM_RPC_SERVER,
  934. "id=%p: invalid protocol message with error %s",
  935. (void *)connection,
  936. aws_error_debug_str(aws_last_error()));
  937. s_send_connection_level_error(
  938. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_message_error);
  939. goto clean_up;
  940. }
  941. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_SERVER, "id=%p: routing message", (void *)connection);
  942. s_route_message_by_type(
  943. connection,
  944. message,
  945. &headers,
  946. stream_id,
  947. message_type,
  948. message_flags,
  949. aws_byte_cursor_from_buf(&operation_name_buf));
  950. clean_up:
  951. aws_event_stream_headers_list_cleanup(&headers);
  952. }
  953. }