event_stream_rpc_client.c 41 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/event-stream/event_stream_channel_handler.h>
  6. #include <aws/event-stream/event_stream_rpc_client.h>
  7. #include <aws/event-stream/private/event_stream_rpc_priv.h>
  8. #include <aws/common/atomics.h>
  9. #include <aws/common/hash_table.h>
  10. #include <aws/common/mutex.h>
  11. #include <aws/io/channel_bootstrap.h>
  12. #include <inttypes.h>
  13. #ifdef _MSC_VER
  14. /* allow declared initializer using address of automatic variable */
  15. # pragma warning(disable : 4221)
  16. /* allow non-constant aggregate initializers */
  17. # pragma warning(disable : 4204)
  18. #endif
  19. static void s_clear_continuation_table(struct aws_event_stream_rpc_client_connection *connection);
  20. struct aws_event_stream_rpc_client_connection {
  21. struct aws_allocator *allocator;
  22. struct aws_hash_table continuation_table;
  23. struct aws_client_bootstrap *bootstrap_ref;
  24. struct aws_atomic_var ref_count;
  25. struct aws_channel *channel;
  26. struct aws_channel_handler *event_stream_handler;
  27. uint32_t latest_stream_id;
  28. struct aws_mutex stream_lock;
  29. struct aws_atomic_var is_open;
  30. struct aws_atomic_var handshake_state;
  31. size_t initial_window_size;
  32. aws_event_stream_rpc_client_on_connection_setup_fn *on_connection_setup;
  33. aws_event_stream_rpc_client_connection_protocol_message_fn *on_connection_protocol_message;
  34. aws_event_stream_rpc_client_on_connection_shutdown_fn *on_connection_shutdown;
  35. void *user_data;
  36. bool bootstrap_owned;
  37. bool enable_read_back_pressure;
  38. };
  39. struct aws_event_stream_rpc_client_continuation_token {
  40. uint32_t stream_id;
  41. struct aws_event_stream_rpc_client_connection *connection;
  42. aws_event_stream_rpc_client_stream_continuation_fn *continuation_fn;
  43. aws_event_stream_rpc_client_stream_continuation_closed_fn *closed_fn;
  44. void *user_data;
  45. struct aws_atomic_var ref_count;
  46. struct aws_atomic_var is_closed;
  47. };
  48. static void s_on_message_received(struct aws_event_stream_message *message, int error_code, void *user_data);
  49. static int s_create_connection_on_channel(
  50. struct aws_event_stream_rpc_client_connection *connection,
  51. struct aws_channel *channel) {
  52. struct aws_channel_handler *event_stream_handler = NULL;
  53. struct aws_channel_slot *slot = NULL;
  54. struct aws_event_stream_channel_handler_options handler_options = {
  55. .on_message_received = s_on_message_received,
  56. .user_data = connection,
  57. .initial_window_size = connection->initial_window_size,
  58. .manual_window_management = connection->enable_read_back_pressure,
  59. };
  60. AWS_LOGF_TRACE(
  61. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  62. "id=%p: creating an event-stream handler on channel %p",
  63. (void *)connection,
  64. (void *)channel);
  65. event_stream_handler = aws_event_stream_channel_handler_new(connection->allocator, &handler_options);
  66. if (!event_stream_handler) {
  67. AWS_LOGF_ERROR(
  68. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  69. "id=%p: creating an event-stream handler failed with error %s",
  70. (void *)connection,
  71. aws_error_debug_str(aws_last_error()));
  72. goto error;
  73. }
  74. slot = aws_channel_slot_new(channel);
  75. if (!slot) {
  76. AWS_LOGF_ERROR(
  77. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  78. "id=%p: creating channel slot failed with error %s",
  79. (void *)connection,
  80. aws_error_debug_str(aws_last_error()));
  81. goto error;
  82. }
  83. aws_channel_slot_insert_end(channel, slot);
  84. if (aws_channel_slot_set_handler(slot, event_stream_handler)) {
  85. AWS_LOGF_ERROR(
  86. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  87. "id=%p: setting handler on channel slot failed with error %s",
  88. (void *)connection,
  89. aws_error_debug_str(aws_last_error()));
  90. goto error;
  91. }
  92. connection->event_stream_handler = event_stream_handler;
  93. connection->channel = channel;
  94. aws_channel_acquire_hold(channel);
  95. return AWS_OP_SUCCESS;
  96. error:
  97. if (!slot && event_stream_handler) {
  98. aws_channel_handler_destroy(event_stream_handler);
  99. }
  100. return AWS_OP_ERR;
  101. }
  102. static void s_on_channel_setup_fn(
  103. struct aws_client_bootstrap *bootstrap,
  104. int error_code,
  105. struct aws_channel *channel,
  106. void *user_data) {
  107. (void)bootstrap;
  108. struct aws_event_stream_rpc_client_connection *connection = user_data;
  109. AWS_LOGF_DEBUG(
  110. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  111. "id=%p: on_channel_setup_fn invoked with error_code %d with channel %p",
  112. (void *)connection,
  113. error_code,
  114. (void *)channel);
  115. if (!error_code) {
  116. connection->bootstrap_owned = true;
  117. if (s_create_connection_on_channel(connection, channel)) {
  118. int last_error = aws_last_error();
  119. connection->on_connection_setup(NULL, last_error, connection->user_data);
  120. aws_channel_shutdown(channel, last_error);
  121. return;
  122. }
  123. AWS_LOGF_DEBUG(
  124. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  125. "id=%p: successful event-stream channel setup %p",
  126. (void *)connection,
  127. (void *)channel);
  128. aws_event_stream_rpc_client_connection_acquire(connection);
  129. connection->on_connection_setup(connection, AWS_OP_SUCCESS, connection->user_data);
  130. aws_event_stream_rpc_client_connection_release(connection);
  131. } else {
  132. connection->on_connection_setup(NULL, error_code, connection->user_data);
  133. aws_event_stream_rpc_client_connection_release(connection);
  134. }
  135. }
  136. static void s_on_channel_shutdown_fn(
  137. struct aws_client_bootstrap *bootstrap,
  138. int error_code,
  139. struct aws_channel *channel,
  140. void *user_data) {
  141. (void)bootstrap;
  142. struct aws_event_stream_rpc_client_connection *connection = user_data;
  143. AWS_LOGF_DEBUG(
  144. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  145. "id=%p: on_channel_shutdown_fn invoked with error_code %d with channel %p",
  146. (void *)connection,
  147. error_code,
  148. (void *)channel);
  149. aws_atomic_store_int(&connection->is_open, 0u);
  150. if (connection->bootstrap_owned) {
  151. s_clear_continuation_table(connection);
  152. aws_event_stream_rpc_client_connection_acquire(connection);
  153. connection->on_connection_shutdown(connection, error_code, connection->user_data);
  154. aws_event_stream_rpc_client_connection_release(connection);
  155. }
  156. aws_channel_release_hold(channel);
  157. aws_event_stream_rpc_client_connection_release(connection);
  158. }
  159. /* Set each continuation's is_closed=true.
  160. * A lock MUST be held while calling this.
  161. * For use with aws_hash_table_foreach(). */
  162. static int s_mark_each_continuation_closed(void *context, struct aws_hash_element *p_element) {
  163. (void)context;
  164. struct aws_event_stream_rpc_client_continuation_token *continuation = p_element->value;
  165. aws_atomic_store_int(&continuation->is_closed, 1U);
  166. return AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
  167. }
  168. /* Invoke continuation's on_closed() callback.
  169. * A lock must NOT be hold while calling this */
  170. static void s_complete_continuation(struct aws_event_stream_rpc_client_continuation_token *token) {
  171. AWS_LOGF_DEBUG(
  172. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  173. "token=%p: token with stream-id %" PRIu32 ", purged from the stream table",
  174. (void *)token,
  175. token->stream_id);
  176. if (token->stream_id) {
  177. token->closed_fn(token, token->user_data);
  178. }
  179. aws_event_stream_rpc_client_continuation_release(token);
  180. }
  181. static int s_complete_and_clear_each_continuation(void *context, struct aws_hash_element *p_element) {
  182. (void)context;
  183. struct aws_event_stream_rpc_client_continuation_token *continuation = p_element->value;
  184. s_complete_continuation(continuation);
  185. return AWS_COMMON_HASH_TABLE_ITER_DELETE | AWS_COMMON_HASH_TABLE_ITER_CONTINUE;
  186. }
  187. /* Remove each continuation from hash-table and invoke its on_closed() callback.
  188. * The connection->is_open must be set false before calling this. */
  189. static void s_clear_continuation_table(struct aws_event_stream_rpc_client_connection *connection) {
  190. AWS_ASSERT(!aws_event_stream_rpc_client_connection_is_open(connection));
  191. /* Use lock to ensure synchronization with code that adds entries to table.
  192. * Since connection was just marked closed, no further entries will be
  193. * added to table once we acquire the lock. */
  194. aws_mutex_lock(&connection->stream_lock);
  195. aws_hash_table_foreach(&connection->continuation_table, s_mark_each_continuation_closed, NULL);
  196. aws_mutex_unlock(&connection->stream_lock);
  197. /* Now release lock before invoking callbacks.
  198. * It's safe to alter the table now without a lock, since no further
  199. * entries can be added, and we've gone through the critical section
  200. * above to ensure synchronization */
  201. aws_hash_table_foreach(&connection->continuation_table, s_complete_and_clear_each_continuation, NULL);
  202. }
  203. int aws_event_stream_rpc_client_connection_connect(
  204. struct aws_allocator *allocator,
  205. const struct aws_event_stream_rpc_client_connection_options *conn_options) {
  206. AWS_PRECONDITION(allocator);
  207. AWS_PRECONDITION(conn_options);
  208. AWS_PRECONDITION(conn_options->on_connection_protocol_message);
  209. AWS_PRECONDITION(conn_options->on_connection_setup);
  210. AWS_PRECONDITION(conn_options->on_connection_shutdown);
  211. struct aws_event_stream_rpc_client_connection *connection =
  212. aws_mem_calloc(allocator, 1, sizeof(struct aws_event_stream_rpc_client_connection));
  213. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: creating new connection", (void *)connection);
  214. if (!connection) {
  215. return AWS_OP_ERR;
  216. }
  217. connection->allocator = allocator;
  218. aws_atomic_init_int(&connection->ref_count, 1);
  219. connection->bootstrap_ref = conn_options->bootstrap;
  220. /* this is released in the connection release which gets called regardless of if this function is successful or
  221. * not*/
  222. aws_client_bootstrap_acquire(connection->bootstrap_ref);
  223. aws_atomic_init_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_INITIALIZED);
  224. aws_atomic_init_int(&connection->is_open, 1);
  225. aws_mutex_init(&connection->stream_lock);
  226. connection->on_connection_shutdown = conn_options->on_connection_shutdown;
  227. connection->on_connection_protocol_message = conn_options->on_connection_protocol_message;
  228. connection->on_connection_setup = conn_options->on_connection_setup;
  229. connection->user_data = conn_options->user_data;
  230. if (aws_hash_table_init(
  231. &connection->continuation_table,
  232. allocator,
  233. 64,
  234. aws_event_stream_rpc_hash_streamid,
  235. aws_event_stream_rpc_streamid_eq,
  236. NULL,
  237. NULL)) {
  238. AWS_LOGF_ERROR(
  239. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  240. "id=%p: failed initializing continuation table with error %s.",
  241. (void *)connection,
  242. aws_error_debug_str(aws_last_error()));
  243. goto error;
  244. }
  245. struct aws_socket_channel_bootstrap_options bootstrap_options = {
  246. .bootstrap = connection->bootstrap_ref,
  247. .tls_options = conn_options->tls_options,
  248. .socket_options = conn_options->socket_options,
  249. .user_data = connection,
  250. .host_name = conn_options->host_name,
  251. .port = conn_options->port,
  252. .enable_read_back_pressure = false,
  253. .setup_callback = s_on_channel_setup_fn,
  254. .shutdown_callback = s_on_channel_shutdown_fn,
  255. };
  256. if (aws_client_bootstrap_new_socket_channel(&bootstrap_options)) {
  257. AWS_LOGF_ERROR(
  258. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  259. "id=%p: failed creating new socket channel with error %s.",
  260. (void *)connection,
  261. aws_error_debug_str(aws_last_error()));
  262. goto error;
  263. }
  264. return AWS_OP_SUCCESS;
  265. error:
  266. aws_event_stream_rpc_client_connection_release(connection);
  267. return AWS_OP_ERR;
  268. }
  269. void aws_event_stream_rpc_client_connection_acquire(const struct aws_event_stream_rpc_client_connection *connection) {
  270. AWS_PRECONDITION(connection);
  271. size_t current_count = aws_atomic_fetch_add_explicit(
  272. &((struct aws_event_stream_rpc_client_connection *)connection)->ref_count, 1, aws_memory_order_relaxed);
  273. AWS_LOGF_TRACE(
  274. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  275. "id=%p: connection acquired, new ref count is %zu.",
  276. (void *)connection,
  277. current_count + 1);
  278. }
  279. static void s_destroy_connection(struct aws_event_stream_rpc_client_connection *connection) {
  280. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: destroying connection.", (void *)connection);
  281. aws_hash_table_clean_up(&connection->continuation_table);
  282. aws_client_bootstrap_release(connection->bootstrap_ref);
  283. aws_mem_release(connection->allocator, connection);
  284. }
  285. void aws_event_stream_rpc_client_connection_release(const struct aws_event_stream_rpc_client_connection *connection) {
  286. if (!connection) {
  287. return;
  288. }
  289. struct aws_event_stream_rpc_client_connection *connection_mut =
  290. (struct aws_event_stream_rpc_client_connection *)connection;
  291. size_t ref_count = aws_atomic_fetch_sub_explicit(&connection_mut->ref_count, 1, aws_memory_order_seq_cst);
  292. AWS_LOGF_TRACE(
  293. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  294. "id=%p: connection released, new ref count is %zu.",
  295. (void *)connection,
  296. ref_count - 1);
  297. if (ref_count == 1) {
  298. s_destroy_connection(connection_mut);
  299. }
  300. }
  301. void aws_event_stream_rpc_client_connection_close(
  302. struct aws_event_stream_rpc_client_connection *connection,
  303. int shutdown_error_code) {
  304. AWS_LOGF_TRACE(
  305. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  306. "id=%p: connection close invoked with reason %s.",
  307. (void *)connection,
  308. aws_error_debug_str(shutdown_error_code));
  309. size_t expect_open = 1U;
  310. if (aws_atomic_compare_exchange_int(&connection->is_open, &expect_open, 0U)) {
  311. aws_channel_shutdown(connection->channel, shutdown_error_code);
  312. if (!connection->bootstrap_owned) {
  313. s_clear_continuation_table(connection);
  314. aws_event_stream_rpc_client_connection_release(connection);
  315. }
  316. } else {
  317. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: connection already closed.", (void *)connection);
  318. }
  319. }
  320. bool aws_event_stream_rpc_client_connection_is_open(const struct aws_event_stream_rpc_client_connection *connection) {
  321. return aws_atomic_load_int(&connection->is_open) == 1U;
  322. }
  323. struct event_stream_connection_send_message_args {
  324. struct aws_allocator *allocator;
  325. struct aws_event_stream_message message;
  326. enum aws_event_stream_rpc_message_type message_type;
  327. struct aws_event_stream_rpc_client_connection *connection;
  328. struct aws_event_stream_rpc_client_continuation_token *continuation;
  329. aws_event_stream_rpc_client_message_flush_fn *flush_fn;
  330. void *user_data;
  331. bool end_stream;
  332. bool terminate_connection;
  333. };
  334. static void s_on_protocol_message_written_fn(
  335. struct aws_event_stream_message *message,
  336. int error_code,
  337. void *user_data) {
  338. (void)message;
  339. struct event_stream_connection_send_message_args *message_args = user_data;
  340. AWS_LOGF_TRACE(
  341. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  342. "id=%p: message %p flushed to channel.",
  343. (void *)message_args->connection,
  344. (void *)message);
  345. if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
  346. AWS_LOGF_TRACE(
  347. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  348. "id=%p: connect message flushed to the wire.",
  349. (void *)message_args->connection);
  350. }
  351. if (message_args->end_stream) {
  352. AWS_LOGF_DEBUG(
  353. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  354. "id=%p: the end stream flag was set, closing continuation %p.",
  355. (void *)message_args->connection,
  356. (void *)message_args->continuation);
  357. AWS_FATAL_ASSERT(message_args->continuation && "end stream flag was set but it wasn't on a continuation");
  358. aws_atomic_store_int(&message_args->continuation->is_closed, 1U);
  359. aws_mutex_lock(&message_args->connection->stream_lock);
  360. aws_hash_table_remove(
  361. &message_args->connection->continuation_table, &message_args->continuation->stream_id, NULL, NULL);
  362. aws_mutex_unlock(&message_args->connection->stream_lock);
  363. /* Lock must NOT be held while invoking callback */
  364. s_complete_continuation(message_args->continuation);
  365. }
  366. message_args->flush_fn(error_code, message_args->user_data);
  367. if (message_args->terminate_connection) {
  368. AWS_LOGF_DEBUG(
  369. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  370. "id=%p: terminate_connection flag was specified. Shutting down the connection.",
  371. (void *)message_args->connection);
  372. aws_event_stream_rpc_client_connection_close(message_args->connection, AWS_ERROR_SUCCESS);
  373. }
  374. aws_event_stream_rpc_client_connection_release(message_args->connection);
  375. if (message_args->continuation) {
  376. aws_event_stream_rpc_client_continuation_release(message_args->continuation);
  377. }
  378. aws_event_stream_message_clean_up(&message_args->message);
  379. aws_mem_release(message_args->allocator, message_args);
  380. }
  381. static int s_send_protocol_message(
  382. struct aws_event_stream_rpc_client_connection *connection,
  383. struct aws_event_stream_rpc_client_continuation_token *continuation,
  384. struct aws_byte_cursor *operation_name,
  385. const struct aws_event_stream_rpc_message_args *message_args,
  386. int32_t stream_id,
  387. aws_event_stream_rpc_client_message_flush_fn *flush_fn,
  388. void *user_data) {
  389. AWS_LOGF_TRACE(
  390. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  391. "id=%p: sending message. continuation: %p, stream id %" PRId32,
  392. (void *)connection,
  393. (void *)continuation,
  394. stream_id);
  395. size_t connect_handshake_state = aws_atomic_load_int(&connection->handshake_state);
  396. AWS_LOGF_TRACE(
  397. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  398. "id=%p: handshake completion value %zu",
  399. (void *)connection,
  400. connect_handshake_state);
  401. /* handshake step 1 is a connect message being received. Handshake 2 is the connect ack being sent.
  402. * no messages other than connect and connect ack are allowed until this count reaches 2. */
  403. if (connect_handshake_state != CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED &&
  404. message_args->message_type < AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
  405. AWS_LOGF_ERROR(
  406. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  407. "id=%p: handshake not completed, only a connect message can be sent.",
  408. (void *)connection);
  409. return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  410. }
  411. struct event_stream_connection_send_message_args *args =
  412. aws_mem_calloc(connection->allocator, 1, sizeof(struct event_stream_connection_send_message_args));
  413. if (!message_args) {
  414. AWS_LOGF_ERROR(
  415. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  416. "id=%p: failed to allocate callback arguments %s.",
  417. (void *)connection,
  418. aws_error_debug_str(aws_last_error()));
  419. return AWS_OP_ERR;
  420. }
  421. args->allocator = connection->allocator;
  422. args->user_data = user_data;
  423. args->message_type = message_args->message_type;
  424. args->connection = connection;
  425. args->flush_fn = flush_fn;
  426. if (continuation) {
  427. AWS_LOGF_TRACE(
  428. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  429. "id=%p: sending message on continuation %p",
  430. (void *)connection,
  431. (void *)continuation);
  432. args->continuation = continuation;
  433. aws_event_stream_rpc_client_continuation_acquire(continuation);
  434. if (message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
  435. AWS_LOGF_DEBUG(
  436. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  437. "id=%p:end stream flag was specified on continuation %p",
  438. (void *)connection,
  439. (void *)continuation);
  440. args->end_stream = true;
  441. }
  442. }
  443. if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK &&
  444. !(message_args->message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_CONNECTION_ACCEPTED)) {
  445. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: terminating connection", (void *)connection);
  446. args->terminate_connection = true;
  447. }
  448. if (message_args->message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT) {
  449. AWS_LOGF_DEBUG(
  450. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  451. "id=%p: sending connect message, waiting on connect ack",
  452. (void *)connection);
  453. aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_PROCESSED);
  454. }
  455. args->flush_fn = flush_fn;
  456. size_t headers_count = operation_name ? message_args->headers_count + 4 : message_args->headers_count + 3;
  457. struct aws_array_list headers_list;
  458. AWS_ZERO_STRUCT(headers_list);
  459. if (aws_array_list_init_dynamic(
  460. &headers_list, connection->allocator, headers_count, sizeof(struct aws_event_stream_header_value_pair))) {
  461. AWS_LOGF_ERROR(
  462. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  463. "id=%p: an error occurred while initializing the headers list %s",
  464. (void *)connection,
  465. aws_error_debug_str(aws_last_error()));
  466. goto args_allocated_before_failure;
  467. }
  468. /* since we preallocated the space for the headers, these can't fail, but we'll go ahead an assert on them just in
  469. * case */
  470. for (size_t i = 0; i < message_args->headers_count; ++i) {
  471. AWS_FATAL_ASSERT(!aws_array_list_push_back(&headers_list, &message_args->headers[i]));
  472. }
  473. AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
  474. &headers_list,
  475. (const char *)aws_event_stream_rpc_message_type_name.ptr,
  476. (uint8_t)aws_event_stream_rpc_message_type_name.len,
  477. message_args->message_type));
  478. AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
  479. &headers_list,
  480. (const char *)aws_event_stream_rpc_message_flags_name.ptr,
  481. (uint8_t)aws_event_stream_rpc_message_flags_name.len,
  482. message_args->message_flags));
  483. AWS_FATAL_ASSERT(!aws_event_stream_add_int32_header(
  484. &headers_list,
  485. (const char *)aws_event_stream_rpc_stream_id_name.ptr,
  486. (uint8_t)aws_event_stream_rpc_stream_id_name.len,
  487. stream_id));
  488. if (operation_name) {
  489. AWS_LOGF_DEBUG(
  490. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  491. "id=%p: operation name specified " PRInSTR,
  492. (void *)connection,
  493. AWS_BYTE_CURSOR_PRI(*operation_name));
  494. AWS_FATAL_ASSERT(!aws_event_stream_add_string_header(
  495. &headers_list,
  496. (const char *)aws_event_stream_rpc_operation_name.ptr,
  497. (uint8_t)aws_event_stream_rpc_operation_name.len,
  498. (const char *)operation_name->ptr,
  499. (uint16_t)operation_name->len,
  500. 0));
  501. }
  502. int message_init_err_code =
  503. aws_event_stream_message_init(&args->message, connection->allocator, &headers_list, message_args->payload);
  504. aws_array_list_clean_up(&headers_list);
  505. if (message_init_err_code) {
  506. AWS_LOGF_ERROR(
  507. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  508. "id=%p: message init failed with error %s",
  509. (void *)connection,
  510. aws_error_debug_str(aws_last_error()));
  511. goto args_allocated_before_failure;
  512. }
  513. aws_event_stream_rpc_client_connection_acquire(connection);
  514. if (aws_event_stream_channel_handler_write_message(
  515. connection->event_stream_handler, &args->message, s_on_protocol_message_written_fn, args)) {
  516. AWS_LOGF_ERROR(
  517. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  518. "id=%p: writing message failed with error %s",
  519. (void *)connection,
  520. aws_error_debug_str(aws_last_error()));
  521. goto message_initialized_before_failure;
  522. }
  523. return AWS_OP_SUCCESS;
  524. message_initialized_before_failure:
  525. aws_event_stream_message_clean_up(&args->message);
  526. args_allocated_before_failure:
  527. aws_mem_release(args->allocator, args);
  528. aws_event_stream_rpc_client_connection_release(connection);
  529. return AWS_OP_ERR;
  530. }
  531. int aws_event_stream_rpc_client_connection_send_protocol_message(
  532. struct aws_event_stream_rpc_client_connection *connection,
  533. const struct aws_event_stream_rpc_message_args *message_args,
  534. aws_event_stream_rpc_client_message_flush_fn *flush_fn,
  535. void *user_data) {
  536. if (!aws_event_stream_rpc_client_connection_is_open(connection)) {
  537. return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_CONNECTION_CLOSED);
  538. }
  539. return s_send_protocol_message(connection, NULL, NULL, message_args, 0, flush_fn, user_data);
  540. }
  541. static void s_connection_error_message_flush_fn(int error_code, void *user_data) {
  542. (void)error_code;
  543. struct aws_event_stream_rpc_client_connection *connection = user_data;
  544. aws_event_stream_rpc_client_connection_close(connection, AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  545. }
  546. static void s_send_connection_level_error(
  547. struct aws_event_stream_rpc_client_connection *connection,
  548. uint32_t message_type,
  549. uint32_t message_flags,
  550. const struct aws_byte_cursor *message) {
  551. struct aws_byte_buf payload_buf = aws_byte_buf_from_array(message->ptr, message->len);
  552. AWS_LOGF_DEBUG(
  553. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  554. "id=%p: sending connection-level error\n" PRInSTR,
  555. (void *)connection,
  556. AWS_BYTE_BUF_PRI(payload_buf));
  557. struct aws_event_stream_header_value_pair content_type_header =
  558. aws_event_stream_create_string_header(s_json_content_type_name, s_json_content_type_value);
  559. struct aws_event_stream_header_value_pair headers[] = {
  560. content_type_header,
  561. };
  562. struct aws_event_stream_rpc_message_args message_args = {
  563. .message_type = message_type,
  564. .message_flags = message_flags,
  565. .payload = &payload_buf,
  566. .headers_count = 1,
  567. .headers = headers,
  568. };
  569. aws_event_stream_rpc_client_connection_send_protocol_message(
  570. connection, &message_args, s_connection_error_message_flush_fn, connection);
  571. }
  572. static void s_route_message_by_type(
  573. struct aws_event_stream_rpc_client_connection *connection,
  574. struct aws_event_stream_message *message,
  575. struct aws_array_list *headers_list,
  576. uint32_t stream_id,
  577. uint32_t message_type,
  578. uint32_t message_flags) {
  579. struct aws_byte_buf payload_buf = aws_byte_buf_from_array(
  580. aws_event_stream_message_payload(message), aws_event_stream_message_payload_len(message));
  581. struct aws_event_stream_rpc_message_args message_args = {
  582. .headers = headers_list->data,
  583. .headers_count = aws_array_list_length(headers_list),
  584. .payload = &payload_buf,
  585. .message_flags = message_flags,
  586. .message_type = message_type,
  587. };
  588. size_t handshake_complete = aws_atomic_load_int(&connection->handshake_state);
  589. /* make sure if this is not a CONNECT message being received, the handshake has been completed. */
  590. if (handshake_complete < CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED &&
  591. message_type != AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
  592. AWS_LOGF_ERROR(
  593. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  594. "id=%p: a message was received on this connection prior to the "
  595. "connect handshake completing",
  596. (void *)connection);
  597. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  598. s_send_connection_level_error(
  599. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
  600. return;
  601. }
  602. /* stream_id being non zero ALWAYS indicates APPLICATION_DATA or APPLICATION_ERROR. */
  603. if (stream_id > 0) {
  604. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: stream id %" PRIu32, (void *)connection, stream_id);
  605. struct aws_event_stream_rpc_client_continuation_token *continuation = NULL;
  606. if (message_type > AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR) {
  607. AWS_LOGF_ERROR(
  608. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  609. "id=%p: only application messages can be sent on a stream id, "
  610. "but this message is the incorrect type",
  611. (void *)connection);
  612. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  613. s_send_connection_level_error(
  614. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_stream_id_error);
  615. return;
  616. }
  617. aws_mutex_lock(&connection->stream_lock);
  618. struct aws_hash_element *continuation_element = NULL;
  619. if (aws_hash_table_find(&connection->continuation_table, &stream_id, &continuation_element) ||
  620. !continuation_element) {
  621. aws_mutex_unlock(&connection->stream_lock);
  622. AWS_LOGF_ERROR(
  623. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  624. "id=%p: a stream id was received that was not created by this client",
  625. (void *)connection);
  626. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_PROTOCOL_ERROR);
  627. s_send_connection_level_error(
  628. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_client_stream_id_error);
  629. return;
  630. }
  631. aws_mutex_unlock(&connection->stream_lock);
  632. continuation = continuation_element->value;
  633. aws_event_stream_rpc_client_continuation_acquire(continuation);
  634. continuation->continuation_fn(continuation, &message_args, continuation->user_data);
  635. aws_event_stream_rpc_client_continuation_release(continuation);
  636. /* if it was a terminal stream message purge it from the hash table. The delete will decref the continuation. */
  637. if (message_flags & AWS_EVENT_STREAM_RPC_MESSAGE_FLAG_TERMINATE_STREAM) {
  638. AWS_LOGF_DEBUG(
  639. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  640. "id=%p: the terminate stream flag was specified for continuation %p",
  641. (void *)connection,
  642. (void *)continuation);
  643. aws_atomic_store_int(&continuation->is_closed, 1U);
  644. aws_mutex_lock(&connection->stream_lock);
  645. aws_hash_table_remove(&connection->continuation_table, &stream_id, NULL, NULL);
  646. aws_mutex_unlock(&connection->stream_lock);
  647. /* Note that we do not invoke callback while holding lock */
  648. s_complete_continuation(continuation);
  649. }
  650. } else {
  651. if (message_type <= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_APPLICATION_ERROR ||
  652. message_type >= AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_COUNT) {
  653. AWS_LOGF_ERROR(
  654. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  655. "id=%p: a zero stream id was received with an invalid message-type %" PRIu32,
  656. (void *)connection,
  657. message_type);
  658. s_send_connection_level_error(
  659. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_message_type_error);
  660. return;
  661. }
  662. if (message_type == AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_CONNECT_ACK) {
  663. if (handshake_complete != CONNECTION_HANDSHAKE_STATE_CONNECT_PROCESSED) {
  664. AWS_LOGF_ERROR(
  665. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  666. "id=%p: connect ack received but the handshake is already completed. Only one is allowed.",
  667. (void *)connection);
  668. /* only one connect is allowed. This would be a duplicate. */
  669. s_send_connection_level_error(
  670. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_connect_not_completed_error);
  671. return;
  672. }
  673. aws_atomic_store_int(&connection->handshake_state, CONNECTION_HANDSHAKE_STATE_CONNECT_ACK_PROCESSED);
  674. AWS_LOGF_INFO(
  675. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  676. "id=%p: connect ack received, connection handshake completed",
  677. (void *)connection);
  678. }
  679. connection->on_connection_protocol_message(connection, &message_args, connection->user_data);
  680. }
  681. }
  682. /* invoked by the event stream channel handler when a complete message has been read from the channel. */
  683. static void s_on_message_received(struct aws_event_stream_message *message, int error_code, void *user_data) {
  684. if (!error_code) {
  685. struct aws_event_stream_rpc_client_connection *connection = user_data;
  686. AWS_LOGF_TRACE(
  687. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  688. "id=%p: message received on connection of length %" PRIu32,
  689. (void *)connection,
  690. aws_event_stream_message_total_length(message));
  691. struct aws_array_list headers;
  692. if (aws_array_list_init_dynamic(
  693. &headers, connection->allocator, 8, sizeof(struct aws_event_stream_header_value_pair))) {
  694. AWS_LOGF_ERROR(
  695. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  696. "id=%p: error initializing headers %s",
  697. (void *)connection,
  698. aws_error_debug_str(aws_last_error()));
  699. s_send_connection_level_error(
  700. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
  701. return;
  702. }
  703. if (aws_event_stream_message_headers(message, &headers)) {
  704. AWS_LOGF_ERROR(
  705. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  706. "id=%p: error fetching headers %s",
  707. (void *)connection,
  708. aws_error_debug_str(aws_last_error()));
  709. s_send_connection_level_error(
  710. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_INTERNAL_ERROR, 0, &s_internal_error);
  711. goto clean_up;
  712. }
  713. int32_t stream_id = -1;
  714. int32_t message_type = -1;
  715. int32_t message_flags = -1;
  716. struct aws_byte_buf operation_name_buf;
  717. AWS_ZERO_STRUCT(operation_name_buf);
  718. if (aws_event_stream_rpc_extract_message_metadata(
  719. &headers, &stream_id, &message_type, &message_flags, &operation_name_buf)) {
  720. AWS_LOGF_ERROR(
  721. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  722. "id=%p: invalid protocol message with error %s",
  723. (void *)connection,
  724. aws_error_debug_str(aws_last_error()));
  725. s_send_connection_level_error(
  726. connection, AWS_EVENT_STREAM_RPC_MESSAGE_TYPE_PROTOCOL_ERROR, 0, &s_invalid_message_error);
  727. goto clean_up;
  728. }
  729. (void)operation_name_buf;
  730. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: routing message", (void *)connection);
  731. s_route_message_by_type(connection, message, &headers, stream_id, message_type, message_flags);
  732. clean_up:
  733. aws_event_stream_headers_list_cleanup(&headers);
  734. }
  735. }
  736. struct aws_event_stream_rpc_client_continuation_token *aws_event_stream_rpc_client_connection_new_stream(
  737. struct aws_event_stream_rpc_client_connection *connection,
  738. const struct aws_event_stream_rpc_client_stream_continuation_options *continuation_options) {
  739. AWS_PRECONDITION(continuation_options->on_continuation_closed);
  740. AWS_PRECONDITION(continuation_options->on_continuation);
  741. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: creating a new stream on connection", (void *)connection);
  742. struct aws_event_stream_rpc_client_continuation_token *continuation =
  743. aws_mem_calloc(connection->allocator, 1, sizeof(struct aws_event_stream_rpc_client_continuation_token));
  744. if (!continuation) {
  745. AWS_LOGF_ERROR(
  746. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  747. "id=%p: error while allocating continuation %s",
  748. (void *)connection,
  749. aws_error_debug_str(aws_last_error()));
  750. return NULL;
  751. }
  752. AWS_LOGF_DEBUG(
  753. AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: continuation created %p", (void *)connection, (void *)continuation);
  754. continuation->connection = connection;
  755. aws_event_stream_rpc_client_connection_acquire(continuation->connection);
  756. aws_atomic_init_int(&continuation->ref_count, 1);
  757. aws_atomic_init_int(&continuation->is_closed, 0);
  758. continuation->continuation_fn = continuation_options->on_continuation;
  759. continuation->closed_fn = continuation_options->on_continuation_closed;
  760. continuation->user_data = continuation_options->user_data;
  761. return continuation;
  762. }
  763. void *aws_event_stream_rpc_client_continuation_get_user_data(
  764. struct aws_event_stream_rpc_client_continuation_token *continuation) {
  765. return continuation->user_data;
  766. }
  767. void aws_event_stream_rpc_client_continuation_acquire(
  768. const struct aws_event_stream_rpc_client_continuation_token *continuation) {
  769. size_t current_count = aws_atomic_fetch_add_explicit(
  770. &((struct aws_event_stream_rpc_client_continuation_token *)continuation)->ref_count,
  771. 1u,
  772. aws_memory_order_relaxed);
  773. AWS_LOGF_TRACE(
  774. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  775. "id=%p: continuation acquired, new ref count is %zu.",
  776. (void *)continuation,
  777. current_count + 1);
  778. }
  779. void aws_event_stream_rpc_client_continuation_release(
  780. const struct aws_event_stream_rpc_client_continuation_token *continuation) {
  781. if (AWS_UNLIKELY(!continuation)) {
  782. return;
  783. }
  784. struct aws_event_stream_rpc_client_continuation_token *continuation_mut =
  785. (struct aws_event_stream_rpc_client_continuation_token *)continuation;
  786. size_t ref_count = aws_atomic_fetch_sub_explicit(&continuation_mut->ref_count, 1, aws_memory_order_seq_cst);
  787. AWS_LOGF_TRACE(
  788. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  789. "id=%p: continuation released, new ref count is %zu.",
  790. (void *)continuation,
  791. ref_count - 1);
  792. if (ref_count == 1) {
  793. struct aws_allocator *allocator = continuation_mut->connection->allocator;
  794. aws_event_stream_rpc_client_connection_release(continuation_mut->connection);
  795. aws_mem_release(allocator, continuation_mut);
  796. }
  797. }
  798. bool aws_event_stream_rpc_client_continuation_is_closed(
  799. const struct aws_event_stream_rpc_client_continuation_token *continuation) {
  800. return aws_atomic_load_int(&continuation->is_closed) == 1u;
  801. }
  802. int aws_event_stream_rpc_client_continuation_activate(
  803. struct aws_event_stream_rpc_client_continuation_token *continuation,
  804. struct aws_byte_cursor operation_name,
  805. const struct aws_event_stream_rpc_message_args *message_args,
  806. aws_event_stream_rpc_client_message_flush_fn *flush_fn,
  807. void *user_data) {
  808. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: activating continuation", (void *)continuation);
  809. int ret_val = AWS_OP_ERR;
  810. aws_mutex_lock(&continuation->connection->stream_lock);
  811. if (continuation->stream_id) {
  812. AWS_LOGF_ERROR(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: stream has already been activated", (void *)continuation)
  813. aws_raise_error(AWS_ERROR_INVALID_STATE);
  814. goto clean_up;
  815. }
  816. /* Even though is_open is atomic, we need to hold a lock while checking it.
  817. * This lets us coordinate with code that sets is_open to false. */
  818. if (!aws_event_stream_rpc_client_connection_is_open(continuation->connection)) {
  819. AWS_LOGF_ERROR(AWS_LS_EVENT_STREAM_RPC_CLIENT, "id=%p: stream's connection is not open", (void *)continuation)
  820. aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_CONNECTION_CLOSED);
  821. goto clean_up;
  822. }
  823. /* we cannot update the connection's stream id until we're certain the message at least made it to the wire, because
  824. * the next stream id must be consecutively increasing by 1. So send the message then update the connection state
  825. * once we've made it to the wire. */
  826. continuation->stream_id = continuation->connection->latest_stream_id + 1;
  827. AWS_LOGF_DEBUG(
  828. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  829. "id=%p: continuation's new stream id is %" PRIu32,
  830. (void *)continuation,
  831. continuation->stream_id);
  832. if (aws_hash_table_put(
  833. &continuation->connection->continuation_table, &continuation->stream_id, continuation, NULL)) {
  834. AWS_LOGF_ERROR(
  835. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  836. "id=%p: storing the new stream failed with %s",
  837. (void *)continuation,
  838. aws_error_debug_str(aws_last_error()));
  839. continuation->stream_id = 0;
  840. goto clean_up;
  841. }
  842. if (s_send_protocol_message(
  843. continuation->connection,
  844. continuation,
  845. &operation_name,
  846. message_args,
  847. continuation->stream_id,
  848. flush_fn,
  849. user_data)) {
  850. aws_hash_table_remove(&continuation->connection->continuation_table, &continuation->stream_id, NULL, NULL);
  851. continuation->stream_id = 0;
  852. AWS_LOGF_ERROR(
  853. AWS_LS_EVENT_STREAM_RPC_CLIENT,
  854. "id=%p: failed to flush the new stream to the channel with error %s",
  855. (void *)continuation,
  856. aws_error_debug_str(aws_last_error()));
  857. goto clean_up;
  858. }
  859. /* The continuation table gets a ref count on the continuation. Take it here. */
  860. aws_event_stream_rpc_client_continuation_acquire(continuation);
  861. continuation->connection->latest_stream_id = continuation->stream_id;
  862. ret_val = AWS_OP_SUCCESS;
  863. clean_up:
  864. aws_mutex_unlock(&continuation->connection->stream_lock);
  865. return ret_val;
  866. }
  867. int aws_event_stream_rpc_client_continuation_send_message(
  868. struct aws_event_stream_rpc_client_continuation_token *continuation,
  869. const struct aws_event_stream_rpc_message_args *message_args,
  870. aws_event_stream_rpc_client_message_flush_fn *flush_fn,
  871. void *user_data) {
  872. if (aws_event_stream_rpc_client_continuation_is_closed(continuation)) {
  873. return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_STREAM_CLOSED);
  874. }
  875. if (!continuation->stream_id) {
  876. return aws_raise_error(AWS_ERROR_EVENT_STREAM_RPC_STREAM_NOT_ACTIVATED);
  877. }
  878. return s_send_protocol_message(
  879. continuation->connection, continuation, NULL, message_args, continuation->stream_id, flush_fn, user_data);
  880. }