event_stream_channel_handler.c 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. /*
  2. * Copyright 2010-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. *
  4. * Licensed under the Apache License, Version 2.0 (the "License").
  5. * You may not use this file except in compliance with the License.
  6. * A copy of the License is located at
  7. *
  8. * http://aws.amazon.com/apache2.0
  9. *
  10. * or in the "license" file accompanying this file. This file is distributed
  11. * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
  12. * express or implied. See the License for the specific language governing
  13. * permissions and limitations under the License.
  14. */
  15. #include <aws/event-stream/event_stream.h>
  16. #include <aws/event-stream/event_stream_channel_handler.h>
  17. #include <aws/checksums/crc.h>
  18. #include <aws/io/channel.h>
  19. #include <inttypes.h>
  20. static const size_t s_default_payload_size = 1024;
  21. /* an event stream message has overhead of
  22. * [msg len (uint32_t)]
  23. * [headers len (uint32_t)]
  24. * [prelude crc (uint32_t)]
  25. * ... headers and payload ....
  26. * [message crc (uint32_t)]
  27. */
  28. static const size_t s_message_overhead_size = AWS_EVENT_STREAM_PRELUDE_LENGTH + AWS_EVENT_STREAM_TRAILER_LENGTH;
  29. struct aws_event_stream_channel_handler {
  30. struct aws_channel_handler handler;
  31. struct aws_byte_buf message_buf;
  32. uint32_t running_crc;
  33. uint32_t current_message_len;
  34. aws_event_stream_channel_handler_on_message_received_fn *on_message_received;
  35. void *user_data;
  36. size_t initial_window_size;
  37. bool manual_window_management;
  38. };
  39. static int s_process_read_message(
  40. struct aws_channel_handler *handler,
  41. struct aws_channel_slot *slot,
  42. struct aws_io_message *message) {
  43. AWS_LOGF_TRACE(
  44. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  45. "id=%p: received message of size %zu",
  46. (void *)handler,
  47. message->message_data.len);
  48. struct aws_event_stream_channel_handler *event_stream_handler = handler->impl;
  49. struct aws_byte_cursor message_cursor = aws_byte_cursor_from_buf(&message->message_data);
  50. int error_code = AWS_ERROR_SUCCESS;
  51. while (message_cursor.len) {
  52. AWS_LOGF_TRACE(
  53. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  54. "id=%p: processing chunk of size %zu",
  55. (void *)handler,
  56. message_cursor.len);
  57. /* first read only the prelude so we can do checks before reading the entire buffer. */
  58. if (event_stream_handler->message_buf.len < AWS_EVENT_STREAM_PRELUDE_LENGTH) {
  59. size_t remaining_prelude = AWS_EVENT_STREAM_PRELUDE_LENGTH - event_stream_handler->message_buf.len;
  60. size_t to_copy = aws_min_size(message_cursor.len, remaining_prelude);
  61. AWS_LOGF_TRACE(
  62. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  63. "id=%p: processing prelude, %zu bytes of an expected 12.",
  64. (void *)handler,
  65. to_copy);
  66. if (!aws_byte_buf_write(&event_stream_handler->message_buf, message_cursor.ptr, to_copy)) {
  67. error_code = aws_last_error();
  68. AWS_LOGF_ERROR(
  69. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  70. "id=%p: writing to prelude buffer failed with error %s",
  71. (void *)handler,
  72. aws_error_debug_str(error_code));
  73. goto finished;
  74. }
  75. aws_byte_cursor_advance(&message_cursor, to_copy);
  76. }
  77. /* we need to get the prelude so we can get the message length to know how much to read and also
  78. * to check the prelude CRC to protect against bit-flips causing us to read to much memory */
  79. if (event_stream_handler->message_buf.len == AWS_EVENT_STREAM_PRELUDE_LENGTH) {
  80. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "id=%p: processing prelude buffer", (void *)handler);
  81. struct aws_byte_cursor prelude_cursor = aws_byte_cursor_from_buf(&event_stream_handler->message_buf);
  82. event_stream_handler->running_crc =
  83. aws_checksums_crc32(prelude_cursor.ptr, sizeof(uint32_t) + sizeof(uint32_t), 0);
  84. AWS_LOGF_DEBUG(
  85. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  86. "id=%p: calculated prelude CRC of %" PRIu32,
  87. (void *)handler,
  88. event_stream_handler->running_crc);
  89. aws_byte_cursor_read_be32(&prelude_cursor, &event_stream_handler->current_message_len);
  90. AWS_LOGF_DEBUG(
  91. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  92. "id=%p: read total message length of %" PRIu32,
  93. (void *)handler,
  94. event_stream_handler->current_message_len);
  95. if (event_stream_handler->current_message_len > AWS_EVENT_STREAM_MAX_MESSAGE_SIZE) {
  96. AWS_LOGF_ERROR(
  97. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  98. "id=%p: message length of %" PRIu32 " exceeds the max size of %zu",
  99. (void *)handler,
  100. event_stream_handler->current_message_len,
  101. (size_t)AWS_EVENT_STREAM_MAX_MESSAGE_SIZE);
  102. aws_raise_error(AWS_ERROR_EVENT_STREAM_MESSAGE_FIELD_SIZE_EXCEEDED);
  103. error_code = aws_last_error();
  104. goto finished;
  105. }
  106. /* advance past the headers field since we don't really care about it at this point */
  107. aws_byte_cursor_advance(&prelude_cursor, sizeof(uint32_t));
  108. uint32_t prelude_crc = 0;
  109. aws_byte_cursor_read_be32(&prelude_cursor, &prelude_crc);
  110. AWS_LOGF_DEBUG(
  111. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  112. "id=%p: read prelude CRC of %" PRIu32,
  113. (void *)handler,
  114. prelude_crc);
  115. /* make sure the checksum matches before processing any further */
  116. if (event_stream_handler->running_crc != prelude_crc) {
  117. AWS_LOGF_ERROR(
  118. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  119. "id=%p: prelude CRC mismatch. calculated %" PRIu32 " but the crc for the message was %" PRIu32,
  120. (void *)handler,
  121. event_stream_handler->running_crc,
  122. prelude_crc);
  123. aws_raise_error(AWS_ERROR_EVENT_STREAM_PRELUDE_CHECKSUM_FAILURE);
  124. error_code = aws_last_error();
  125. goto finished;
  126. }
  127. }
  128. /* read whatever is remaining from the message */
  129. if (event_stream_handler->message_buf.len < event_stream_handler->current_message_len) {
  130. AWS_LOGF_TRACE(
  131. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "id=%p: processing remaining message buffer", (void *)handler);
  132. size_t remaining = event_stream_handler->current_message_len - event_stream_handler->message_buf.len;
  133. size_t to_copy = aws_min_size(message_cursor.len, remaining);
  134. AWS_LOGF_TRACE(
  135. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  136. "id=%p: of the remaining %zu, processing %zu from the "
  137. "current message.",
  138. (void *)handler,
  139. remaining,
  140. to_copy);
  141. struct aws_byte_cursor to_append = aws_byte_cursor_advance(&message_cursor, to_copy);
  142. if (aws_byte_buf_append_dynamic(&event_stream_handler->message_buf, &to_append)) {
  143. error_code = aws_last_error();
  144. AWS_LOGF_ERROR(
  145. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  146. "id=%p: Appending to the message buffer failed with error %s.",
  147. (void *)handler,
  148. aws_error_debug_str(error_code));
  149. goto finished;
  150. }
  151. }
  152. /* If we read the entire message, parse it and give it back to the subscriber. Keep in mind, once we're to this
  153. * point the aws_event_stream API handles the rest of the message parsing and validation. */
  154. if (event_stream_handler->message_buf.len == event_stream_handler->current_message_len) {
  155. AWS_LOGF_TRACE(
  156. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  157. "id=%p: An entire message has been read. Parsing the message now.",
  158. (void *)handler);
  159. struct aws_event_stream_message received_message;
  160. AWS_ZERO_STRUCT(received_message);
  161. if (aws_event_stream_message_from_buffer(
  162. &received_message, event_stream_handler->handler.alloc, &event_stream_handler->message_buf)) {
  163. error_code = aws_last_error();
  164. AWS_LOGF_ERROR(
  165. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  166. "id=%p: Parsing the message failed with error %s.",
  167. (void *)handler,
  168. aws_error_debug_str(error_code));
  169. goto finished;
  170. }
  171. size_t message_size = event_stream_handler->message_buf.len;
  172. AWS_LOGF_TRACE(
  173. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "id=%p: Invoking on_message_received callback.", (void *)handler);
  174. event_stream_handler->on_message_received(
  175. &received_message, AWS_ERROR_SUCCESS, event_stream_handler->user_data);
  176. aws_event_stream_message_clean_up(&received_message);
  177. event_stream_handler->current_message_len = 0;
  178. event_stream_handler->running_crc = 0;
  179. aws_byte_buf_reset(&event_stream_handler->message_buf, true);
  180. if (!event_stream_handler->manual_window_management) {
  181. aws_channel_slot_increment_read_window(slot, message_size);
  182. }
  183. }
  184. }
  185. finished:
  186. if (error_code) {
  187. event_stream_handler->on_message_received(NULL, error_code, event_stream_handler->user_data);
  188. aws_channel_shutdown(slot->channel, error_code);
  189. }
  190. aws_mem_release(message->allocator, message);
  191. return AWS_OP_SUCCESS;
  192. }
  193. struct message_write_data {
  194. struct aws_allocator *allocator;
  195. struct aws_channel_task task;
  196. struct aws_event_stream_channel_handler *handler;
  197. struct aws_event_stream_message *message;
  198. aws_event_stream_channel_handler_on_message_written_fn *on_message_written;
  199. void *user_data;
  200. };
  201. static void s_on_message_write_completed_fn(
  202. struct aws_channel *channel,
  203. struct aws_io_message *message,
  204. int err_code,
  205. void *user_data) {
  206. (void)channel;
  207. (void)message;
  208. struct message_write_data *message_data = user_data;
  209. AWS_LOGF_TRACE(
  210. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  211. "channel=%p: Message write completed. Invoking "
  212. "on_message_written callback.",
  213. (void *)channel);
  214. message_data->on_message_written(message_data->message, err_code, message_data->user_data);
  215. aws_mem_release(message_data->allocator, message_data);
  216. }
  217. static void s_write_handler_message(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
  218. (void)task;
  219. struct message_write_data *message_data = arg;
  220. AWS_LOGF_TRACE(AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "static: Write message task invoked.");
  221. if (status == AWS_TASK_STATUS_RUN_READY) {
  222. struct aws_event_stream_message *message = message_data->message;
  223. struct aws_event_stream_channel_handler *handler = message_data->handler;
  224. struct aws_byte_cursor message_cur = aws_byte_cursor_from_array(
  225. aws_event_stream_message_buffer(message), aws_event_stream_message_total_length(message));
  226. while (message_cur.len) {
  227. AWS_LOGF_TRACE(
  228. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  229. "id=%p: writing message chunk of size %zu.",
  230. (void *)&handler->handler,
  231. message_cur.len);
  232. /* io messages from the pool are allowed to be smaller than the requested size. */
  233. struct aws_io_message *io_message = aws_channel_acquire_message_from_pool(
  234. handler->handler.slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, message_cur.len);
  235. if (!io_message) {
  236. int error_code = aws_last_error();
  237. AWS_LOGF_ERROR(
  238. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  239. "id=%p: Error occurred while acquiring io message %s.",
  240. (void *)&handler->handler,
  241. aws_error_debug_str(error_code));
  242. message_data->on_message_written(message, error_code, message_data->user_data);
  243. aws_mem_release(message_data->allocator, message_data);
  244. aws_channel_shutdown(handler->handler.slot->channel, error_code);
  245. break;
  246. }
  247. aws_byte_buf_write_to_capacity(&io_message->message_data, &message_cur);
  248. /* if that was the end of the buffer we want to write, attach the completion callback to that io message */
  249. if (message_cur.len == 0) {
  250. AWS_LOGF_TRACE(
  251. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  252. "id=%p: Message completely written to all io buffers.",
  253. (void *)&handler->handler);
  254. io_message->on_completion = s_on_message_write_completed_fn;
  255. io_message->user_data = message_data;
  256. }
  257. /* note if this fails the io message will not be queued and as a result will not have it's completion
  258. * callback invoked. */
  259. if (aws_channel_slot_send_message(handler->handler.slot, io_message, AWS_CHANNEL_DIR_WRITE)) {
  260. aws_mem_release(io_message->allocator, io_message);
  261. int error_code = aws_last_error();
  262. AWS_LOGF_ERROR(
  263. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  264. "id=%p: Error occurred while sending message to channel %s.",
  265. (void *)&handler->handler,
  266. aws_error_debug_str(error_code));
  267. message_data->on_message_written(message, error_code, message_data->user_data);
  268. aws_mem_release(message_data->allocator, message_data);
  269. aws_channel_shutdown(handler->handler.slot->channel, error_code);
  270. break;
  271. }
  272. AWS_LOGF_TRACE(
  273. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "id=%p: Message sent to channel", (void *)&handler->handler);
  274. }
  275. } else {
  276. AWS_LOGF_WARN(AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "static: Channel was shutdown. Message not sent");
  277. message_data->on_message_written(
  278. message_data->message, AWS_ERROR_IO_OPERATION_CANCELLED, message_data->user_data);
  279. aws_mem_release(message_data->allocator, message_data);
  280. }
  281. }
  282. int aws_event_stream_channel_handler_write_message(
  283. struct aws_channel_handler *channel_handler,
  284. struct aws_event_stream_message *message,
  285. aws_event_stream_channel_handler_on_message_written_fn *on_message_written,
  286. void *user_data) {
  287. AWS_PRECONDITION(channel_handler);
  288. AWS_PRECONDITION(message);
  289. AWS_PRECONDITION(on_message_written);
  290. struct aws_event_stream_channel_handler *handler = channel_handler->impl;
  291. struct message_write_data *write_data =
  292. aws_mem_calloc(handler->handler.alloc, 1, sizeof(struct message_write_data));
  293. if (!write_data) {
  294. AWS_LOGF_ERROR(
  295. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  296. "id=%p: Error occurred while allocating callback data %s.",
  297. (void *)channel_handler,
  298. aws_error_debug_str(aws_last_error()));
  299. aws_channel_shutdown(channel_handler->slot->channel, aws_last_error());
  300. return AWS_OP_ERR;
  301. }
  302. write_data->handler = handler;
  303. write_data->user_data = user_data;
  304. write_data->message = message;
  305. write_data->on_message_written = on_message_written;
  306. write_data->allocator = handler->handler.alloc;
  307. AWS_LOGF_TRACE(
  308. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "id=%p: Scheduling message write task", (void *)channel_handler);
  309. aws_channel_task_init(
  310. &write_data->task, s_write_handler_message, write_data, "aws_event_stream_channel_handler_write_message");
  311. aws_channel_schedule_task_now_serialized(handler->handler.slot->channel, &write_data->task);
  312. return AWS_OP_SUCCESS;
  313. }
  314. void *aws_event_stream_channel_handler_get_user_data(struct aws_channel_handler *channel_handler) {
  315. struct aws_event_stream_channel_handler *handler = channel_handler->impl;
  316. return handler->user_data;
  317. }
  318. struct window_update_data {
  319. struct aws_allocator *allocator;
  320. struct aws_channel_task task;
  321. struct aws_event_stream_channel_handler *handler;
  322. size_t window_update_size;
  323. };
  324. static void s_update_window_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
  325. (void)task;
  326. struct window_update_data *update_data = arg;
  327. if (status == AWS_TASK_STATUS_RUN_READY) {
  328. AWS_LOGF_DEBUG(
  329. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  330. "static: updating window. increment of %zu",
  331. update_data->window_update_size);
  332. aws_channel_slot_increment_read_window(update_data->handler->handler.slot, update_data->window_update_size);
  333. }
  334. aws_mem_release(update_data->allocator, update_data);
  335. }
  336. void aws_event_stream_channel_handler_increment_read_window(
  337. struct aws_channel_handler *channel_handler,
  338. size_t window_update_size) {
  339. AWS_PRECONDITION(channel_handler);
  340. struct aws_event_stream_channel_handler *handler = channel_handler->impl;
  341. if (!handler->manual_window_management) {
  342. return;
  343. }
  344. AWS_LOGF_DEBUG(
  345. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  346. "id=%p: A user requested window update and manual window management is specified. Updating size of %zu",
  347. (void *)channel_handler,
  348. window_update_size);
  349. if (aws_channel_thread_is_callers_thread(handler->handler.slot->channel)) {
  350. if (aws_channel_slot_increment_read_window(handler->handler.slot, window_update_size)) {
  351. aws_channel_shutdown(handler->handler.slot->channel, aws_last_error());
  352. return;
  353. }
  354. }
  355. struct window_update_data *update_data =
  356. aws_mem_calloc(handler->handler.alloc, 1, sizeof(struct window_update_data));
  357. if (!update_data) {
  358. AWS_LOGF_ERROR(
  359. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  360. "id=%p: Error occurred while allocating update window data %s.",
  361. (void *)channel_handler,
  362. aws_error_debug_str(aws_last_error()));
  363. aws_channel_shutdown(handler->handler.slot->channel, aws_last_error());
  364. return;
  365. }
  366. update_data->allocator = handler->handler.alloc;
  367. update_data->handler = handler;
  368. update_data->window_update_size = window_update_size;
  369. aws_channel_task_init(
  370. &update_data->task,
  371. s_update_window_task,
  372. update_data,
  373. "aws_event_stream_channel_handler_increment_read_window");
  374. aws_channel_schedule_task_now(handler->handler.slot->channel, &update_data->task);
  375. }
  376. static int s_process_write_message(
  377. struct aws_channel_handler *handler,
  378. struct aws_channel_slot *slot,
  379. struct aws_io_message *message) {
  380. (void)handler;
  381. (void)slot;
  382. (void)message;
  383. AWS_FATAL_ASSERT(!"The event-stream-channel-handler is not designed to be a mid-channel handler.");
  384. return AWS_OP_ERR;
  385. }
  /**
   * Vtable entry for read-window updates: forwards the increment to the slot
   * unchanged.
   *
   * @return the result of aws_channel_slot_increment_read_window().
   */
  static int s_increment_read_window(struct aws_channel_handler *handler, struct aws_channel_slot *slot, size_t size) {
      (void)handler;

      int result = aws_channel_slot_increment_read_window(slot, size);
      return result;
  }
  390. static size_t s_initial_window_size(struct aws_channel_handler *handler) {
  391. struct aws_event_stream_channel_handler *message_handler = handler->impl;
  392. return message_handler->initial_window_size;
  393. }
  394. static size_t s_message_overhead(struct aws_channel_handler *handler) {
  395. (void)handler;
  396. return s_message_overhead_size;
  397. }
  398. static void s_destroy(struct aws_channel_handler *handler) {
  399. AWS_LOGF_DEBUG(
  400. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  401. "id=%p: destroying event-stream message channel handler.",
  402. (void *)handler);
  403. struct aws_event_stream_channel_handler *event_stream_handler = handler->impl;
  404. aws_byte_buf_clean_up(&event_stream_handler->message_buf);
  405. aws_mem_release(handler->alloc, event_stream_handler);
  406. }
  407. static int s_shutdown(
  408. struct aws_channel_handler *handler,
  409. struct aws_channel_slot *slot,
  410. enum aws_channel_direction dir,
  411. int error_code,
  412. bool free_scarce_resources_immediately) {
  413. (void)handler;
  414. AWS_LOGF_DEBUG(
  415. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  416. "id=%p: shutdown called on event-stream channel handler with error %s.",
  417. (void *)handler,
  418. aws_error_debug_str(error_code));
  419. return aws_channel_slot_on_handler_shutdown_complete(slot, dir, error_code, free_scarce_resources_immediately);
  420. }
  421. static struct aws_channel_handler_vtable vtable = {
  422. .destroy = s_destroy,
  423. .increment_read_window = s_increment_read_window,
  424. .initial_window_size = s_initial_window_size,
  425. .process_read_message = s_process_read_message,
  426. .process_write_message = s_process_write_message,
  427. .message_overhead = s_message_overhead,
  428. .shutdown = s_shutdown,
  429. };
  430. struct aws_channel_handler *aws_event_stream_channel_handler_new(
  431. struct aws_allocator *allocator,
  432. const struct aws_event_stream_channel_handler_options *handler_options) {
  433. AWS_PRECONDITION(allocator);
  434. AWS_PRECONDITION(handler_options);
  435. AWS_PRECONDITION(handler_options->on_message_received);
  436. AWS_LOGF_INFO(AWS_LS_EVENT_STREAM_CHANNEL_HANDLER, "static: creating new event-stream message channel handler.");
  437. struct aws_event_stream_channel_handler *event_stream_handler =
  438. aws_mem_calloc(allocator, 1, sizeof(struct aws_event_stream_channel_handler));
  439. if (!event_stream_handler) {
  440. AWS_LOGF_ERROR(
  441. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  442. "static: Error occurred while allocating handler %s.",
  443. aws_error_debug_str(aws_last_error()));
  444. return NULL;
  445. }
  446. AWS_LOGF_DEBUG(AWS_LS_EVENT_STREAM_RPC_CLIENT, "static: new handler is %p", (void *)&event_stream_handler->handler);
  447. if (aws_byte_buf_init(
  448. &event_stream_handler->message_buf, allocator, s_default_payload_size + s_message_overhead_size)) {
  449. AWS_LOGF_ERROR(
  450. AWS_LS_EVENT_STREAM_CHANNEL_HANDLER,
  451. "id=%p: Error occurred while allocating scratch buffer %s.",
  452. (void *)&event_stream_handler->handler,
  453. aws_error_debug_str(aws_last_error()));
  454. aws_mem_release(allocator, event_stream_handler);
  455. return NULL;
  456. }
  457. event_stream_handler->on_message_received = handler_options->on_message_received;
  458. event_stream_handler->user_data = handler_options->user_data;
  459. event_stream_handler->initial_window_size =
  460. handler_options->initial_window_size > 0 ? handler_options->initial_window_size : SIZE_MAX;
  461. event_stream_handler->manual_window_management = handler_options->manual_window_management;
  462. event_stream_handler->handler.vtable = &vtable;
  463. event_stream_handler->handler.alloc = allocator;
  464. event_stream_handler->handler.impl = event_stream_handler;
  465. return &event_stream_handler->handler;
  466. }