event_stream_rpc_client.c 42 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/event-stream/event_stream_channel_handler.h>
  6. #include <aws/event-stream/event_stream_rpc_client.h>
  7. #include <aws/event-stream/private/event_stream_rpc_priv.h>
  8. #include <aws/common/atomics.h>
  9. #include <aws/common/hash_table.h>
  10. #include <aws/common/mutex.h>
  11. #include <aws/io/channel_bootstrap.h>
  12. #include <inttypes.h>
  13. #ifdef _MSC_VER
  14. /* allow declared initializer using address of automatic variable */
  15. # pragma warning(disable : 4221)
  16. /* allow non-constant aggregate initializers */
  17. # pragma warning(disable : 4204)
  18. #endif
  19. static void s_clear_continuation_table(struct aws_event_stream_rpc_client_connection *connection);
  20. struct aws_event_stream_rpc_client_connection {
  21. struct aws_allocator *allocator;
  22. struct aws_hash_table continuation_table;
  23. struct aws_client_bootstrap *bootstrap_ref;
  24. struct aws_atomic_var ref_count;
  25. struct aws_channel *channel;
  26. struct aws_channel_handler *event_stream_handler;
  27. uint32_t latest_stream_id;
  28. struct aws_mutex stream_lock;
  29. struct aws_atomic_var is_open;
  30. struct aws_atomic_var handshake_state;
  31. size_t initial_window_size;
  32. aws_event_stream_rpc_client_on_connection_setup_fn *on_connection_setup;
  33. aws_event_stream_rpc_client_connection_protocol_message_fn *on_connection_protocol_message;
  34. aws_event_stream_rpc_client_on_connection_shutdown_fn *on_connection_shutdown;
  35. void *user_data;
  36. bool bootstrap_owned;
  37. bool enable_read_back_pressure;
  38. };
  39. struct aws_event_stream_rpc_client_continuation_token {
  40. uint32_t stream_id;
  41. struct aws_event_stream_rpc_client_connection *connection;
  42. aws_event_stream_rpc_client_stream_continuation_fn *continuation_fn;
  43. aws_event_stream_rpc_client_stream_continuation_closed_fn *closed_fn;
  44. void *user_data;
  45. struct aws_atomic_var ref_count;
  46. struct aws_atomic_var is_closed;
  47. };
  48. static void s_on_message_received(struct aws_event_stream_message *message, int error_code, void *user_data);
  49. static int s_create_connection_on_channel(
  50. struct aws_event_stream_rpc_client_connection *connection,
  51. struct aws_channel *channel) {
  52. struct aws_channel_handler *event_stream_handler = NULL;
  53. struct aws_channel_slot *slot = NULL;
  54. struct aws_event_stream_channel_handler_options handler_options = {
  55. .on_message_received = s_on_message_received,
  56. .user_data = connection,
  57. .initial_window_size = connection->initial_window_size,
  58. .manual_window_management = connection->enable_read_back_pressure,
  59. };
  60. AWS_LOGF_TRACE(
  61. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  62. "id=%p: creating an event-stream handler on channel %p",
  63. (void *)connection,
  64. (void *)channel);
  65. event_stream_handler = aws_event_stream_channel_handler_new(connection->allocator, &handler_options);
  66. if (!event_stream_handler) {
  67. AWS_LOGF_ERROR(
  68. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  69. "id=%p: creating an event-stream handler failed with error %s",
  70. (void *)connection,
  71. aws_error_debug_str(aws_last_error()));
  72. goto error;
  73. }
  74. slot = aws_channel_slot_new(channel);
  75. if (!slot) {
  76. AWS_LOGF_ERROR(
  77. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  78. "id=%p: creating channel slot failed with error %s",
  79. (void *)connection,
  80. aws_error_debug_str(aws_last_error()));
  81. goto error;
  82. }
  83. aws_channel_slot_insert_end(channel, slot);
  84. if (aws_channel_slot_set_handler(slot, event_stream_handler)) {
  85. AWS_LOGF_ERROR(
  86. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  87. "id=%p: setting handler on channel slot failed with error %s",
  88. (void *)connection,
  89. aws_error_debug_str(aws_last_error()));
  90. goto error;
  91. }
  92. connection->event_stream_handler = event_stream_handler;
  93. connection->channel = channel;
  94. aws_channel_acquire_hold(channel);
  95. return AWS_OP_SUCCESS;
  96. error:
  97. if (!slot && event_stream_handler) {
  98. aws_channel_handler_destroy(event_stream_handler);
  99. }
  100. return AWS_OP_ERR;
  101. }
  102. static void s_on_channel_setup_fn(
  103. struct aws_client_bootstrap *bootstrap,
  104. int error_code,
  105. struct aws_channel *channel,
  106. void *user_data) {
  107. (void)bootstrap;
  108. struct aws_event_stream_rpc_client_connection *connection = user_data;
  109. AWS_LOGF_DEBUG(
  110. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  111. "id=%p: on_channel_setup_fn invoked with error_code %d with channel %p",
  112. (void *)connection,
  113. error_code,
  114. (void *)channel);
  115. if (!error_code) {
  116. connection->bootstrap_owned = true;
  117. if (s_create_connection_on_channel(connection, channel)) {
  118. int last_error = aws_last_error();
  119. connection->on_connection_setup(NULL, last_error, connection->user_data);
  120. aws_channel_shutdown(channel, last_error);
  121. return;
  122. }
  123. AWS_LOGF_DEBUG(
  124. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  125. "id=%p: successful event-stream channel setup %p",
  126. (void *)connection,
  127. (void *)channel);
  128. aws_event_stream_rpc_client_connection_acquire(connection);
  129. connection->on_connection_setup(connection, AWS_OP_SUCCESS, connection->user_data);
  130. aws_event_stream_rpc_client_connection_release(connection);
  131. } else {
  132. connection->on_connection_setup(NULL, error_code, connection->user_data);
  133. aws_event_stream_rpc_client_connection_release(connection);
  134. }
  135. }
  136. static void s_on_channel_shutdown_fn(
  137. struct aws_client_bootstrap *bootstrap,
  138. int error_code,
  139. struct aws_channel *channel,
  140. void *user_data) {
  141. (void)bootstrap;
  142. struct aws_event_stream_rpc_client_connection *connection = user_data;
  143. AWS_LOGF_DEBUG(
  144. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  145. "id=%p: on_channel_shutdown_fn invoked with error_code %d with channel %p",
  146. (void *)connection,
  147. error_code,
  148. (void *)channel);
  149. aws_atomic_store_int(&connection->is_open, 0u);
  150. if (connection->bootstrap_owned) {
  151. s_clear_continuation_table(connection);
  152. aws_event_stream_rpc_client_connection_acquire(connection);
  153. connection->on_connection_shutdown(connection, error_code, connection->user_data);
  154. aws_event_stream_rpc_client_connection_release(connection);
  155. }
  156. aws_channel_release_hold(channel);
  157. aws_event_stream_rpc_client_connection_release(connection);
  158. }
  159. /* Set each continuation's is_closed=true.
  160. * A lock MUST be held while calling this.
  161. * For use with aws_hash_table_foreach(). */
  162. static int s_mark_each_continuation_closed(void *context, struct aws_hash_element *p_element) {
  163. (void)context;
  164. struct aws_event_stream_rpc_client_continuation_token *continuation = p_element->value;
  165. aws_atomic_store_int(&continuation->is_closed, 1U);
  166. return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
  167. }
  168. /* Invoke continuation's on_closed() callback.
  169. * A lock must NOT be hold while calling this */
  170. static void s_complete_continuation(struct aws_event_stream_rpc_client_continuation_token *token) {
  171. AWS_LOGF_DEBUG(
  172. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  173. "token=%p: token with stream-id %" PRIu32 ", purged from the stream table",
  174. (void *)token,
  175. token->stream_id);
  176. if (token->stream_id) {
  177. token->closed_fn(token, token->user_data);
  178. }
  179. aws_event_stream_rpc_client_continuation_release(token);
  180. }
  181. static int s_complete_and_clear_each_continuation(void *context, struct aws_hash_element *p_element) {
  182. (void)context;
  183. struct aws_event_stream_rpc_client_continuation_token *continuation = p_element->value;
  184. s_complete_continuation(continuation);
  185. return AWS_COMMON_HASH_TABLE_ITER_DELETE | AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
  186. }
  187. /* Remove each continuation from hash-table and invoke its on_closed() callback.
  188. * The connection->is_open must be set false before calling this. */
  189. static void s_clear_continuation_table(struct aws_event_stream_rpc_client_connection *connection) {
  190. AWS_ASSERT(!aws_event_stream_rpc_client_connection_is_open(connection));
  191. /* Use lock to ensure synchronization with code that adds entries to table.
  192. * Since connection was just marked closed, no further entries will be
  193. * added to table once we acquire the lock. */
  194. aws_mutex_lock(&connection->stream_lock);
  195. aws_hash_table_foreach(&connection->continuation_table, s_mark_each_continuation_closed, NULL);
  196. aws_mutex_unlock(&connection->stream_lock);
  197. /* Now release lock before invoking callbacks.
  198. * It's safe to alter the table now without a lock, since no further
  199. * entries can be added, and we've gone through the critical section
  200. * above to ensure synchronization */
  201. aws_hash_table_foreach(&connection->continuation_table, s_complete_and_clear_each_continuation, NULL);
  202. }
  203. int aws_event_stream_rpc_client_connection_connect(
  204. struct aws_allocator *allocator,
  205. const struct aws_event_stream_rpc_client_connection_options *conn_options) {
  206. AWS_PRECONDITION(allocator);
  207. AWS_PRECONDITION(conn_options);
  208. AWS_PRECONDITION(conn_options->on_connection_protocol_message);
  209. AWS_PRECONDITION(conn_options->on_connection_setup);
  210. AWS_PRECONDITION(conn_options->on_connection_shutdown);
  211. struct aws_event_stream_rpc_client_connection *connection =
  212. aws_mem_calloc(allocator, 1, sizeof(struct aws_event_stream_rpc_client_connection));
  213. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: creating new connection", (void *)connection);
  214. if (!connection) {
  215. return AWS_OP_ERR;
  216. }
  217. connection->allocator = allocator;
  218. aws_atomic_init_int(&connection->ref_count, 1);
  219. connection->bootstrap_ref = conn_options->bootstrap;
  220. /* this is released in the connection release which gets called regardless of if this function is successful or
  221. * not*/
  222. aws_client_bootstrap_acquire(connection->bootstrap_ref);
  223. aws_atomic_init_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_INITIALIZED);
  224. aws_atomic_init_int(&connection->is_open, 1);
  225. aws_mutex_init(&connection->stream_lock);
  226. connection->on_connection_shutdown = conn_options->on_connection_shutdown;
  227. connection->on_connection_protocol_message = conn_options->on_connection_protocol_message;
  228. connection->on_connection_setup = conn_options->on_connection_setup;
  229. connection->user_data = conn_options->user_data;
  230. if (aws_hash_table_init(
  231. &connection->continuation_table,
  232. allocator,
  233. 64,
  234. aws_event_stream_rpc_hash_streamid,
  235. aws_event_stream_rpc_streamid_eq,
  236. NULL,
  237. NULL)) {
  238. AWS_LOGF_ERROR(
  239. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  240. "id=%p: failed initializing continuation table with error %s.",
  241. (void *)connection,
  242. aws_error_debug_str(aws_last_error()));
  243. goto error;
  244. }
  245. struct aws_socket_channel_bootstrap_options bootstrap_options = {
  246. .bootstrap = connection->bootstrap_ref,
  247. .tls_options = conn_options->tls_options,
  248. .socket_options = conn_options->socket_options,
  249. .user_data = connection,
  250. .host_name = conn_options->host_name,
  251. .port = conn_options->port,
  252. .enable_read_back_pressure = false,
  253. .setup_callback = s_on_channel_setup_fn,
  254. .shutdown_callback = s_on_channel_shutdown_fn,
  255. };
  256. if (aws_client_bootstrap_new_socket_channel(&bootstrap_options)) {
  257. AWS_LOGF_ERROR(
  258. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  259. "id=%p: failed creating new socket channel with error %s.",
  260. (void *)connection,
  261. aws_error_debug_str(aws_last_error()));
  262. goto error;
  263. }
  264. return AWS_OP_SUCCESS;
  265. error:
  266. aws_event_stream_rpc_client_connection_release(connection);
  267. return AWS_OP_ERR;
  268. }
  269. void aws_event_stream_rpc_client_connection_acquire(const struct aws_event_stream_rpc_client_connection *connection) {
  270. AWS_PRECONDITION(connection);
  271. size_t current_count = aws_atomic_fetch_add_explicit(
  272. &((struct aws_event_stream_rpc_client_connection *)connection)->ref_count, 1, aws_memory_order_relaxed);
  273. AWS_LOGF_TRACE(
  274. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  275. "id=%p: connection acquired, new ref count is %zu.",
  276. (void *)connection,
  277. current_count + 1);
  278. }
  279. static void s_destroy_connection(struct aws_event_stream_rpc_client_connection *connection) {
  280. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: destroying connection.", (void *)connection);
  281. aws_hash_table_clean_up(&connection->continuation_table);
  282. aws_client_bootstrap_release(connection->bootstrap_ref);
  283. aws_mem_release(connection->allocator, connection);
  284. }
  285. void aws_event_stream_rpc_client_connection_release(const struct aws_event_stream_rpc_client_connection *connection) {
  286. if (!connection) {
  287. return;
  288. }
  289. struct aws_event_stream_rpc_client_connection *connection_mut =
  290. (struct aws_event_stream_rpc_client_connection *)connection;
  291. size_t ref_count = aws_atomic_fetch_sub_explicit(&connection_mut->ref_count, 1, aws_memory_order_seq_cst);
  292. AWS_LOGF_TRACE(
  293. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  294. "id=%p: connection released, new ref count is %zu.",
  295. (void *)connection,
  296. ref_count - 1);
  297. AWS_FATAL_ASSERT(ref_count != 0 && "Connection ref count has gone negative");
  298. if (ref_count == 1) {
  299. s_destroy_connection(connection_mut);
  300. }
  301. }
  302. void aws_event_stream_rpc_client_connection_close(
  303. struct aws_event_stream_rpc_client_connection *connection,
  304. int shutdown_error_code) {
  305. AWS_LOGF_TRACE(
  306. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  307. "id=%p: connection close invoked with reason %s.",
  308. (void *)connection,
  309. aws_error_debug_str(shutdown_error_code));
  310. size_t expect_open = 1U;
  311. if (aws_atomic_compare_exchange_int(&connection->is_open, &expect_open, 0U)) {
  312. aws_channel_shutdown(connection->channel, shutdown_error_code);
  313. if (!connection->bootstrap_owned) {
  314. s_clear_continuation_table(connection);
  315. aws_event_stream_rpc_client_connection_release(connection);
  316. }
  317. } else {
  318. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: connection already closed.", (void *)connection);
  319. }
  320. }
  321. bool aws_event_stream_rpc_client_connection_is_open(const struct aws_event_stream_rpc_client_connection *connection) {
  322. return aws_atomic_load_int(&connection->is_open) == 1U;
  323. }
  324. struct event_stream_connection_send_message_args {
  325. struct aws_allocator *allocator;
  326. struct aws_event_stream_message message;
  327. enum aws_event_stream_rpc_message_type message_type;
  328. struct aws_event_stream_rpc_client_connection *connection;
  329. struct aws_event_stream_rpc_client_continuation_token *continuation;
  330. aws_event_stream_rpc_client_message_flush_fn *flush_fn;
  331. void *user_data;
  332. bool end_stream;
  333. bool terminate_connection;
  334. };
  335. static void s_on_protocol_message_written_fn(
  336. struct aws_event_stream_message *message,
  337. int error_code,
  338. void *user_data) {
  339. (void)message;
  340. struct event_stream_connection_send_message_args *message_args = user_data;
  341. AWS_LOGF_TRACE(
  342. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  343. "id=%p: message %p flushed to channel.",
  344. (void *)message_args->connection,
  345. (void *)message);
  346. if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
  347. AWS_LOGF_TRACE(
  348. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  349. "id=%p: connect message flushed to the wire.",
  350. (void *)message_args->connection);
  351. }
  352. if (message_args->end_stream) {
  353. AWS_LOGF_DEBUG(
  354. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  355. "id=%p: the end stream flag was set, closing continuation %p.",
  356. (void *)message_args->connection,
  357. (void *)message_args->continuation);
  358. AWS_FATAL_ASSERT(message_args->continuation && "end stream flag was set but it wasn't on a continuation");
  359. aws_atomic_store_int(&message_args->continuation->is_closed, 1U);
  360. aws_mutex_lock(&message_args->connection->stream_lock);
  361. aws_hash_table_remove(
  362. &message_args->connection->continuation_table, &message_args->continuation->stream_id, NULL, NULL);
  363. aws_mutex_unlock(&message_args->connection->stream_lock);
  364. /* Lock must NOT be held while invoking callback */
  365. s_complete_continuation(message_args->continuation);
  366. }
  367. message_args->flush_fn(error_code, message_args->user_data);
  368. if (message_args->terminate_connection) {
  369. AWS_LOGF_DEBUG(
  370. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  371. "id=%p: terminate_connection flag was specified. Shutting down the connection.",
  372. (void *)message_args->connection);
  373. aws_event_stream_rpc_client_connection_close(message_args->connection, AWS_ERROR_SUCCESS);
  374. }
  375. aws_event_stream_rpc_client_connection_release(message_args->connection);
  376. if (message_args->continuation) {
  377. aws_event_stream_rpc_client_continuation_release(message_args->continuation);
  378. }
  379. aws_event_stream_message_clean_up(&message_args->message);
  380. aws_mem_release(message_args->allocator, message_args);
  381. }
  382. static int s_send_protocol_message(
  383. struct aws_event_stream_rpc_client_connection *connection,
  384. struct aws_event_stream_rpc_client_continuation_token *continuation,
  385. struct aws_byte_cursor *operation_name,
  386. const struct aws_event_stream_rpc_message_args *message_args,
  387. int32_t stream_id,
  388. aws_event_stream_rpc_client_message_flush_fn *flush_fn,
  389. void *user_data) {
  390. AWS_LOGF_TRACE(
  391. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  392. "id=%p: sending message. continuation: %p, stream id %" PRId32,
  393. (void *)connection,
  394. (void *)continuation,
  395. stream_id);
  396. size_t connect_handshake_state = aws_atomic_load_int(&connection->handshake_state);
  397. AWS_LOGF_TRACE(
  398. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  399. "id=%p: handshake completion value %zu",
  400. (void *)connection,
  401. connect_handshake_state);
  402. /* handshake step 1 is a connect message being received. Handshake 2 is the connect ack being sent.
  403. * no messages other than connect and connect ack are allowed until this count reaches 2. */
  404. if (connect_handshake_state != CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED &&
  405. message_args->message_type < AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
  406. AWS_LOGF_ERROR(
  407. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  408. "id=%p: handshake not completed, only a connect message can be sent.",
  409. (void *)connection);
  410. return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  411. }
  412. struct event_stream_connection_send_message_args *args =
  413. aws_mem_calloc(connection->allocator, 1, sizeof(struct event_stream_connection_send_message_args));
  414. if (!message_args) {
  415. AWS_LOGF_ERROR(
  416. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  417. "id=%p: failed to allocate callback arguments %s.",
  418. (void *)connection,
  419. aws_error_debug_str(aws_last_error()));
  420. return AWS_OP_ERR;
  421. }
  422. args->allocator = connection->allocator;
  423. args->user_data = user_data;
  424. args->message_type = message_args->message_type;
  425. args->connection = connection;
  426. args->flush_fn = flush_fn;
  427. if (continuation) {
  428. AWS_LOGF_TRACE(
  429. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  430. "id=%p: sending message on continuation %p",
  431. (void *)connection,
  432. (void *)continuation);
  433. args->continuation = continuation;
  434. aws_event_stream_rpc_client_continuation_acquire(continuation);
  435. if (message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
  436. AWS_LOGF_DEBUG(
  437. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  438. "id=%p:end stream flag was specified on continuation %p",
  439. (void *)connection,
  440. (void *)continuation);
  441. args->end_stream = true;
  442. }
  443. }
  444. if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK &&
  445. !(message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_CONNECTION_ACCEPTED)) {
  446. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: terminating connection", (void *)connection);
  447. args->terminate_connection = true;
  448. }
  449. if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
  450. AWS_LOGF_DEBUG(
  451. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  452. "id=%p: sending connect message, waiting on connect ack",
  453. (void *)connection);
  454. aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_PROCESSED);
  455. }
  456. args->flush_fn = flush_fn;
  457. size_t headers_count = 0;
  458. if (operation_name) {
  459. if (aws_add_size_checked(message_args->headers_count, 4, &headers_count)) {
  460. return AWS_OP_ERR;
  461. }
  462. } else {
  463. if (aws_add_size_checked(message_args->headers_count, 3, &headers_count)) {
  464. return AWS_OP_ERR;
  465. }
  466. }
  467. struct aws_array_list headers_list;
  468. AWS_ZERO_STRUCT(headers_list);
  469. if (aws_array_list_init_dynamic(
  470. &headers_list, connection->allocator, headers_count, sizeof(struct aws_event_stream_header_value_pair))) {
  471. AWS_LOGF_ERROR(
  472. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  473. "id=%p: an error occurred while initializing the headers list %s",
  474. (void *)connection,
  475. aws_error_debug_str(aws_last_error()));
  476. goto args_allocated_before_failure;
  477. }
  478. /* since we preallocated the space for the headers, these can't fail, but we'll go ahead an assert on them just in
  479. * case */
  480. for (size_t i = 0; i < message_args->headers_count; ++i) {
  481. AWS_FATAL_ASSERT(!aws_array_list_push_back(&headers_list, &message_args->headers[i]));
  482. }
  483. AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
  484. &headers_list,
  485. (const char *)aws_event_stream_rpc_message_type_name.ptr,
  486. (uint8_t)aws_event_stream_rpc_message_type_name.len,
  487. message_args->message_type));
  488. AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
  489. &headers_list,
  490. (const char *)aws_event_stream_rpc_message_flags_name.ptr,
  491. (uint8_t)aws_event_stream_rpc_message_flags_name.len,
  492. message_args->message_flags));
  493. AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
  494. &headers_list,
  495. (const char *)aws_event_stream_rpc_stream_id_name.ptr,
  496. (uint8_t)aws_event_stream_rpc_stream_id_name.len,
  497. stream_id));
  498. if (operation_name) {
  499. AWS_LOGF_DEBUG(
  500. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  501. "id=%p: operation name specified " PRInSTR,
  502. (void *)connection,
  503. AWS_BYTE_CURSOR_PRI(*operation_name));
  504. AWS_FATAL_ASSERT(!aws_event_stream_add_string_header(
  505. &headers_list,
  506. (const char *)aws_event_stream_rpc_operation_name.ptr,
  507. (uint8_t)aws_event_stream_rpc_operation_name.len,
  508. (const char *)operation_name->ptr,
  509. (uint16_t)operation_name->len,
  510. 0));
  511. }
  512. int message_init_err_code =
  513. aws_event_stream_message_init(&args->message, connection->allocator, &headers_list, message_args->payload);
  514. aws_array_list_clean_up(&headers_list);
  515. if (message_init_err_code) {
  516. AWS_LOGF_ERROR(
  517. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  518. "id=%p: message init failed with error %s",
  519. (void *)connection,
  520. aws_error_debug_str(aws_last_error()));
  521. goto args_allocated_before_failure;
  522. }
  523. aws_event_stream_rpc_client_connection_acquire(connection);
  524. if (aws_event_stream_channel_handler_write_message(
  525. connection->event_stream_handler, &args->message, s_on_protocol_message_written_fn, args)) {
  526. AWS_LOGF_ERROR(
  527. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  528. "id=%p: writing message failed with error %s",
  529. (void *)connection,
  530. aws_error_debug_str(aws_last_error()));
  531. goto message_initialized_before_failure;
  532. }
  533. return AWS_OP_SUCCESS;
  534. message_initialized_before_failure:
  535. aws_event_stream_message_clean_up(&args->message);
  536. args_allocated_before_failure:
  537. aws_mem_release(args->allocator, args);
  538. aws_event_stream_rpc_client_connection_release(connection);
  539. return AWS_OP_ERR;
  540. }
  541. int aws_event_stream_rpc_client_connection_send_protocol_message(
  542. struct aws_event_stream_rpc_client_connection *connection,
  543. const struct aws_event_stream_rpc_message_args *message_args,
  544. aws_event_stream_rpc_client_message_flush_fn *flush_fn,
  545. void *user_data) {
  546. if (!aws_event_stream_rpc_client_connection_is_open(connection)) {
  547. return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_CONNECTION_CLOSED);
  548. }
  549. return s_send_protocol_message(connection, NULL, NULL, message_args, 0, flush_fn, user_data);
  550. }
  551. static void s_connection_error_message_flush_fn(int error_code, void *user_data) {
  552. (void)error_code;
  553. struct aws_event_stream_rpc_client_connection *connection = user_data;
  554. aws_event_stream_rpc_client_connection_close(connection, AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  555. }
  556. static void s_send_connection_level_error(
  557. struct aws_event_stream_rpc_client_connection *connection,
  558. uint32_t message_type,
  559. uint32_t message_flags,
  560. const struct aws_byte_cursor *message) {
  561. struct aws_byte_buf payload_buf = aws_byte_buf_from_array(message->ptr, message->len);
  562. AWS_LOGF_DEBUG(
  563. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  564. "id=%p: sending connection-level error\n" PRInSTR,
  565. (void *)connection,
  566. AWS_BYTE_BUF_PRI(payload_buf));
  567. struct aws_event_stream_header_value_pair content_type_header =
  568. aws_event_stream_create_string_header(s_json_content_type_name, s_json_content_type_value);
  569. struct aws_event_stream_header_value_pair headers[] = {
  570. content_type_header,
  571. };
  572. struct aws_event_stream_rpc_message_args message_args = {
  573. .message_type = message_type,
  574. .message_flags = message_flags,
  575. .payload = &payload_buf,
  576. .headers_count = 1,
  577. .headers = headers,
  578. };
  579. aws_event_stream_rpc_client_connection_send_protocol_message(
  580. connection, &message_args, s_connection_error_message_flush_fn, connection);
  581. }
  582. static void s_route_message_by_type(
  583. struct aws_event_stream_rpc_client_connection *connection,
  584. struct aws_event_stream_message *message,
  585. struct aws_array_list *headers_list,
  586. uint32_t stream_id,
  587. uint32_t message_type,
  588. uint32_t message_flags) {
  589. struct aws_byte_buf payload_buf = aws_byte_buf_from_array(
  590. aws_event_stream_message_payload(message), aws_event_stream_message_payload_len(message));
  591. struct aws_event_stream_rpc_message_args message_args = {
  592. .headers = headers_list->data,
  593. .headers_count = aws_array_list_length(headers_list),
  594. .payload = &payload_buf,
  595. .message_flags = message_flags,
  596. .message_type = message_type,
  597. };
  598. size_t handshake_complete = aws_atomic_load_int(&connection->handshake_state);
  599. /* make sure if this is not a CONNECT message being received, the handshake has been completed. */
  600. if (handshake_complete < CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED &&
  601. message_type != AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
  602. AWS_LOGF_ERROR(
  603. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  604. "id=%p: a message was received on this connection prior to the "
  605. "connect handshake completing",
  606. (void *)connection);
  607. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  608. s_send_connection_level_error(
  609. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
  610. return;
  611. }
  612. /* stream_id being non zero ALWAYS indicates APPLICATION_DATA or APPLICATION_ERROR. */
  613. if (stream_id > 0) {
  614. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: stream id %" PRIu32, (void *)connection, stream_id);
  615. struct aws_event_stream_rpc_client_continuation_token *continuation = NULL;
  616. if (message_type > AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR) {
  617. AWS_LOGF_ERROR(
  618. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  619. "id=%p: only application messages can be sent on a stream id, "
  620. "but this message is the incorrect type",
  621. (void *)connection);
  622. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  623. s_send_connection_level_error(
  624. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_stream_id_error);
  625. return;
  626. }
  627. aws_mutex_lock(&connection->stream_lock);
  628. struct aws_hash_element *continuation_element = NULL;
  629. if (aws_hash_table_find(&connection->continuation_table, &stream_id, &continuation_element) ||
  630. !continuation_element) {
  631. bool old_stream_id = stream_id <= connection->latest_stream_id;
  632. aws_mutex_unlock(&connection->stream_lock);
  633. if (!old_stream_id) {
  634. AWS_LOGF_ERROR(
  635. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  636. "id=%p: a stream id was received that was not created by this client",
  637. (void *)connection);
  638. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  639. s_send_connection_level_error(
  640. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_client_stream_id_error);
  641. } else {
  642. AWS_LOGF_WARN(
  643. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  644. "id=%p: a stream id was received that corresponds to an already-closed stream",
  645. (void *)connection);
  646. }
  647. return;
  648. }
  649. continuation = continuation_element->value;
  650. AWS_FATAL_ASSERT(continuation != NULL);
  651. aws_event_stream_rpc_client_continuation_acquire(continuation);
  652. aws_mutex_unlock(&connection->stream_lock);
  653. continuation->continuation_fn(continuation, &message_args, continuation->user_data);
  654. aws_event_stream_rpc_client_continuation_release(continuation);
  655. /* if it was a terminal stream message purge it from the hash table. The delete will decref the continuation. */
  656. if (message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
  657. AWS_LOGF_DEBUG(
  658. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  659. "id=%p: the terminate stream flag was specified for continuation %p",
  660. (void *)connection,
  661. (void *)continuation);
  662. aws_atomic_store_int(&continuation->is_closed, 1U);
  663. aws_mutex_lock(&connection->stream_lock);
  664. aws_hash_table_remove(&connection->continuation_table, &stream_id, NULL, NULL);
  665. aws_mutex_unlock(&connection->stream_lock);
  666. /* Note that we do not invoke callback while holding lock */
  667. s_complete_continuation(continuation);
  668. }
  669. } else {
  670. if (message_type <= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR ||
  671. message_type >= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_COUNT) {
  672. AWS_LOGF_ERROR(
  673. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  674. "id=%p: a zero stream id was received with an invalid message-type %" PRIu32,
  675. (void *)connection,
  676. message_type);
  677. s_send_connection_level_error(
  678. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_message_type_error);
  679. return;
  680. }
  681. if (message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
  682. if (handshake_complete != CONNECTION_HANDSHAKE_STATE_CONNECT_PROCESSED) {
  683. AWS_LOGF_ERROR(
  684. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  685. "id=%p: connect ack received but the handshake is already completed. Only one is allowed.",
  686. (void *)connection);
  687. /* only one connect is allowed. This would be a duplicate. */
  688. s_send_connection_level_error(
  689. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
  690. return;
  691. }
  692. aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED);
  693. AWS_LOGF_INFO(
  694. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  695. "id=%p: connect ack received, connection handshake completed",
  696. (void *)connection);
  697. }
  698. connection->on_connection_protocol_message(connection, &message_args, connection->user_data);
  699. }
  700. }
  701. /* invoked by the event stream channel handler when a complete message has been read from the channel. */
  702. static void s_on_message_received(struct aws_event_stream_message *message, int error_code, void *user_data) {
  703. if (!error_code) {
  704. struct aws_event_stream_rpc_client_connection *connection = user_data;
  705. AWS_LOGF_TRACE(
  706. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  707. "id=%p: message received on connection of length %" PRIu32,
  708. (void *)connection,
  709. aws_event_stream_message_total_length(message));
  710. struct aws_array_list headers;
  711. if (aws_array_list_init_dynamic(
  712. &headers, connection->allocator, 8, sizeof(struct aws_event_stream_header_value_pair))) {
  713. AWS_LOGF_ERROR(
  714. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  715. "id=%p: error initializing headers %s",
  716. (void *)connection,
  717. aws_error_debug_str(aws_last_error()));
  718. s_send_connection_level_error(
  719. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
  720. return;
  721. }
  722. if (aws_event_stream_message_headers(message, &headers)) {
  723. AWS_LOGF_ERROR(
  724. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  725. "id=%p: error fetching headers %s",
  726. (void *)connection,
  727. aws_error_debug_str(aws_last_error()));
  728. s_send_connection_level_error(
  729. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
  730. goto clean_up;
  731. }
  732. int32_t stream_id = -1;
  733. int32_t message_type = -1;
  734. int32_t message_flags = -1;
  735. struct aws_byte_buf operation_name_buf;
  736. AWS_ZERO_STRUCT(operation_name_buf);
  737. if (aws_event_stream_rpc_extract_message_metadata(
  738. &headers, &stream_id, &message_type, &message_flags, &operation_name_buf)) {
  739. AWS_LOGF_ERROR(
  740. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  741. "id=%p: invalid protocol message with error %s",
  742. (void *)connection,
  743. aws_error_debug_str(aws_last_error()));
  744. s_send_connection_level_error(
  745. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_message_error);
  746. goto clean_up;
  747. }
  748. (void)operation_name_buf;
  749. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: routing message", (void *)connection);
  750. s_route_message_by_type(connection, message, &headers, stream_id, message_type, message_flags);
  751. clean_up:
  752. aws_event_stream_headers_list_cleanup(&headers);
  753. }
  754. }
  755. struct aws_event_stream_rpc_client_continuation_token *aws_event_stream_rpc_client_connection_new_stream(
  756. struct aws_event_stream_rpc_client_connection *connection,
  757. const struct aws_event_stream_rpc_client_stream_continuation_options *continuation_options) {
  758. AWS_PRECONDITION(continuation_options->on_continuation_closed);
  759. AWS_PRECONDITION(continuation_options->on_continuation);
  760. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: creating a new stream on connection", (void *)connection);
  761. struct aws_event_stream_rpc_client_continuation_token *continuation =
  762. aws_mem_calloc(connection->allocator, 1, sizeof(struct aws_event_stream_rpc_client_continuation_token));
  763. if (!continuation) {
  764. AWS_LOGF_ERROR(
  765. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  766. "id=%p: error while allocating continuation %s",
  767. (void *)connection,
  768. aws_error_debug_str(aws_last_error()));
  769. return NULL;
  770. }
  771. AWS_LOGF_DEBUG(
  772. AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: continuation created %p", (void *)connection, (void *)continuation);
  773. continuation->connection = connection;
  774. aws_event_stream_rpc_client_connection_acquire(continuation->connection);
  775. aws_atomic_init_int(&continuation->ref_count, 1);
  776. aws_atomic_init_int(&continuation->is_closed, 0);
  777. continuation->continuation_fn = continuation_options->on_continuation;
  778. continuation->closed_fn = continuation_options->on_continuation_closed;
  779. continuation->user_data = continuation_options->user_data;
  780. return continuation;
  781. }
  782. void *aws_event_stream_rpc_client_continuation_get_user_data(
  783. struct aws_event_stream_rpc_client_continuation_token *continuation) {
  784. return continuation->user_data;
  785. }
  786. void aws_event_stream_rpc_client_continuation_acquire(
  787. const struct aws_event_stream_rpc_client_continuation_token *continuation) {
  788. size_t current_count = aws_atomic_fetch_add_explicit(
  789. &((struct aws_event_stream_rpc_client_continuation_token *)continuation)->ref_count,
  790. 1u,
  791. aws_memory_order_relaxed);
  792. AWS_LOGF_TRACE(
  793. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  794. "id=%p: continuation acquired, new ref count is %zu.",
  795. (void *)continuation,
  796. current_count + 1);
  797. }
  798. void aws_event_stream_rpc_client_continuation_release(
  799. const struct aws_event_stream_rpc_client_continuation_token *continuation) {
  800. if (AWS_UNLIKELY(!continuation)) {
  801. return;
  802. }
  803. struct aws_event_stream_rpc_client_continuation_token *continuation_mut =
  804. (struct aws_event_stream_rpc_client_continuation_token *)continuation;
  805. size_t ref_count = aws_atomic_fetch_sub_explicit(&continuation_mut->ref_count, 1, aws_memory_order_seq_cst);
  806. AWS_LOGF_TRACE(
  807. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  808. "id=%p: continuation released, new ref count is %zu.",
  809. (void *)continuation,
  810. ref_count - 1);
  811. AWS_FATAL_ASSERT(ref_count != 0 && "Continuation ref count has gone negative");
  812. if (ref_count == 1) {
  813. struct aws_allocator *allocator = continuation_mut->connection->allocator;
  814. aws_event_stream_rpc_client_connection_release(continuation_mut->connection);
  815. aws_mem_release(allocator, continuation_mut);
  816. }
  817. }
  818. bool aws_event_stream_rpc_client_continuation_is_closed(
  819. const struct aws_event_stream_rpc_client_continuation_token *continuation) {
  820. return aws_atomic_load_int(&continuation->is_closed) == 1u;
  821. }
  822. int aws_event_stream_rpc_client_continuation_activate(
  823. struct aws_event_stream_rpc_client_continuation_token *continuation,
  824. struct aws_byte_cursor operation_name,
  825. const struct aws_event_stream_rpc_message_args *message_args,
  826. aws_event_stream_rpc_client_message_flush_fn *flush_fn,
  827. void *user_data) {
  828. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: activating continuation", (void *)continuation);
  829. int ret_val = AWS_OP_ERR;
  830. aws_mutex_lock(&continuation->connection->stream_lock);
  831. if (continuation->stream_id) {
  832. AWS_LOGF_ERROR(
  833. AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: stream has already been activated", (void *)continuation);
  834. aws_raise_error(AWS_ERROR_INVALID_STATE);
  835. goto clean_up;
  836. }
  837. /* Even though is_open is atomic, we need to hold a lock while checking it.
  838. * This lets us coordinate with code that sets is_open to false. */
  839. if (!aws_event_stream_rpc_client_connection_is_open(continuation->connection)) {
  840. AWS_LOGF_ERROR(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: stream's connection is not open", (void *)continuation);
  841. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_CONNECTION_CLOSED);
  842. goto clean_up;
  843. }
  844. /* we cannot update the connection's stream id until we're certain the message at least made it to the wire, because
  845. * the next stream id must be consecutively increasing by 1. So send the message then update the connection state
  846. * once we've made it to the wire. */
  847. continuation->stream_id = continuation->connection->latest_stream_id + 1;
  848. AWS_LOGF_DEBUG(
  849. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  850. "id=%p: continuation's new stream id is %" PRIu32,
  851. (void *)continuation,
  852. continuation->stream_id);
  853. if (aws_hash_table_put(
  854. &continuation->connection->continuation_table, &continuation->stream_id, continuation, NULL)) {
  855. AWS_LOGF_ERROR(
  856. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  857. "id=%p: storing the new stream failed with %s",
  858. (void *)continuation,
  859. aws_error_debug_str(aws_last_error()));
  860. continuation->stream_id = 0;
  861. goto clean_up;
  862. }
  863. if (s_send_protocol_message(
  864. continuation->connection,
  865. continuation,
  866. &operation_name,
  867. message_args,
  868. continuation->stream_id,
  869. flush_fn,
  870. user_data)) {
  871. aws_hash_table_remove(&continuation->connection->continuation_table, &continuation->stream_id, NULL, NULL);
  872. continuation->stream_id = 0;
  873. AWS_LOGF_ERROR(
  874. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  875. "id=%p: failed to flush the new stream to the channel with error %s",
  876. (void *)continuation,
  877. aws_error_debug_str(aws_last_error()));
  878. goto clean_up;
  879. }
  880. /* The continuation table gets a ref count on the continuation. Take it here. */
  881. aws_event_stream_rpc_client_continuation_acquire(continuation);
  882. continuation->connection->latest_stream_id = continuation->stream_id;
  883. ret_val = AWS_OP_SUCCESS;
  884. clean_up:
  885. aws_mutex_unlock(&continuation->connection->stream_lock);
  886. return ret_val;
  887. }
  888. int aws_event_stream_rpc_client_continuation_send_message(
  889. struct aws_event_stream_rpc_client_continuation_token *continuation,
  890. const struct aws_event_stream_rpc_message_args *message_args,
  891. aws_event_stream_rpc_client_message_flush_fn *flush_fn,
  892. void *user_data) {
  893. if (aws_event_stream_rpc_client_continuation_is_closed(continuation)) {
  894. return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_STREAM_CLOSED);
  895. }
  896. if (!continuation->stream_id) {
  897. return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_STREAM_NOT_ACTIVATED);
  898. }
  899. return s_send_protocol_message(
  900. continuation->connection, continuation, NULL, message_args, continuation->stream_id, flush_fn, user_data);
  901. }