websocket.c 69 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/http/private/websocket_impl.h>
  6. #include <aws/common/atomics.h>
  7. #include <aws/common/device_random.h>
  8. #include <aws/common/encoding.h>
  9. #include <aws/common/mutex.h>
  10. #include <aws/common/ref_count.h>
  11. #include <aws/http/private/websocket_decoder.h>
  12. #include <aws/http/private/websocket_encoder.h>
  13. #include <aws/http/request_response.h>
  14. #include <aws/io/channel.h>
  15. #include <aws/io/logging.h>
  16. #include <inttypes.h>
  17. #ifdef _MSC_VER
  18. # pragma warning(disable : 4204) /* non-constant aggregate initializer */
  19. #endif
  20. /* TODO: If something goes wrong during normal shutdown, do I change the error_code? */
  21. struct outgoing_frame {
  22. struct aws_websocket_send_frame_options def;
  23. struct aws_linked_list_node node;
  24. };
  25. struct aws_websocket {
  26. struct aws_allocator *alloc;
  27. struct aws_ref_count ref_count;
  28. struct aws_channel_handler channel_handler;
  29. struct aws_channel_slot *channel_slot;
  30. size_t initial_window_size;
  31. bool manual_window_update;
  32. void *user_data;
  33. aws_websocket_on_incoming_frame_begin_fn *on_incoming_frame_begin;
  34. aws_websocket_on_incoming_frame_payload_fn *on_incoming_frame_payload;
  35. aws_websocket_on_incoming_frame_complete_fn *on_incoming_frame_complete;
  36. struct aws_channel_task move_synced_data_to_thread_task;
  37. struct aws_channel_task shutdown_channel_task;
  38. struct aws_channel_task increment_read_window_task;
  39. struct aws_channel_task waiting_on_payload_stream_task;
  40. struct aws_channel_task close_timeout_task;
  41. bool is_server;
  42. /* Data that should only be accessed from the websocket's channel thread. */
  43. struct {
  44. struct aws_websocket_encoder encoder;
  45. /* list of outbound frames that have yet to be encoded and sent to the socket */
  46. struct aws_linked_list outgoing_frame_list;
  47. /* current outbound frame being encoded and sent to the socket */
  48. struct outgoing_frame *current_outgoing_frame;
  49. /*
  50. * list of outbound frames that have been completely written to the io message heading to the socket.
  51. * When the socket write completes we can in turn invoke completion callbacks for all of these frames
  52. */
  53. struct aws_linked_list write_completion_frames;
  54. struct aws_websocket_decoder decoder;
  55. struct aws_websocket_incoming_frame *current_incoming_frame;
  56. struct aws_websocket_incoming_frame incoming_frame_storage;
  57. /* Payload of incoming PING frame.
  58. * The PONG frame we send in response must have an identical payload */
  59. struct aws_byte_buf incoming_ping_payload;
  60. /* If current incoming frame is CONTINUATION, this is the data type it is a continuation of. */
  61. enum aws_websocket_opcode continuation_of_opcode;
  62. /* Amount to increment window after a channel message has been processed. */
  63. size_t incoming_message_window_update;
  64. /* Cached slot to right */
  65. struct aws_channel_slot *last_known_right_slot;
  66. /* True when no more frames will be read, due to:
  67. * - a CLOSE frame was received
  68. * - decoder error
  69. * - channel shutdown in read-dir */
  70. bool is_reading_stopped;
  71. /* True when no more frames will be written, due to:
  72. * - a CLOSE frame was sent
  73. * - encoder error
  74. * - channel shutdown in write-dir */
  75. bool is_writing_stopped;
  76. /* During normal shutdown websocket ensures that a CLOSE frame is sent */
  77. bool is_shutting_down_and_waiting_for_close_frame_to_be_written;
  78. int channel_shutdown_error_code;
  79. bool channel_shutdown_free_scarce_resources_immediately;
  80. /* Wait until each aws_io_message is completely written to
  81. * the socket before sending the next aws_io_message */
  82. bool is_waiting_for_write_completion;
  83. /* If, while writing out data from a payload stream, we experience "read would block",
  84. * schedule a task to try again in the near-future. */
  85. bool is_waiting_on_payload_stream_task;
  86. /* True if this websocket is being used as a dumb mid-channel handler.
  87. * The websocket will no longer respond to its public API or invoke callbacks. */
  88. bool is_midchannel_handler;
  89. } thread_data;
  90. /* Data that may be touched from any thread (lock must be held). */
  91. struct {
  92. struct aws_mutex lock;
  93. struct aws_linked_list outgoing_frame_list;
  94. /* If non-zero, then increment_read_window_task is scheduled */
  95. size_t window_increment_size;
  96. /* Error-code returned by aws_websocket_send_frame() when is_writing_stopped is true */
  97. int send_frame_error_code;
  98. /* Use a task to issue a channel shutdown. */
  99. int shutdown_channel_task_error_code;
  100. bool is_shutdown_channel_task_scheduled;
  101. bool is_move_synced_data_to_thread_task_scheduled;
  102. /* Mirrors variable from thread_data */
  103. bool is_midchannel_handler;
  104. } synced_data;
  105. };
  106. static int s_handler_process_read_message(
  107. struct aws_channel_handler *handler,
  108. struct aws_channel_slot *slot,
  109. struct aws_io_message *message);
  110. static int s_handler_process_write_message(
  111. struct aws_channel_handler *handler,
  112. struct aws_channel_slot *slot,
  113. struct aws_io_message *message);
  114. static int s_handler_increment_read_window(
  115. struct aws_channel_handler *handler,
  116. struct aws_channel_slot *slot,
  117. size_t size);
  118. static int s_handler_shutdown(
  119. struct aws_channel_handler *handler,
  120. struct aws_channel_slot *slot,
  121. enum aws_channel_direction dir,
  122. int error_code,
  123. bool free_scarce_resources_immediately);
  124. static size_t s_handler_initial_window_size(struct aws_channel_handler *handler);
  125. static size_t s_handler_message_overhead(struct aws_channel_handler *handler);
  126. static void s_handler_destroy(struct aws_channel_handler *handler);
  127. static void s_websocket_on_refcount_zero(void *user_data);
  128. static int s_encoder_stream_outgoing_payload(struct aws_byte_buf *out_buf, void *user_data);
  129. static int s_decoder_on_frame(const struct aws_websocket_frame *frame, void *user_data);
  130. static int s_decoder_on_payload(struct aws_byte_cursor data, void *user_data);
  131. static int s_decoder_on_user_payload(struct aws_websocket *websocket, struct aws_byte_cursor data);
  132. static int s_decoder_on_midchannel_payload(struct aws_websocket *websocket, struct aws_byte_cursor data);
  133. static void s_destroy_outgoing_frame(struct aws_websocket *websocket, struct outgoing_frame *frame, int error_code);
  134. static void s_complete_frame_list(struct aws_websocket *websocket, struct aws_linked_list *frames, int error_code);
  135. static void s_complete_incoming_frame(struct aws_websocket *websocket, int error_code, bool *out_callback_result);
  136. static void s_finish_shutdown(struct aws_websocket *websocket);
  137. static void s_io_message_write_completed(
  138. struct aws_channel *channel,
  139. struct aws_io_message *message,
  140. int err_code,
  141. void *user_data);
  142. static int s_send_frame(
  143. struct aws_websocket *websocket,
  144. const struct aws_websocket_send_frame_options *options,
  145. bool from_public_api);
  146. static bool s_midchannel_send_payload(struct aws_websocket *websocket, struct aws_byte_buf *out_buf, void *user_data);
  147. static void s_midchannel_send_complete(struct aws_websocket *websocket, int error_code, void *user_data);
  148. static void s_move_synced_data_to_thread_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
  149. static void s_increment_read_window_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
  150. static void s_shutdown_channel_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
  151. static void s_waiting_on_payload_stream_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
  152. static void s_close_timeout_task(struct aws_channel_task *task, void *arg, enum aws_task_status status);
  153. static void s_schedule_channel_shutdown(struct aws_websocket *websocket, int error_code);
  154. static void s_shutdown_due_to_write_err(struct aws_websocket *websocket, int error_code);
  155. static void s_shutdown_due_to_read_err(struct aws_websocket *websocket, int error_code);
  156. static void s_stop_writing(struct aws_websocket *websocket, int send_frame_error_code);
  157. static void s_try_write_outgoing_frames(struct aws_websocket *websocket);
  158. static struct aws_channel_handler_vtable s_channel_handler_vtable = {
  159. .process_read_message = s_handler_process_read_message,
  160. .process_write_message = s_handler_process_write_message,
  161. .increment_read_window = s_handler_increment_read_window,
  162. .shutdown = s_handler_shutdown,
  163. .initial_window_size = s_handler_initial_window_size,
  164. .message_overhead = s_handler_message_overhead,
  165. .destroy = s_handler_destroy,
  166. };
  167. const char *aws_websocket_opcode_str(uint8_t opcode) {
  168. switch (opcode) {
  169. case AWS_WEBSOCKET_OPCODE_CONTINUATION:
  170. return "continuation";
  171. case AWS_WEBSOCKET_OPCODE_TEXT:
  172. return "text";
  173. case AWS_WEBSOCKET_OPCODE_BINARY:
  174. return "binary";
  175. case AWS_WEBSOCKET_OPCODE_CLOSE:
  176. return "close";
  177. case AWS_WEBSOCKET_OPCODE_PING:
  178. return "ping";
  179. case AWS_WEBSOCKET_OPCODE_PONG:
  180. return "pong";
  181. default:
  182. return "";
  183. }
  184. }
  185. bool aws_websocket_is_data_frame(uint8_t opcode) {
  186. /* RFC-6455 Section 5.6: Most significant bit of (4 bit) data frame opcode is 0 */
  187. return !(opcode & 0x08);
  188. }
  189. static void s_lock_synced_data(struct aws_websocket *websocket) {
  190. int err = aws_mutex_lock(&websocket->synced_data.lock);
  191. AWS_ASSERT(!err);
  192. (void)err;
  193. }
  194. static void s_unlock_synced_data(struct aws_websocket *websocket) {
  195. int err = aws_mutex_unlock(&websocket->synced_data.lock);
  196. AWS_ASSERT(!err);
  197. (void)err;
  198. }
  199. struct aws_websocket *aws_websocket_handler_new(const struct aws_websocket_handler_options *options) {
  200. struct aws_channel_slot *slot = NULL;
  201. struct aws_websocket *websocket = NULL;
  202. int err;
  203. slot = aws_channel_slot_new(options->channel);
  204. if (!slot) {
  205. goto error;
  206. }
  207. err = aws_channel_slot_insert_end(options->channel, slot);
  208. if (err) {
  209. goto error;
  210. }
  211. websocket = aws_mem_calloc(options->allocator, 1, sizeof(struct aws_websocket));
  212. if (!websocket) {
  213. goto error;
  214. }
  215. websocket->alloc = options->allocator;
  216. aws_ref_count_init(&websocket->ref_count, websocket, s_websocket_on_refcount_zero);
  217. websocket->channel_handler.vtable = &s_channel_handler_vtable;
  218. websocket->channel_handler.alloc = options->allocator;
  219. websocket->channel_handler.impl = websocket;
  220. websocket->channel_slot = slot;
  221. websocket->initial_window_size = options->initial_window_size;
  222. websocket->manual_window_update = options->manual_window_update;
  223. websocket->user_data = options->user_data;
  224. websocket->on_incoming_frame_begin = options->on_incoming_frame_begin;
  225. websocket->on_incoming_frame_payload = options->on_incoming_frame_payload;
  226. websocket->on_incoming_frame_complete = options->on_incoming_frame_complete;
  227. websocket->is_server = options->is_server;
  228. aws_channel_task_init(
  229. &websocket->move_synced_data_to_thread_task,
  230. s_move_synced_data_to_thread_task,
  231. websocket,
  232. "websocket_move_synced_data_to_thread");
  233. aws_channel_task_init(
  234. &websocket->shutdown_channel_task, s_shutdown_channel_task, websocket, "websocket_shutdown_channel");
  235. aws_channel_task_init(
  236. &websocket->increment_read_window_task,
  237. s_increment_read_window_task,
  238. websocket,
  239. "websocket_increment_read_window");
  240. aws_channel_task_init(
  241. &websocket->waiting_on_payload_stream_task,
  242. s_waiting_on_payload_stream_task,
  243. websocket,
  244. "websocket_waiting_on_payload_stream");
  245. aws_channel_task_init(&websocket->close_timeout_task, s_close_timeout_task, websocket, "websocket_close_timeout");
  246. aws_linked_list_init(&websocket->thread_data.outgoing_frame_list);
  247. aws_linked_list_init(&websocket->thread_data.write_completion_frames);
  248. aws_byte_buf_init(&websocket->thread_data.incoming_ping_payload, websocket->alloc, 0);
  249. aws_websocket_encoder_init(&websocket->thread_data.encoder, s_encoder_stream_outgoing_payload, websocket);
  250. aws_websocket_decoder_init(
  251. &websocket->thread_data.decoder, options->allocator, s_decoder_on_frame, s_decoder_on_payload, websocket);
  252. aws_linked_list_init(&websocket->synced_data.outgoing_frame_list);
  253. err = aws_mutex_init(&websocket->synced_data.lock);
  254. if (err) {
  255. AWS_LOGF_ERROR(
  256. AWS_LS_HTTP_WEBSOCKET,
  257. "static: Failed to initialize mutex, error %d (%s).",
  258. aws_last_error(),
  259. aws_error_name(aws_last_error()));
  260. goto error;
  261. }
  262. err = aws_channel_slot_set_handler(slot, &websocket->channel_handler);
  263. if (err) {
  264. goto error;
  265. }
  266. /* Ensure websocket (and the rest of the channel) can't be destroyed until aws_websocket_release() is called */
  267. aws_channel_acquire_hold(options->channel);
  268. return websocket;
  269. error:
  270. if (slot) {
  271. if (websocket && !slot->handler) {
  272. websocket->channel_handler.vtable->destroy(&websocket->channel_handler);
  273. }
  274. aws_channel_slot_remove(slot);
  275. }
  276. return NULL;
  277. }
  278. static void s_handler_destroy(struct aws_channel_handler *handler) {
  279. struct aws_websocket *websocket = handler->impl;
  280. AWS_ASSERT(!websocket->thread_data.current_outgoing_frame);
  281. AWS_ASSERT(!websocket->thread_data.current_incoming_frame);
  282. AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Destroying websocket.", (void *)websocket);
  283. aws_websocket_decoder_clean_up(&websocket->thread_data.decoder);
  284. aws_byte_buf_clean_up(&websocket->thread_data.incoming_ping_payload);
  285. aws_mutex_clean_up(&websocket->synced_data.lock);
  286. aws_mem_release(websocket->alloc, websocket);
  287. }
  288. struct aws_websocket *aws_websocket_acquire(struct aws_websocket *websocket) {
  289. AWS_PRECONDITION(websocket);
  290. AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Acquiring websocket ref-count.", (void *)websocket);
  291. aws_ref_count_acquire(&websocket->ref_count);
  292. return websocket;
  293. }
  294. void aws_websocket_release(struct aws_websocket *websocket) {
  295. if (!websocket) {
  296. return;
  297. }
  298. AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Releasing websocket ref-count.", (void *)websocket);
  299. aws_ref_count_release(&websocket->ref_count);
  300. }
  301. static void s_websocket_on_refcount_zero(void *user_data) {
  302. struct aws_websocket *websocket = user_data;
  303. AWS_ASSERT(websocket->channel_slot);
  304. AWS_LOGF_TRACE(
  305. AWS_LS_HTTP_WEBSOCKET, "id=%p: Websocket ref-count is zero, shut down if necessary.", (void *)websocket);
  306. /* Channel might already be shut down, but make sure */
  307. s_schedule_channel_shutdown(websocket, AWS_ERROR_SUCCESS);
  308. /* Channel won't destroy its slots/handlers until its refcount reaches 0 */
  309. aws_channel_release_hold(websocket->channel_slot->channel);
  310. }
  311. struct aws_channel *aws_websocket_get_channel(const struct aws_websocket *websocket) {
  312. return websocket->channel_slot->channel;
  313. }
  314. int aws_websocket_convert_to_midchannel_handler(struct aws_websocket *websocket) {
  315. if (!aws_channel_thread_is_callers_thread(websocket->channel_slot->channel)) {
  316. AWS_LOGF_ERROR(
  317. AWS_LS_HTTP_WEBSOCKET, "id=%p: Cannot convert to midchannel handler on this thread.", (void *)websocket);
  318. return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
  319. }
  320. if (websocket->thread_data.is_midchannel_handler) {
  321. AWS_LOGF_ERROR(
  322. AWS_LS_HTTP_WEBSOCKET, "id=%p: Websocket has already converted to midchannel handler.", (void *)websocket);
  323. return aws_raise_error(AWS_ERROR_HTTP_WEBSOCKET_IS_MIDCHANNEL_HANDLER);
  324. }
  325. if (websocket->thread_data.is_reading_stopped || websocket->thread_data.is_writing_stopped) {
  326. AWS_LOGF_ERROR(
  327. AWS_LS_HTTP_WEBSOCKET,
  328. "id=%p: Cannot convert websocket to midchannel handler because it is closed or closing.",
  329. (void *)websocket);
  330. return aws_raise_error(AWS_ERROR_HTTP_CONNECTION_CLOSED);
  331. }
  332. if (websocket->thread_data.current_incoming_frame) {
  333. AWS_LOGF_ERROR(
  334. AWS_LS_HTTP_WEBSOCKET,
  335. "id=%p: Cannot convert to midchannel handler in the middle of an incoming frame.",
  336. (void *)websocket);
  337. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  338. }
  339. websocket->thread_data.is_midchannel_handler = true;
  340. return AWS_OP_SUCCESS;
  341. }
  342. static int s_send_frame(
  343. struct aws_websocket *websocket,
  344. const struct aws_websocket_send_frame_options *options,
  345. bool from_public_api) {
  346. AWS_ASSERT(websocket);
  347. AWS_ASSERT(options);
  348. /* Check for bad input. Log about non-obvious errors. */
  349. if (options->payload_length > 0 && !options->stream_outgoing_payload) {
  350. AWS_LOGF_ERROR(
  351. AWS_LS_HTTP_WEBSOCKET,
  352. "id=%p: Invalid frame options, payload streaming function required when payload length is non-zero.",
  353. (void *)websocket);
  354. return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  355. }
  356. struct outgoing_frame *frame = aws_mem_calloc(websocket->alloc, 1, sizeof(struct outgoing_frame));
  357. if (!frame) {
  358. return AWS_OP_ERR;
  359. }
  360. frame->def = *options;
  361. /* Enqueue frame, unless no further sending is allowed. */
  362. int send_error = 0;
  363. bool should_schedule_task = false;
  364. /* BEGIN CRITICAL SECTION */
  365. s_lock_synced_data(websocket);
  366. if (websocket->synced_data.is_midchannel_handler && from_public_api) {
  367. send_error = AWS_ERROR_HTTP_WEBSOCKET_IS_MIDCHANNEL_HANDLER;
  368. } else if (websocket->synced_data.send_frame_error_code) {
  369. send_error = websocket->synced_data.send_frame_error_code;
  370. } else {
  371. aws_linked_list_push_back(&websocket->synced_data.outgoing_frame_list, &frame->node);
  372. if (!websocket->synced_data.is_move_synced_data_to_thread_task_scheduled) {
  373. websocket->synced_data.is_move_synced_data_to_thread_task_scheduled = true;
  374. should_schedule_task = true;
  375. }
  376. }
  377. s_unlock_synced_data(websocket);
  378. /* END CRITICAL SECTION */
  379. if (send_error) {
  380. AWS_LOGF_ERROR(
  381. AWS_LS_HTTP_WEBSOCKET,
  382. "id=%p: Cannot send frame, error %d (%s).",
  383. (void *)websocket,
  384. send_error,
  385. aws_error_name(send_error));
  386. aws_mem_release(websocket->alloc, frame);
  387. return aws_raise_error(send_error);
  388. }
  389. AWS_LOGF_DEBUG(
  390. AWS_LS_HTTP_WEBSOCKET,
  391. "id=%p: Enqueuing outgoing frame with opcode=%" PRIu8 "(%s) length=%" PRIu64 " fin=%s",
  392. (void *)websocket,
  393. options->opcode,
  394. aws_websocket_opcode_str(options->opcode),
  395. options->payload_length,
  396. options->fin ? "T" : "F");
  397. if (should_schedule_task) {
  398. AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Scheduling synced data task.", (void *)websocket);
  399. aws_channel_schedule_task_now(websocket->channel_slot->channel, &websocket->move_synced_data_to_thread_task);
  400. }
  401. return AWS_OP_SUCCESS;
  402. }
  403. int aws_websocket_send_frame(struct aws_websocket *websocket, const struct aws_websocket_send_frame_options *options) {
  404. return s_send_frame(websocket, options, true);
  405. }
  406. static void s_move_synced_data_to_thread_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
  407. (void)task;
  408. if (status != AWS_TASK_STATUS_RUN_READY) {
  409. return;
  410. }
  411. struct aws_websocket *websocket = arg;
  412. struct aws_linked_list tmp_list;
  413. aws_linked_list_init(&tmp_list);
  414. /* BEGIN CRITICAL SECTION */
  415. s_lock_synced_data(websocket);
  416. aws_linked_list_swap_contents(&websocket->synced_data.outgoing_frame_list, &tmp_list);
  417. websocket->synced_data.is_move_synced_data_to_thread_task_scheduled = false;
  418. s_unlock_synced_data(websocket);
  419. /* END CRITICAL SECTION */
  420. if (!aws_linked_list_empty(&tmp_list)) {
  421. aws_linked_list_move_all_back(&websocket->thread_data.outgoing_frame_list, &tmp_list);
  422. s_try_write_outgoing_frames(websocket);
  423. }
  424. }
  425. static void s_try_write_outgoing_frames(struct aws_websocket *websocket) {
  426. AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
  427. int err;
  428. /* Check whether we should be writing data */
  429. if (!websocket->thread_data.current_outgoing_frame &&
  430. aws_linked_list_empty(&websocket->thread_data.outgoing_frame_list)) {
  431. AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: No data to write at this time.", (void *)websocket);
  432. return;
  433. }
  434. if (websocket->thread_data.is_waiting_for_write_completion) {
  435. AWS_LOGF_TRACE(
  436. AWS_LS_HTTP_WEBSOCKET,
  437. "id=%p: Waiting until outstanding aws_io_message is written to socket before sending more data.",
  438. (void *)websocket);
  439. return;
  440. }
  441. if (websocket->thread_data.is_writing_stopped) {
  442. AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Websocket is no longer sending data.", (void *)websocket);
  443. return;
  444. }
  445. /* Acquire aws_io_message */
  446. struct aws_io_message *io_msg = aws_channel_slot_acquire_max_message_for_write(websocket->channel_slot);
  447. if (!io_msg) {
  448. AWS_LOGF_ERROR(
  449. AWS_LS_HTTP_WEBSOCKET,
  450. "id=%p: Failed acquire message from pool, error %d (%s).",
  451. (void *)websocket,
  452. aws_last_error(),
  453. aws_error_name(aws_last_error()));
  454. goto error;
  455. }
  456. io_msg->user_data = websocket;
  457. io_msg->on_completion = s_io_message_write_completed;
  458. /* Loop through frames, writing their data into the io_msg */
  459. bool wrote_close_frame = false;
  460. while (!websocket->thread_data.is_writing_stopped) {
  461. if (websocket->thread_data.current_outgoing_frame) {
  462. AWS_LOGF_TRACE(
  463. AWS_LS_HTTP_WEBSOCKET,
  464. "id=%p: Resuming write of frame=%p opcode=%" PRIu8 "(%s) payload-length=%" PRIu64 ".",
  465. (void *)websocket,
  466. (void *)websocket->thread_data.current_outgoing_frame,
  467. websocket->thread_data.current_outgoing_frame->def.opcode,
  468. aws_websocket_opcode_str(websocket->thread_data.current_outgoing_frame->def.opcode),
  469. websocket->thread_data.current_outgoing_frame->def.payload_length);
  470. } else {
  471. /* We're not in the middle of encoding a frame, so pop off the next one to encode. */
  472. if (aws_linked_list_empty(&websocket->thread_data.outgoing_frame_list)) {
  473. AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: No more frames to write.", (void *)websocket);
  474. break;
  475. }
  476. struct aws_linked_list_node *node = aws_linked_list_pop_front(&websocket->thread_data.outgoing_frame_list);
  477. websocket->thread_data.current_outgoing_frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node);
  478. struct aws_websocket_frame frame = {
  479. .fin = websocket->thread_data.current_outgoing_frame->def.fin,
  480. .opcode = websocket->thread_data.current_outgoing_frame->def.opcode,
  481. .payload_length = websocket->thread_data.current_outgoing_frame->def.payload_length,
  482. };
  483. /* RFC-6455 Section 5.3 Client-to-Server Masking
  484. * Clients must mask payload with key derived from an unpredictable source of entropy. */
  485. if (!websocket->is_server) {
  486. frame.masked = true;
  487. /* TODO: faster source of random (but still seeded by device_random) */
  488. struct aws_byte_buf masking_key_buf = aws_byte_buf_from_empty_array(frame.masking_key, 4);
  489. err = aws_device_random_buffer(&masking_key_buf);
  490. if (err) {
  491. AWS_LOGF_ERROR(
  492. AWS_LS_HTTP_WEBSOCKET,
  493. "id=%p: Failed to derive masking key, error %d (%s).",
  494. (void *)websocket,
  495. aws_last_error(),
  496. aws_error_name(aws_last_error()));
  497. goto error;
  498. }
  499. }
  500. err = aws_websocket_encoder_start_frame(&websocket->thread_data.encoder, &frame);
  501. if (err) {
  502. AWS_LOGF_ERROR(
  503. AWS_LS_HTTP_WEBSOCKET,
  504. "id=%p: Failed to start frame encoding, error %d (%s).",
  505. (void *)websocket,
  506. aws_last_error(),
  507. aws_error_name(aws_last_error()));
  508. goto error;
  509. }
  510. AWS_LOGF_TRACE(
  511. AWS_LS_HTTP_WEBSOCKET,
  512. "id=%p: Start writing frame=%p opcode=%" PRIu8 "(%s) payload-length=%" PRIu64 ".",
  513. (void *)websocket,
  514. (void *)websocket->thread_data.current_outgoing_frame,
  515. websocket->thread_data.current_outgoing_frame->def.opcode,
  516. aws_websocket_opcode_str(websocket->thread_data.current_outgoing_frame->def.opcode),
  517. websocket->thread_data.current_outgoing_frame->def.payload_length);
  518. }
  519. err = aws_websocket_encoder_process(&websocket->thread_data.encoder, &io_msg->message_data);
  520. if (err) {
  521. AWS_LOGF_ERROR(
  522. AWS_LS_HTTP_WEBSOCKET,
  523. "id=%p: Frame encoding failed with error %d (%s).",
  524. (void *)websocket,
  525. aws_last_error(),
  526. aws_error_name(aws_last_error()));
  527. goto error;
  528. }
  529. if (aws_websocket_encoder_is_frame_in_progress(&websocket->thread_data.encoder)) {
  530. AWS_LOGF_TRACE(
  531. AWS_LS_HTTP_WEBSOCKET,
  532. "id=%p: Outgoing frame still in progress, but no more data can be written at this time.",
  533. (void *)websocket);
  534. break;
  535. }
  536. if (websocket->thread_data.current_outgoing_frame->def.opcode == AWS_WEBSOCKET_OPCODE_CLOSE) {
  537. wrote_close_frame = true;
  538. }
  539. /*
  540. * a completely-written frame gets added to the write completion list so that when the socket write completes
  541. * we can complete all of the outbound frames that were finished as part of the io message
  542. */
  543. aws_linked_list_push_back(
  544. &websocket->thread_data.write_completion_frames, &websocket->thread_data.current_outgoing_frame->node);
  545. websocket->thread_data.current_outgoing_frame = NULL;
  546. if (wrote_close_frame) {
  547. break;
  548. }
  549. }
  550. /* If payload stream didn't have any bytes available to read right now, then the aws_io_message might be empty.
  551. * If this is the case schedule a task to try again in the future. */
  552. if (io_msg->message_data.len == 0) {
  553. AWS_LOGF_TRACE(
  554. AWS_LS_HTTP_WEBSOCKET,
  555. "id=%p: Reading from payload stream would block, will try again later.",
  556. (void *)websocket);
  557. if (!websocket->thread_data.is_waiting_on_payload_stream_task) {
  558. websocket->thread_data.is_waiting_on_payload_stream_task = true;
  559. /* Future Optimization Idea: Minimize work while we wait. Use some kind of backoff for the retry timing,
  560. * or have some way for stream to notify when more data is available. */
  561. aws_channel_schedule_task_now(websocket->channel_slot->channel, &websocket->waiting_on_payload_stream_task);
  562. }
  563. aws_mem_release(io_msg->allocator, io_msg);
  564. return;
  565. }
  566. /* Prepare to send aws_io_message up the channel. */
  567. /* If CLOSE frame was written, that's the last data we'll write */
  568. if (wrote_close_frame) {
  569. s_stop_writing(websocket, AWS_ERROR_HTTP_WEBSOCKET_CLOSE_FRAME_SENT);
  570. }
  571. AWS_LOGF_TRACE(
  572. AWS_LS_HTTP_WEBSOCKET,
  573. "id=%p: Sending aws_io_message of size %zu in write direction.",
  574. (void *)websocket,
  575. io_msg->message_data.len);
  576. websocket->thread_data.is_waiting_for_write_completion = true;
  577. err = aws_channel_slot_send_message(websocket->channel_slot, io_msg, AWS_CHANNEL_DIR_WRITE);
  578. if (err) {
  579. websocket->thread_data.is_waiting_for_write_completion = false;
  580. AWS_LOGF_ERROR(
  581. AWS_LS_HTTP_WEBSOCKET,
  582. "id=%p: Failed to send message in write direction, error %d (%s).",
  583. (void *)websocket,
  584. aws_last_error(),
  585. aws_error_name(aws_last_error()));
  586. goto error;
  587. }
  588. /* Finish shutdown if we were waiting for the CLOSE frame to be written */
  589. if (wrote_close_frame && websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written) {
  590. AWS_LOGF_TRACE(
  591. AWS_LS_HTTP_WEBSOCKET, "id=%p: CLOSE frame sent, finishing handler shutdown sequence.", (void *)websocket);
  592. s_finish_shutdown(websocket);
  593. }
  594. return;
  595. error:
  596. if (io_msg) {
  597. aws_mem_release(io_msg->allocator, io_msg);
  598. }
  599. s_shutdown_due_to_write_err(websocket, aws_last_error());
  600. }
  601. /* Encoder's outgoing_payload callback invokes current frame's callback */
  602. static int s_encoder_stream_outgoing_payload(struct aws_byte_buf *out_buf, void *user_data) {
  603. struct aws_websocket *websocket = user_data;
  604. AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
  605. AWS_ASSERT(websocket->thread_data.current_outgoing_frame);
  606. struct outgoing_frame *current_frame = websocket->thread_data.current_outgoing_frame;
  607. AWS_ASSERT(current_frame->def.stream_outgoing_payload);
  608. bool callback_result = current_frame->def.stream_outgoing_payload(websocket, out_buf, current_frame->def.user_data);
  609. if (!callback_result) {
  610. AWS_LOGF_ERROR(
  611. AWS_LS_HTTP_WEBSOCKET, "id=%p: Outgoing payload callback has reported a failure.", (void *)websocket);
  612. return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
  613. }
  614. return AWS_OP_SUCCESS;
  615. }
  616. static void s_waiting_on_payload_stream_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
  617. (void)task;
  618. if (status != AWS_TASK_STATUS_RUN_READY) {
  619. /* If channel has shut down, don't need to resume sending payload */
  620. return;
  621. }
  622. struct aws_websocket *websocket = arg;
  623. AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
  624. AWS_LOGF_TRACE(
  625. AWS_LS_HTTP_WEBSOCKET, "id=%p: Done waiting for payload stream, sending more data...", (void *)websocket);
  626. websocket->thread_data.is_waiting_on_payload_stream_task = false;
  627. s_try_write_outgoing_frames(websocket);
  628. }
  629. static void s_io_message_write_completed(
  630. struct aws_channel *channel,
  631. struct aws_io_message *message,
  632. int err_code,
  633. void *user_data) {
  634. (void)channel;
  635. (void)message;
  636. struct aws_websocket *websocket = user_data;
  637. AWS_ASSERT(aws_channel_thread_is_callers_thread(channel));
  638. /*
  639. * Invoke the completion callbacks (and then destroy) for all the frames that were completely written as
  640. * part of this message completion at the socket layer
  641. */
  642. s_complete_frame_list(websocket, &websocket->thread_data.write_completion_frames, err_code);
  643. if (err_code == AWS_ERROR_SUCCESS) {
  644. AWS_LOGF_TRACE(
  645. AWS_LS_HTTP_WEBSOCKET, "id=%p: aws_io_message written to socket, sending more data...", (void *)websocket);
  646. websocket->thread_data.is_waiting_for_write_completion = false;
  647. s_try_write_outgoing_frames(websocket);
  648. } else {
  649. AWS_LOGF_TRACE(
  650. AWS_LS_HTTP_WEBSOCKET,
  651. "id=%p: aws_io_message did not finish writing to socket, error %d (%s).",
  652. (void *)websocket,
  653. err_code,
  654. aws_error_name(err_code));
  655. s_shutdown_due_to_write_err(websocket, err_code);
  656. }
  657. }
  658. static int s_handler_process_write_message(
  659. struct aws_channel_handler *handler,
  660. struct aws_channel_slot *slot,
  661. struct aws_io_message *message) {
  662. (void)slot;
  663. struct aws_websocket *websocket = handler->impl;
  664. AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
  665. /* For each aws_io_message headed in the write direction, send a BINARY frame,
  666. * where the frame's payload is the data from this aws_io_message. */
  667. struct aws_websocket_send_frame_options options = {
  668. .payload_length = message->message_data.len,
  669. .user_data = message,
  670. .stream_outgoing_payload = s_midchannel_send_payload,
  671. .on_complete = s_midchannel_send_complete,
  672. .opcode = AWS_WEBSOCKET_OPCODE_BINARY,
  673. .fin = true,
  674. };
  675. /* Use copy_mark to track progress as the data is streamed out */
  676. message->copy_mark = 0;
  677. int err = s_send_frame(websocket, &options, false);
  678. if (err) {
  679. return AWS_OP_ERR;
  680. }
  681. return AWS_OP_SUCCESS;
  682. }
  683. /* Callback for writing data from downstream aws_io_messages into payload of BINARY frames headed upstream */
  684. static bool s_midchannel_send_payload(struct aws_websocket *websocket, struct aws_byte_buf *out_buf, void *user_data) {
  685. (void)websocket;
  686. struct aws_io_message *io_msg = user_data;
  687. /* copy_mark is used to track progress */
  688. size_t src_available = io_msg->message_data.len - io_msg->copy_mark;
  689. size_t dst_available = out_buf->capacity - out_buf->len;
  690. size_t sending = dst_available < src_available ? dst_available : src_available;
  691. bool success = aws_byte_buf_write(out_buf, io_msg->message_data.buffer + io_msg->copy_mark, sending);
  692. io_msg->copy_mark += sending;
  693. return success;
  694. }
  695. /* Callback when data from downstream aws_io_messages, finishes being sent as a BINARY frame upstream. */
  696. static void s_midchannel_send_complete(struct aws_websocket *websocket, int error_code, void *user_data) {
  697. (void)websocket;
  698. struct aws_io_message *io_msg = user_data;
  699. if (io_msg->on_completion) {
  700. io_msg->on_completion(io_msg->owning_channel, io_msg, error_code, io_msg->user_data);
  701. }
  702. aws_mem_release(io_msg->allocator, io_msg);
  703. }
  704. static void s_destroy_outgoing_frame(struct aws_websocket *websocket, struct outgoing_frame *frame, int error_code) {
  705. AWS_LOGF_TRACE(
  706. AWS_LS_HTTP_WEBSOCKET,
  707. "id=%p: Completed outgoing frame=%p opcode=%" PRIu8 "(%s) payload-length=%" PRIu64 " with error_code %d (%s).",
  708. (void *)websocket,
  709. (void *)frame,
  710. frame->def.opcode,
  711. aws_websocket_opcode_str(frame->def.opcode),
  712. frame->def.payload_length,
  713. error_code,
  714. aws_error_name(error_code));
  715. if (frame->def.on_complete) {
  716. frame->def.on_complete(websocket, error_code, frame->def.user_data);
  717. }
  718. aws_mem_release(websocket->alloc, frame);
  719. }
  720. static void s_complete_frame_list(struct aws_websocket *websocket, struct aws_linked_list *frames, int error_code) {
  721. struct aws_linked_list_node *node = aws_linked_list_begin(frames);
  722. while (node != aws_linked_list_end(frames)) {
  723. struct outgoing_frame *frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node);
  724. node = aws_linked_list_next(node);
  725. s_destroy_outgoing_frame(websocket, frame, error_code);
  726. }
  727. /* we've released everything, so reset the list to empty */
  728. aws_linked_list_init(frames);
  729. }
  730. static void s_stop_writing(struct aws_websocket *websocket, int send_frame_error_code) {
  731. AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
  732. AWS_ASSERT(send_frame_error_code != AWS_ERROR_SUCCESS);
  733. if (websocket->thread_data.is_writing_stopped) {
  734. return;
  735. }
  736. AWS_LOGF_TRACE(
  737. AWS_LS_HTTP_WEBSOCKET,
  738. "id=%p: Websocket will send no more data, future attempts to send will get error %d (%s).",
  739. (void *)websocket,
  740. send_frame_error_code,
  741. aws_error_name(send_frame_error_code));
  742. /* BEGIN CRITICAL SECTION */
  743. s_lock_synced_data(websocket);
  744. websocket->synced_data.send_frame_error_code = send_frame_error_code;
  745. s_unlock_synced_data(websocket);
  746. /* END CRITICAL SECTION */
  747. websocket->thread_data.is_writing_stopped = true;
  748. }
  749. static void s_shutdown_due_to_write_err(struct aws_websocket *websocket, int error_code) {
  750. AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
  751. /* No more writing allowed (it's ok to call this redundantly). */
  752. s_stop_writing(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED);
  753. /* If there's a current outgoing frame, complete it with the specific error code.
  754. * Any other pending frames will complete with the generic CONNECTION_CLOSED error. */
  755. if (websocket->thread_data.current_outgoing_frame) {
  756. s_destroy_outgoing_frame(websocket, websocket->thread_data.current_outgoing_frame, error_code);
  757. websocket->thread_data.current_outgoing_frame = NULL;
  758. }
  759. /* If we're in the final stages of shutdown, ensure shutdown completes.
  760. * Otherwise tell the channel to shutdown (it's ok to shutdown the channel redundantly). */
  761. if (websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written) {
  762. s_finish_shutdown(websocket);
  763. } else {
  764. AWS_LOGF_ERROR(
  765. AWS_LS_HTTP_WEBSOCKET,
  766. "id=%p: Closing websocket due to failure during write, error %d (%s).",
  767. (void *)websocket,
  768. error_code,
  769. aws_error_name(error_code));
  770. s_schedule_channel_shutdown(websocket, error_code);
  771. }
  772. }
  773. static void s_shutdown_due_to_read_err(struct aws_websocket *websocket, int error_code) {
  774. AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
  775. AWS_LOGF_ERROR(
  776. AWS_LS_HTTP_WEBSOCKET,
  777. "id=%p: Closing websocket due to failure during read, error %d (%s).",
  778. (void *)websocket,
  779. error_code,
  780. aws_error_name(error_code));
  781. websocket->thread_data.is_reading_stopped = true;
  782. /* If there's a current incoming frame, complete it with the specific error code. */
  783. if (websocket->thread_data.current_incoming_frame) {
  784. s_complete_incoming_frame(websocket, error_code, NULL);
  785. }
  786. /* Tell channel to shutdown (it's ok to call this redundantly) */
  787. s_schedule_channel_shutdown(websocket, error_code);
  788. }
  789. static void s_shutdown_channel_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
  790. (void)task;
  791. if (status != AWS_TASK_STATUS_RUN_READY) {
  792. return;
  793. }
  794. struct aws_websocket *websocket = arg;
  795. int error_code;
  796. /* BEGIN CRITICAL SECTION */
  797. s_lock_synced_data(websocket);
  798. error_code = websocket->synced_data.shutdown_channel_task_error_code;
  799. s_unlock_synced_data(websocket);
  800. /* END CRITICAL SECTION */
  801. aws_channel_shutdown(websocket->channel_slot->channel, error_code);
  802. }
  803. /* Tell the channel to shut down. It is safe to call this multiple times.
  804. * The call to aws_channel_shutdown() is delayed so that a user invoking aws_websocket_close doesn't
  805. * have completion callbacks firing before the function call even returns */
  806. static void s_schedule_channel_shutdown(struct aws_websocket *websocket, int error_code) {
  807. bool schedule_shutdown = false;
  808. /* BEGIN CRITICAL SECTION */
  809. s_lock_synced_data(websocket);
  810. if (!websocket->synced_data.is_shutdown_channel_task_scheduled) {
  811. schedule_shutdown = true;
  812. websocket->synced_data.is_shutdown_channel_task_scheduled = true;
  813. websocket->synced_data.shutdown_channel_task_error_code = error_code;
  814. }
  815. s_unlock_synced_data(websocket);
  816. /* END CRITICAL SECTION */
  817. if (schedule_shutdown) {
  818. aws_channel_schedule_task_now(websocket->channel_slot->channel, &websocket->shutdown_channel_task);
  819. }
  820. }
  821. void aws_websocket_close(struct aws_websocket *websocket, bool free_scarce_resources_immediately) {
  822. bool is_midchannel_handler;
  823. /* BEGIN CRITICAL SECTION */
  824. s_lock_synced_data(websocket);
  825. is_midchannel_handler = websocket->synced_data.is_midchannel_handler;
  826. s_unlock_synced_data(websocket);
  827. /* END CRITICAL SECTION */
  828. if (is_midchannel_handler) {
  829. AWS_LOGF_ERROR(
  830. AWS_LS_HTTP_WEBSOCKET,
  831. "id=%p: Ignoring close call, websocket has converted to midchannel handler.",
  832. (void *)websocket);
  833. return;
  834. }
  835. /* TODO: aws_channel_shutdown() should let users specify error_code and "immediate" as separate parameters.
  836. * Currently, any non-zero error_code results in "immediate" shutdown */
  837. int error_code = AWS_ERROR_SUCCESS;
  838. if (free_scarce_resources_immediately) {
  839. error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
  840. }
  841. s_schedule_channel_shutdown(websocket, error_code);
  842. }
  843. static int s_handler_shutdown(
  844. struct aws_channel_handler *handler,
  845. struct aws_channel_slot *slot,
  846. enum aws_channel_direction dir,
  847. int error_code,
  848. bool free_scarce_resources_immediately) {
  849. AWS_ASSERT(aws_channel_thread_is_callers_thread(slot->channel));
  850. struct aws_websocket *websocket = handler->impl;
  851. int err;
  852. AWS_LOGF_DEBUG(
  853. AWS_LS_HTTP_WEBSOCKET,
  854. "id=%p: Websocket handler shutting down dir=%s error_code=%d immediate=%d.",
  855. (void *)websocket,
  856. dir == AWS_CHANNEL_DIR_READ ? "READ" : "WRITE",
  857. error_code,
  858. free_scarce_resources_immediately);
  859. if (dir == AWS_CHANNEL_DIR_READ) {
  860. /* Shutdown in the read direction is immediate and simple. */
  861. websocket->thread_data.is_reading_stopped = true;
  862. aws_channel_slot_on_handler_shutdown_complete(slot, dir, error_code, free_scarce_resources_immediately);
  863. } else {
  864. websocket->thread_data.channel_shutdown_error_code = error_code;
  865. websocket->thread_data.channel_shutdown_free_scarce_resources_immediately = free_scarce_resources_immediately;
  866. websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written = true;
  867. if (websocket->thread_data.channel_shutdown_free_scarce_resources_immediately ||
  868. websocket->thread_data.is_writing_stopped) {
  869. AWS_LOGF_TRACE(
  870. AWS_LS_HTTP_WEBSOCKET,
  871. "id=%p: Finishing handler shutdown immediately, without ensuring a CLOSE frame was sent.",
  872. (void *)websocket);
  873. s_stop_writing(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED);
  874. s_finish_shutdown(websocket);
  875. } else {
  876. /* Attempt to queue a CLOSE frame, then wait for it to send before finishing shutdown. */
  877. struct aws_websocket_send_frame_options close_frame = {
  878. .opcode = AWS_WEBSOCKET_OPCODE_CLOSE,
  879. .fin = true,
  880. };
  881. err = s_send_frame(websocket, &close_frame, false);
  882. if (err) {
  883. AWS_LOGF_WARN(
  884. AWS_LS_HTTP_WEBSOCKET,
  885. "id=%p: Failed to send CLOSE frame, error %d (%s).",
  886. (void *)websocket,
  887. aws_last_error(),
  888. aws_error_name(aws_last_error()));
  889. s_stop_writing(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED);
  890. s_finish_shutdown(websocket);
  891. } else {
  892. AWS_LOGF_TRACE(
  893. AWS_LS_HTTP_WEBSOCKET,
  894. "id=%p: Outgoing CLOSE frame queued, handler will finish shutdown once it's sent.",
  895. (void *)websocket);
  896. /* schedule a task to run after 1 sec. If the CLOSE still not sent at that time, we should just cancel
  897. * sending it and shutdown the channel. */
  898. uint64_t schedule_time = 0;
  899. aws_channel_current_clock_time(websocket->channel_slot->channel, &schedule_time);
  900. schedule_time += AWS_WEBSOCKET_CLOSE_TIMEOUT;
  901. AWS_LOGF_TRACE(
  902. AWS_LS_HTTP_WEBSOCKET,
  903. "id=%p: websocket_close_timeout task will be run at timestamp %" PRIu64,
  904. (void *)websocket,
  905. schedule_time);
  906. aws_channel_schedule_task_future(
  907. websocket->channel_slot->channel, &websocket->close_timeout_task, schedule_time);
  908. }
  909. }
  910. }
  911. return AWS_OP_SUCCESS;
  912. }
  913. static void s_close_timeout_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
  914. (void)task;
  915. if (status != AWS_TASK_STATUS_RUN_READY) {
  916. /* If channel has shut down, don't need to resume sending payload */
  917. return;
  918. }
  919. struct aws_websocket *websocket = arg;
  920. AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
  921. if (!websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written) {
  922. /* Not waiting for write to complete, which means the CLOSE frame has sent, just do nothing */
  923. return;
  924. }
  925. AWS_LOGF_WARN(
  926. AWS_LS_HTTP_WEBSOCKET,
  927. "id=%p: Failed to send CLOSE frame, timeout happened, shutdown the channel",
  928. (void *)websocket);
  929. s_stop_writing(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED);
  930. s_finish_shutdown(websocket);
  931. }
  932. static void s_finish_shutdown(struct aws_websocket *websocket) {
  933. AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
  934. AWS_ASSERT(websocket->thread_data.is_writing_stopped);
  935. AWS_ASSERT(websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written);
  936. AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Finishing websocket handler shutdown.", (void *)websocket);
  937. websocket->thread_data.is_shutting_down_and_waiting_for_close_frame_to_be_written = false;
  938. /* Cancel all incomplete frames */
  939. if (websocket->thread_data.current_incoming_frame) {
  940. s_complete_incoming_frame(websocket, AWS_ERROR_HTTP_CONNECTION_CLOSED, NULL);
  941. }
  942. if (websocket->thread_data.current_outgoing_frame) {
  943. s_destroy_outgoing_frame(
  944. websocket, websocket->thread_data.current_outgoing_frame, AWS_ERROR_HTTP_CONNECTION_CLOSED);
  945. websocket->thread_data.current_outgoing_frame = NULL;
  946. }
  947. /* BEGIN CRITICAL SECTION */
  948. s_lock_synced_data(websocket);
  949. while (!aws_linked_list_empty(&websocket->synced_data.outgoing_frame_list)) {
  950. /* Move frames from synced_data to thread_data, then cancel them together outside critical section */
  951. struct aws_linked_list_node *node = aws_linked_list_pop_front(&websocket->synced_data.outgoing_frame_list);
  952. aws_linked_list_push_back(&websocket->thread_data.outgoing_frame_list, node);
  953. }
  954. s_unlock_synced_data(websocket);
  955. /* END CRITICAL SECTION */
  956. s_complete_frame_list(websocket, &websocket->thread_data.write_completion_frames, AWS_ERROR_HTTP_CONNECTION_CLOSED);
  957. while (!aws_linked_list_empty(&websocket->thread_data.outgoing_frame_list)) {
  958. struct aws_linked_list_node *node = aws_linked_list_pop_front(&websocket->thread_data.outgoing_frame_list);
  959. struct outgoing_frame *frame = AWS_CONTAINER_OF(node, struct outgoing_frame, node);
  960. s_destroy_outgoing_frame(websocket, frame, AWS_ERROR_HTTP_CONNECTION_CLOSED);
  961. }
  962. aws_channel_slot_on_handler_shutdown_complete(
  963. websocket->channel_slot,
  964. AWS_CHANNEL_DIR_WRITE,
  965. websocket->thread_data.channel_shutdown_error_code,
  966. websocket->thread_data.channel_shutdown_free_scarce_resources_immediately);
  967. }
  968. static int s_handler_process_read_message(
  969. struct aws_channel_handler *handler,
  970. struct aws_channel_slot *slot,
  971. struct aws_io_message *message) {
  972. AWS_ASSERT(message);
  973. AWS_ASSERT(aws_channel_thread_is_callers_thread(slot->channel));
  974. struct aws_websocket *websocket = handler->impl;
  975. struct aws_byte_cursor cursor = aws_byte_cursor_from_buf(&message->message_data);
  976. int err;
  977. /* At the end of this function we'll bump the window back up by this amount.
  978. * We start off assuming we'll re-open the window by the whole amount,
  979. * but this number will go down if we process any payload data that ought to shrink the window */
  980. websocket->thread_data.incoming_message_window_update = message->message_data.len;
  981. AWS_LOGF_TRACE(
  982. AWS_LS_HTTP_WEBSOCKET,
  983. "id=%p: Begin processing incoming message of size %zu.",
  984. (void *)websocket,
  985. message->message_data.len);
  986. while (cursor.len) {
  987. if (websocket->thread_data.is_reading_stopped) {
  988. goto clean_up;
  989. }
  990. bool frame_complete;
  991. err = aws_websocket_decoder_process(&websocket->thread_data.decoder, &cursor, &frame_complete);
  992. if (err) {
  993. AWS_LOGF_ERROR(
  994. AWS_LS_HTTP_WEBSOCKET,
  995. "id=%p: Failed processing incoming message, error %d (%s). Closing connection.",
  996. (void *)websocket,
  997. aws_last_error(),
  998. aws_error_name(aws_last_error()));
  999. goto error;
  1000. }
  1001. if (frame_complete) {
  1002. bool callback_result;
  1003. s_complete_incoming_frame(websocket, AWS_ERROR_SUCCESS, &callback_result);
  1004. if (!callback_result) {
  1005. AWS_LOGF_ERROR(
  1006. AWS_LS_HTTP_WEBSOCKET,
  1007. "id=%p: Incoming frame completion callback has reported a failure. Closing connection",
  1008. (void *)websocket);
  1009. aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
  1010. goto error;
  1011. }
  1012. }
  1013. }
  1014. if (websocket->thread_data.incoming_message_window_update > 0) {
  1015. err = aws_channel_slot_increment_read_window(slot, websocket->thread_data.incoming_message_window_update);
  1016. if (err) {
  1017. AWS_LOGF_ERROR(
  1018. AWS_LS_HTTP_WEBSOCKET,
  1019. "id=%p: Failed to increment read window after message processing, error %d (%s). Closing "
  1020. "connection.",
  1021. (void *)websocket,
  1022. aws_last_error(),
  1023. aws_error_name(aws_last_error()));
  1024. goto error;
  1025. }
  1026. }
  1027. goto clean_up;
  1028. error:
  1029. s_shutdown_due_to_read_err(websocket, aws_last_error());
  1030. clean_up:
  1031. if (cursor.len > 0) {
  1032. AWS_LOGF_TRACE(
  1033. AWS_LS_HTTP_WEBSOCKET,
  1034. "id=%p: Done processing incoming message, final %zu bytes ignored.",
  1035. (void *)websocket,
  1036. cursor.len);
  1037. } else {
  1038. AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Done processing incoming message.", (void *)websocket);
  1039. }
  1040. aws_mem_release(message->allocator, message);
  1041. return AWS_OP_SUCCESS;
  1042. }
  1043. static int s_decoder_on_frame(const struct aws_websocket_frame *frame, void *user_data) {
  1044. struct aws_websocket *websocket = user_data;
  1045. AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
  1046. AWS_ASSERT(!websocket->thread_data.current_incoming_frame);
  1047. AWS_ASSERT(!websocket->thread_data.is_reading_stopped);
  1048. websocket->thread_data.current_incoming_frame = &websocket->thread_data.incoming_frame_storage;
  1049. websocket->thread_data.current_incoming_frame->payload_length = frame->payload_length;
  1050. websocket->thread_data.current_incoming_frame->opcode = frame->opcode;
  1051. websocket->thread_data.current_incoming_frame->fin = frame->fin;
  1052. /* If CONTINUATION frames are expected, remember which type of data is being continued.
  1053. * RFC-6455 Section 5.4 Fragmentation */
  1054. if (aws_websocket_is_data_frame(frame->opcode)) {
  1055. if (frame->opcode != AWS_WEBSOCKET_OPCODE_CONTINUATION) {
  1056. if (frame->fin) {
  1057. websocket->thread_data.continuation_of_opcode = 0;
  1058. } else {
  1059. websocket->thread_data.continuation_of_opcode = frame->opcode;
  1060. }
  1061. }
  1062. } else if (frame->opcode == AWS_WEBSOCKET_OPCODE_PING) {
  1063. /* Prepare to store payload of PING so we can echo it back in the PONG */
  1064. aws_byte_buf_reset(&websocket->thread_data.incoming_ping_payload, false /*zero_contents*/);
  1065. /* Note: we are NOT calling aws_byte_buf_reserve().
  1066. * This works around an attack where a malicious peer CLAIMS they'll send a huge frame,
  1067. * which would case OOM if we did the reserve immediately.
  1068. * If a malicious peer wants to run us out of memory, they'll need to do
  1069. * it the costly way and actually send a billion bytes.
  1070. * Or we could impose our own internal limits, but for now this is simpler */
  1071. }
  1072. /* Invoke user cb */
  1073. bool callback_result = true;
  1074. if (websocket->on_incoming_frame_begin && !websocket->thread_data.is_midchannel_handler) {
  1075. callback_result = websocket->on_incoming_frame_begin(
  1076. websocket, websocket->thread_data.current_incoming_frame, websocket->user_data);
  1077. }
  1078. if (!callback_result) {
  1079. AWS_LOGF_ERROR(
  1080. AWS_LS_HTTP_WEBSOCKET, "id=%p: Incoming frame callback has reported a failure.", (void *)websocket);
  1081. return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
  1082. }
  1083. return AWS_OP_SUCCESS;
  1084. }
  1085. static int s_decoder_on_payload(struct aws_byte_cursor data, void *user_data) {
  1086. struct aws_websocket *websocket = user_data;
  1087. AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
  1088. AWS_ASSERT(websocket->thread_data.current_incoming_frame);
  1089. AWS_ASSERT(!websocket->thread_data.is_reading_stopped);
  1090. /* Store payload of PING so we can echo it back in the PONG */
  1091. if (websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_PING) {
  1092. aws_byte_buf_append_dynamic(&websocket->thread_data.incoming_ping_payload, &data);
  1093. }
  1094. if (websocket->thread_data.is_midchannel_handler) {
  1095. return s_decoder_on_midchannel_payload(websocket, data);
  1096. }
  1097. return s_decoder_on_user_payload(websocket, data);
  1098. }
  1099. /* Invoke user cb */
  1100. static int s_decoder_on_user_payload(struct aws_websocket *websocket, struct aws_byte_cursor data) {
  1101. if (websocket->on_incoming_frame_payload) {
  1102. if (!websocket->on_incoming_frame_payload(
  1103. websocket, websocket->thread_data.current_incoming_frame, data, websocket->user_data)) {
  1104. AWS_LOGF_ERROR(
  1105. AWS_LS_HTTP_WEBSOCKET, "id=%p: Incoming payload callback has reported a failure.", (void *)websocket);
  1106. return aws_raise_error(AWS_ERROR_HTTP_CALLBACK_FAILURE);
  1107. }
  1108. }
  1109. /* If this is a "data" frame's payload, let the window shrink */
  1110. if (aws_websocket_is_data_frame(websocket->thread_data.current_incoming_frame->opcode) &&
  1111. websocket->manual_window_update) {
  1112. websocket->thread_data.incoming_message_window_update -= data.len;
  1113. AWS_LOGF_DEBUG(
  1114. AWS_LS_HTTP_WEBSOCKET,
  1115. "id=%p: The read window is shrinking by %zu due to incoming payload from 'data' frame.",
  1116. (void *)websocket,
  1117. data.len);
  1118. }
  1119. return AWS_OP_SUCCESS;
  1120. }
  1121. /* Pass data to channel handler on the right */
  1122. static int s_decoder_on_midchannel_payload(struct aws_websocket *websocket, struct aws_byte_cursor data) {
  1123. struct aws_io_message *io_msg = NULL;
  1124. /* Only pass data to next handler if it's from a BINARY frame (or the CONTINUATION of a BINARY frame) */
  1125. bool is_binary_data = websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_BINARY ||
  1126. (websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_CONTINUATION &&
  1127. websocket->thread_data.continuation_of_opcode == AWS_WEBSOCKET_OPCODE_BINARY);
  1128. if (!is_binary_data) {
  1129. return AWS_OP_SUCCESS;
  1130. }
  1131. AWS_ASSERT(websocket->channel_slot->adj_right); /* Expected another slot in the read direction */
  1132. /* Note that current implementation of websocket handler does not buffer data travelling in the "read" direction,
  1133. * so the downstream read window needs to be large enough to immediately receive incoming data. */
  1134. if (aws_channel_slot_downstream_read_window(websocket->channel_slot) < data.len) {
  1135. AWS_LOGF_ERROR(
  1136. AWS_LS_HTTP_WEBSOCKET,
  1137. "id=%p: Cannot send entire message without exceeding read window.",
  1138. (void *)websocket);
  1139. aws_raise_error(AWS_IO_CHANNEL_READ_WOULD_EXCEED_WINDOW);
  1140. goto error;
  1141. }
  1142. io_msg = aws_channel_acquire_message_from_pool(
  1143. websocket->channel_slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, data.len);
  1144. if (!io_msg) {
  1145. AWS_LOGF_ERROR(AWS_LS_HTTP_WEBSOCKET, "id=%p: Failed to acquire message.", (void *)websocket);
  1146. goto error;
  1147. }
  1148. if (io_msg->message_data.capacity < data.len) {
  1149. /* Probably can't happen. Data is coming an aws_io_message, should be able to acquire another just as big */
  1150. AWS_LOGF_ERROR(
  1151. AWS_LS_HTTP_WEBSOCKET, "id=%p: Failed to acquire sufficiently large message.", (void *)websocket);
  1152. aws_raise_error(AWS_ERROR_UNKNOWN);
  1153. goto error;
  1154. }
  1155. if (!aws_byte_buf_write_from_whole_cursor(&io_msg->message_data, data)) {
  1156. AWS_LOGF_ERROR(AWS_LS_HTTP_WEBSOCKET, "id=%p: Unexpected error while copying data.", (void *)websocket);
  1157. aws_raise_error(AWS_ERROR_UNKNOWN);
  1158. goto error;
  1159. }
  1160. int err = aws_channel_slot_send_message(websocket->channel_slot, io_msg, AWS_CHANNEL_DIR_READ);
  1161. if (err) {
  1162. AWS_LOGF_ERROR(
  1163. AWS_LS_HTTP_WEBSOCKET,
  1164. "id=%p: Failed to send read message, error %d (%s).",
  1165. (void *)websocket,
  1166. aws_last_error(),
  1167. aws_error_name(aws_last_error()));
  1168. goto error;
  1169. }
  1170. /* Reduce amount by which websocket will update its read window */
  1171. AWS_ASSERT(websocket->thread_data.incoming_message_window_update >= data.len);
  1172. websocket->thread_data.incoming_message_window_update -= data.len;
  1173. return AWS_OP_SUCCESS;
  1174. error:
  1175. if (io_msg) {
  1176. aws_mem_release(io_msg->allocator, io_msg);
  1177. }
  1178. return AWS_OP_ERR;
  1179. }
  1180. /* When the websocket sends a frame automatically (PONG, CLOSE),
  1181. * this holds the payload. */
  1182. struct aws_websocket_autopayload {
  1183. struct aws_allocator *alloc;
  1184. struct aws_byte_buf buf;
  1185. struct aws_byte_cursor advancing_cursor;
  1186. };
  1187. static struct aws_websocket_autopayload *s_autopayload_new(
  1188. struct aws_allocator *alloc,
  1189. const struct aws_byte_buf *src) {
  1190. struct aws_websocket_autopayload *autopayload = aws_mem_calloc(alloc, 1, sizeof(struct aws_websocket_autopayload));
  1191. autopayload->alloc = alloc;
  1192. if (src->len > 0) {
  1193. aws_byte_buf_init_copy(&autopayload->buf, alloc, src);
  1194. autopayload->advancing_cursor = aws_byte_cursor_from_buf(&autopayload->buf);
  1195. }
  1196. return autopayload;
  1197. }
  1198. static void s_autopayload_destroy(struct aws_websocket_autopayload *autopayload) {
  1199. aws_byte_buf_clean_up(&autopayload->buf);
  1200. aws_mem_release(autopayload->alloc, autopayload);
  1201. }
  1202. static void s_autopayload_send_complete(struct aws_websocket *websocket, int error_code, void *user_data) {
  1203. (void)websocket;
  1204. (void)error_code;
  1205. struct aws_websocket_autopayload *autopayload = user_data;
  1206. s_autopayload_destroy(autopayload);
  1207. }
  1208. static bool s_autopayload_stream_outgoing_payload(
  1209. struct aws_websocket *websocket,
  1210. struct aws_byte_buf *out_buf,
  1211. void *user_data) {
  1212. (void)websocket;
  1213. struct aws_websocket_autopayload *autopayload = user_data;
  1214. aws_byte_buf_write_to_capacity(out_buf, &autopayload->advancing_cursor);
  1215. return true;
  1216. }
  1217. static void s_complete_incoming_frame(struct aws_websocket *websocket, int error_code, bool *out_callback_result) {
  1218. AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
  1219. AWS_ASSERT(websocket->thread_data.current_incoming_frame);
  1220. if (error_code == 0) {
  1221. /* If this was a CLOSE frame, don't read any more data. */
  1222. if (websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_CLOSE) {
  1223. AWS_LOGF_DEBUG(
  1224. AWS_LS_HTTP_WEBSOCKET,
  1225. "id=%p: Close frame received, any further data received will be ignored.",
  1226. (void *)websocket);
  1227. websocket->thread_data.is_reading_stopped = true;
  1228. /* TODO: auto-close if there's a channel-handler to the right */
  1229. } else if (websocket->thread_data.current_incoming_frame->opcode == AWS_WEBSOCKET_OPCODE_PING) {
  1230. /* Automatically respond to a PING with a PONG */
  1231. if (!websocket->thread_data.is_writing_stopped) {
  1232. /* Optimization idea: avoid allocations/copies each time we send an auto-PONG.
  1233. * Maybe have a small autopayload pool, instead of allocating one each time.
  1234. * Maybe encode directly to aws_io_message, instead of copying to a buf, that's copied to a msg later.
  1235. * Maybe "std::move()" the aws_byte_bufs around instead of copying them. */
  1236. struct aws_websocket_autopayload *autopong =
  1237. s_autopayload_new(websocket->alloc, &websocket->thread_data.incoming_ping_payload);
  1238. struct aws_websocket_send_frame_options pong_frame = {
  1239. .opcode = AWS_WEBSOCKET_OPCODE_PONG,
  1240. .fin = true,
  1241. .payload_length = autopong->buf.len,
  1242. .stream_outgoing_payload = s_autopayload_stream_outgoing_payload,
  1243. .on_complete = s_autopayload_send_complete,
  1244. .user_data = autopong,
  1245. };
  1246. int send_err = s_send_frame(websocket, &pong_frame, false /*from_public_api*/);
  1247. /* Failure should be impossible. We already checked that writing is not stopped */
  1248. AWS_FATAL_ASSERT(!send_err && "Unexpected failure sending websocket PONG");
  1249. }
  1250. }
  1251. }
  1252. /* Invoke user cb */
  1253. bool callback_result = true;
  1254. if (websocket->on_incoming_frame_complete && !websocket->thread_data.is_midchannel_handler) {
  1255. callback_result = websocket->on_incoming_frame_complete(
  1256. websocket, websocket->thread_data.current_incoming_frame, error_code, websocket->user_data);
  1257. }
  1258. if (out_callback_result) {
  1259. *out_callback_result = callback_result;
  1260. }
  1261. websocket->thread_data.current_incoming_frame = NULL;
  1262. }
  1263. static size_t s_handler_initial_window_size(struct aws_channel_handler *handler) {
  1264. struct aws_websocket *websocket = handler->impl;
  1265. return websocket->initial_window_size;
  1266. }
  1267. static size_t s_handler_message_overhead(struct aws_channel_handler *handler) {
  1268. (void)handler;
  1269. return AWS_WEBSOCKET_MAX_FRAME_OVERHEAD;
  1270. }
  1271. static int s_handler_increment_read_window(
  1272. struct aws_channel_handler *handler,
  1273. struct aws_channel_slot *slot,
  1274. size_t size) {
  1275. struct aws_websocket *websocket = handler->impl;
  1276. AWS_ASSERT(aws_channel_thread_is_callers_thread(slot->channel));
  1277. AWS_ASSERT(websocket->thread_data.is_midchannel_handler);
  1278. /* NOTE: This is pretty hacky and should change if it ever causes issues.
  1279. *
  1280. * Currently, all read messages are processed the moment they're received.
  1281. * If the downstream read window is open enough to accept this data, we can send it right along.
  1282. * BUT if the downstream window were too small, we'd need to buffer the data and wait until
  1283. * the downstream window opened again to finish sending.
  1284. *
  1285. * To avoid that complexity, we go to pains here to ensure that the websocket's window exactly
  1286. * matches the window to the right, allowing us to avoid buffering in the read direction.
  1287. */
  1288. size_t increment = size;
  1289. if (websocket->thread_data.last_known_right_slot != slot->adj_right) {
  1290. if (size < slot->window_size) {
  1291. AWS_LOGF_ERROR(
  1292. AWS_LS_HTTP_WEBSOCKET,
  1293. "id=%p: The websocket does not support downstream handlers with a smaller window.",
  1294. (void *)websocket);
  1295. aws_raise_error(AWS_IO_CHANNEL_READ_WOULD_EXCEED_WINDOW);
  1296. goto error;
  1297. }
  1298. /* New handler to the right, make sure websocket's window matches its window. */
  1299. websocket->thread_data.last_known_right_slot = slot->adj_right;
  1300. increment = size - slot->window_size;
  1301. }
  1302. if (increment != 0) {
  1303. int err = aws_channel_slot_increment_read_window(slot, increment);
  1304. if (err) {
  1305. goto error;
  1306. }
  1307. }
  1308. return AWS_OP_SUCCESS;
  1309. error:
  1310. websocket->thread_data.is_reading_stopped = true;
  1311. /* Shutting down channel because I know that no one ever checks these errors */
  1312. s_shutdown_due_to_read_err(websocket, aws_last_error());
  1313. return AWS_OP_ERR;
  1314. }
  1315. static void s_increment_read_window_action(struct aws_websocket *websocket, size_t size) {
  1316. AWS_ASSERT(aws_channel_thread_is_callers_thread(websocket->channel_slot->channel));
  1317. int err = aws_channel_slot_increment_read_window(websocket->channel_slot, size);
  1318. if (err) {
  1319. AWS_LOGF_ERROR(
  1320. AWS_LS_HTTP_WEBSOCKET,
  1321. "id=%p: Failed to increment read window, error %d (%s). Closing websocket.",
  1322. (void *)websocket,
  1323. aws_last_error(),
  1324. aws_error_name(aws_last_error()));
  1325. s_schedule_channel_shutdown(websocket, aws_last_error());
  1326. }
  1327. }
  1328. static void s_increment_read_window_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
  1329. (void)task;
  1330. if (status != AWS_TASK_STATUS_RUN_READY) {
  1331. return;
  1332. }
  1333. struct aws_websocket *websocket = arg;
  1334. size_t size;
  1335. /* BEGIN CRITICAL SECTION */
  1336. s_lock_synced_data(websocket);
  1337. size = websocket->synced_data.window_increment_size;
  1338. websocket->synced_data.window_increment_size = 0;
  1339. s_unlock_synced_data(websocket);
  1340. /* END CRITICAL SECTION */
  1341. AWS_LOGF_TRACE(
  1342. AWS_LS_HTTP_WEBSOCKET, "id=%p: Running task to increment read window by %zu.", (void *)websocket, size);
  1343. s_increment_read_window_action(websocket, size);
  1344. }
  1345. void aws_websocket_increment_read_window(struct aws_websocket *websocket, size_t size) {
  1346. if (size == 0) {
  1347. AWS_LOGF_TRACE(AWS_LS_HTTP_WEBSOCKET, "id=%p: Ignoring window increment of size 0.", (void *)websocket);
  1348. return;
  1349. }
  1350. if (!websocket->manual_window_update) {
  1351. AWS_LOGF_DEBUG(
  1352. AWS_LS_HTTP_WEBSOCKET,
  1353. "id=%p: Ignoring window increment. Manual window management (aka read backpressure) is not enabled.",
  1354. (void *)websocket);
  1355. return;
  1356. }
  1357. /* Schedule a task to do the increment.
  1358. * If task is already scheduled, just increase size to be incremented */
  1359. bool is_midchannel_handler = false;
  1360. bool should_schedule_task = false;
  1361. /* BEGIN CRITICAL SECTION */
  1362. s_lock_synced_data(websocket);
  1363. if (websocket->synced_data.is_midchannel_handler) {
  1364. is_midchannel_handler = true;
  1365. } else if (websocket->synced_data.window_increment_size == 0) {
  1366. should_schedule_task = true;
  1367. websocket->synced_data.window_increment_size = size;
  1368. } else {
  1369. websocket->synced_data.window_increment_size =
  1370. aws_add_size_saturating(websocket->synced_data.window_increment_size, size);
  1371. }
  1372. s_unlock_synced_data(websocket);
  1373. /* END CRITICAL SECTION */
  1374. if (is_midchannel_handler) {
  1375. AWS_LOGF_TRACE(
  1376. AWS_LS_HTTP_WEBSOCKET,
  1377. "id=%p: Ignoring window increment call, websocket has converted to midchannel handler.",
  1378. (void *)websocket);
  1379. } else if (should_schedule_task) {
  1380. AWS_LOGF_TRACE(
  1381. AWS_LS_HTTP_WEBSOCKET, "id=%p: Scheduling task to increment read window by %zu.", (void *)websocket, size);
  1382. aws_channel_schedule_task_now(websocket->channel_slot->channel, &websocket->increment_read_window_task);
  1383. } else {
  1384. AWS_LOGF_TRACE(
  1385. AWS_LS_HTTP_WEBSOCKET,
  1386. "id=%p: Task to increment read window already scheduled, increasing scheduled size by %zu.",
  1387. (void *)websocket,
  1388. size);
  1389. }
  1390. }
  1391. int aws_websocket_random_handshake_key(struct aws_byte_buf *dst) {
  1392. /* RFC-6455 Section 4.1.
  1393. * Derive random 16-byte value, base64-encoded, for the Sec-WebSocket-Key header */
  1394. uint8_t key_random_storage[16] = {0};
  1395. struct aws_byte_buf key_random_buf = aws_byte_buf_from_empty_array(key_random_storage, sizeof(key_random_storage));
  1396. int err = aws_device_random_buffer(&key_random_buf);
  1397. if (err) {
  1398. return AWS_OP_ERR;
  1399. }
  1400. struct aws_byte_cursor key_random_cur = aws_byte_cursor_from_buf(&key_random_buf);
  1401. err = aws_base64_encode(&key_random_cur, dst);
  1402. if (err) {
  1403. return AWS_OP_ERR;
  1404. }
  1405. return AWS_OP_SUCCESS;
  1406. }
  1407. struct aws_http_message *aws_http_message_new_websocket_handshake_request(
  1408. struct aws_allocator *allocator,
  1409. struct aws_byte_cursor path,
  1410. struct aws_byte_cursor host) {
  1411. AWS_PRECONDITION(allocator);
  1412. AWS_PRECONDITION(aws_byte_cursor_is_valid(&path));
  1413. AWS_PRECONDITION(aws_byte_cursor_is_valid(&host));
  1414. struct aws_http_message *request = aws_http_message_new_request(allocator);
  1415. if (!request) {
  1416. goto error;
  1417. }
  1418. int err = aws_http_message_set_request_method(request, aws_http_method_get);
  1419. if (err) {
  1420. goto error;
  1421. }
  1422. err = aws_http_message_set_request_path(request, path);
  1423. if (err) {
  1424. goto error;
  1425. }
  1426. uint8_t key_storage[AWS_WEBSOCKET_MAX_HANDSHAKE_KEY_LENGTH];
  1427. struct aws_byte_buf key_buf = aws_byte_buf_from_empty_array(key_storage, sizeof(key_storage));
  1428. err = aws_websocket_random_handshake_key(&key_buf);
  1429. if (err) {
  1430. goto error;
  1431. }
  1432. struct aws_http_header required_headers[] = {
  1433. {
  1434. .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Host"),
  1435. .value = host,
  1436. },
  1437. {
  1438. .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Upgrade"),
  1439. .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("websocket"),
  1440. },
  1441. {
  1442. .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Connection"),
  1443. .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Upgrade"),
  1444. },
  1445. {
  1446. .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Sec-WebSocket-Key"),
  1447. .value = aws_byte_cursor_from_buf(&key_buf),
  1448. },
  1449. {
  1450. .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Sec-WebSocket-Version"),
  1451. .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("13"),
  1452. },
  1453. };
  1454. for (size_t i = 0; i < AWS_ARRAY_SIZE(required_headers); ++i) {
  1455. err = aws_http_message_add_header(request, required_headers[i]);
  1456. if (err) {
  1457. goto error;
  1458. }
  1459. }
  1460. return request;
  1461. error:
  1462. aws_http_message_destroy(request);
  1463. return NULL;
  1464. }