client_channel_handler.c 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/mqtt/private/client_impl.h>
  6. #include <aws/mqtt/private/packets.h>
  7. #include <aws/mqtt/private/topic_tree.h>
  8. #include <aws/io/logging.h>
  9. #include <aws/common/clock.h>
  10. #include <aws/common/math.h>
  11. #include <aws/common/task_scheduler.h>
  12. #include <inttypes.h>
  13. #ifdef _MSC_VER
  14. # pragma warning(disable : 4204)
  15. #endif
  16. /*******************************************************************************
  17. * Packet State Machine
  18. ******************************************************************************/
  19. typedef int(packet_handler_fn)(struct aws_mqtt_client_connection *connection, struct aws_byte_cursor message_cursor);
  20. static int s_packet_handler_default(
  21. struct aws_mqtt_client_connection *connection,
  22. struct aws_byte_cursor message_cursor) {
  23. (void)connection;
  24. (void)message_cursor;
  25. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Unhandled packet type received", (void *)connection);
  26. return aws_raise_error(AWS_ERROR_MQTT_INVALID_PACKET_TYPE);
  27. }
  28. static void s_on_time_to_ping(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status);
  29. static void s_schedule_ping(struct aws_mqtt_client_connection *connection) {
  30. aws_channel_task_init(&connection->ping_task, s_on_time_to_ping, connection, "mqtt_ping");
  31. uint64_t now = 0;
  32. aws_channel_current_clock_time(connection->slot->channel, &now);
  33. AWS_LOGF_TRACE(
  34. AWS_LS_MQTT_CLIENT, "id=%p: Scheduling PING. current timestamp is %" PRIu64, (void *)connection, now);
  35. uint64_t schedule_time =
  36. now + aws_timestamp_convert(connection->keep_alive_time_secs, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
  37. AWS_LOGF_TRACE(
  38. AWS_LS_MQTT_CLIENT,
  39. "id=%p: The next ping will be run at timestamp %" PRIu64,
  40. (void *)connection,
  41. schedule_time);
  42. aws_channel_schedule_task_future(connection->slot->channel, &connection->ping_task, schedule_time);
  43. }
  44. static void s_on_time_to_ping(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status) {
  45. (void)channel_task;
  46. if (status == AWS_TASK_STATUS_RUN_READY) {
  47. struct aws_mqtt_client_connection *connection = arg;
  48. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Sending PING", (void *)connection);
  49. aws_mqtt_client_connection_ping(connection);
  50. s_schedule_ping(connection);
  51. }
  52. }
  53. static int s_packet_handler_connack(
  54. struct aws_mqtt_client_connection *connection,
  55. struct aws_byte_cursor message_cursor) {
  56. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: CONNACK received", (void *)connection);
  57. struct aws_mqtt_packet_connack connack;
  58. if (aws_mqtt_packet_connack_decode(&message_cursor, &connack)) {
  59. AWS_LOGF_ERROR(
  60. AWS_LS_MQTT_CLIENT, "id=%p: error %d parsing CONNACK packet", (void *)connection, aws_last_error());
  61. return AWS_OP_ERR;
  62. }
  63. bool was_reconnecting;
  64. struct aws_linked_list requests;
  65. aws_linked_list_init(&requests);
  66. { /* BEGIN CRITICAL SECTION */
  67. mqtt_connection_lock_synced_data(connection);
  68. /* User requested disconnect, don't do anything */
  69. if (connection->synced_data.state >= AWS_MQTT_CLIENT_STATE_DISCONNECTING) {
  70. mqtt_connection_unlock_synced_data(connection);
  71. AWS_LOGF_TRACE(
  72. AWS_LS_MQTT_CLIENT, "id=%p: User has requested disconnect, dropping connection", (void *)connection);
  73. return AWS_OP_SUCCESS;
  74. }
  75. was_reconnecting = connection->synced_data.state == AWS_MQTT_CLIENT_STATE_RECONNECTING;
  76. if (connack.connect_return_code == AWS_MQTT_CONNECT_ACCEPTED) {
  77. AWS_LOGF_DEBUG(
  78. AWS_LS_MQTT_CLIENT,
  79. "id=%p: connection was accepted, switch state from %d to CONNECTED.",
  80. (void *)connection,
  81. (int)connection->synced_data.state);
  82. /* Don't change the state if it's not ACCEPTED by broker */
  83. mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_CONNECTED);
  84. aws_linked_list_swap_contents(&connection->synced_data.pending_requests_list, &requests);
  85. }
  86. mqtt_connection_unlock_synced_data(connection);
  87. } /* END CRITICAL SECTION */
  88. connection->connection_count++;
  89. uint64_t now = 0;
  90. aws_high_res_clock_get_ticks(&now);
  91. if (connack.connect_return_code == AWS_MQTT_CONNECT_ACCEPTED) {
  92. /*
  93. * This was a successful MQTT connection establishment, record the time so that channel shutdown
  94. * can make a good decision about reconnect backoff reset.
  95. */
  96. connection->reconnect_timeouts.channel_successful_connack_timestamp_ns = now;
  97. /* If successfully connected, schedule all pending tasks */
  98. AWS_LOGF_TRACE(
  99. AWS_LS_MQTT_CLIENT, "id=%p: connection was accepted processing offline requests.", (void *)connection);
  100. if (!aws_linked_list_empty(&requests)) {
  101. struct aws_linked_list_node *current = aws_linked_list_front(&requests);
  102. const struct aws_linked_list_node *end = aws_linked_list_end(&requests);
  103. do {
  104. struct aws_mqtt_request *request = AWS_CONTAINER_OF(current, struct aws_mqtt_request, list_node);
  105. AWS_LOGF_TRACE(
  106. AWS_LS_MQTT_CLIENT,
  107. "id=%p: processing offline request %" PRIu16,
  108. (void *)connection,
  109. request->packet_id);
  110. aws_channel_schedule_task_now(connection->slot->channel, &request->outgoing_task);
  111. current = current->next;
  112. } while (current != end);
  113. }
  114. } else {
  115. AWS_LOGF_ERROR(
  116. AWS_LS_MQTT_CLIENT,
  117. "id=%p: invalid connect return code %d, disconnecting",
  118. (void *)connection,
  119. connack.connect_return_code);
  120. /* If error code returned, disconnect, on_completed will be invoked from shutdown process */
  121. aws_channel_shutdown(connection->slot->channel, AWS_ERROR_MQTT_PROTOCOL_ERROR);
  122. return AWS_OP_SUCCESS;
  123. }
  124. /* It is possible for a connection to complete, and a hangup to occur before the
  125. * CONNECT/CONNACK cycle completes. In that case, we must deliver on_connection_complete
  126. * on the first successful CONNACK or user code will never think it's connected */
  127. if (was_reconnecting && connection->connection_count > 1) {
  128. AWS_LOGF_TRACE(
  129. AWS_LS_MQTT_CLIENT,
  130. "id=%p: connection is a resumed connection, invoking on_resumed callback",
  131. (void *)connection);
  132. MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_resumed, connack.connect_return_code, connack.session_present);
  133. } else {
  134. aws_create_reconnect_task(connection);
  135. AWS_LOGF_TRACE(
  136. AWS_LS_MQTT_CLIENT,
  137. "id=%p: connection is a new connection, invoking on_connection_complete callback",
  138. (void *)connection);
  139. MQTT_CLIENT_CALL_CALLBACK_ARGS(
  140. connection, on_connection_complete, AWS_OP_SUCCESS, connack.connect_return_code, connack.session_present);
  141. }
  142. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: connection callback completed", (void *)connection);
  143. s_schedule_ping(connection);
  144. return AWS_OP_SUCCESS;
  145. }
  146. static int s_packet_handler_publish(
  147. struct aws_mqtt_client_connection *connection,
  148. struct aws_byte_cursor message_cursor) {
  149. /* TODO: need to handle the QoS 2 message to avoid processing the message a second time */
  150. struct aws_mqtt_packet_publish publish;
  151. if (aws_mqtt_packet_publish_decode(&message_cursor, &publish)) {
  152. return AWS_OP_ERR;
  153. }
  154. aws_mqtt_topic_tree_publish(&connection->thread_data.subscriptions, &publish);
  155. bool dup = aws_mqtt_packet_publish_get_dup(&publish);
  156. enum aws_mqtt_qos qos = aws_mqtt_packet_publish_get_qos(&publish);
  157. bool retain = aws_mqtt_packet_publish_get_retain(&publish);
  158. MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_any_publish, &publish.topic_name, &publish.payload, dup, qos, retain);
  159. AWS_LOGF_TRACE(
  160. AWS_LS_MQTT_CLIENT,
  161. "id=%p: publish received with msg id=%" PRIu16 " dup=%d qos=%d retain=%d payload-size=%zu topic=" PRInSTR,
  162. (void *)connection,
  163. publish.packet_identifier,
  164. dup,
  165. qos,
  166. retain,
  167. publish.payload.len,
  168. AWS_BYTE_CURSOR_PRI(publish.topic_name));
  169. struct aws_mqtt_packet_ack puback;
  170. AWS_ZERO_STRUCT(puback);
  171. /* Switch on QoS flags (bits 1 & 2) */
  172. switch (qos) {
  173. case AWS_MQTT_QOS_AT_MOST_ONCE:
  174. AWS_LOGF_TRACE(
  175. AWS_LS_MQTT_CLIENT, "id=%p: received publish QOS is 0, not sending puback", (void *)connection);
  176. /* No more communication necessary */
  177. break;
  178. case AWS_MQTT_QOS_AT_LEAST_ONCE:
  179. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: received publish QOS is 1, sending puback", (void *)connection);
  180. aws_mqtt_packet_puback_init(&puback, publish.packet_identifier);
  181. break;
  182. case AWS_MQTT_QOS_EXACTLY_ONCE:
  183. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: received publish QOS is 2, sending pubrec", (void *)connection);
  184. aws_mqtt_packet_pubrec_init(&puback, publish.packet_identifier);
  185. break;
  186. default:
  187. /* Impossible to hit this branch. QoS value is checked when decoding */
  188. AWS_FATAL_ASSERT(0);
  189. break;
  190. }
  191. if (puback.packet_identifier) {
  192. struct aws_io_message *message = mqtt_get_message_for_packet(connection, &puback.fixed_header);
  193. if (!message) {
  194. return AWS_OP_ERR;
  195. }
  196. if (aws_mqtt_packet_ack_encode(&message->message_data, &puback)) {
  197. aws_mem_release(message->allocator, message);
  198. return AWS_OP_ERR;
  199. }
  200. if (aws_channel_slot_send_message(connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
  201. aws_mem_release(message->allocator, message);
  202. return AWS_OP_ERR;
  203. }
  204. }
  205. return AWS_OP_SUCCESS;
  206. }
  207. static int s_packet_handler_ack(struct aws_mqtt_client_connection *connection, struct aws_byte_cursor message_cursor) {
  208. struct aws_mqtt_packet_ack ack;
  209. if (aws_mqtt_packet_ack_decode(&message_cursor, &ack)) {
  210. return AWS_OP_ERR;
  211. }
  212. AWS_LOGF_TRACE(
  213. AWS_LS_MQTT_CLIENT, "id=%p: received ack for message id %" PRIu16, (void *)connection, ack.packet_identifier);
  214. mqtt_request_complete(connection, AWS_ERROR_SUCCESS, ack.packet_identifier);
  215. return AWS_OP_SUCCESS;
  216. }
  217. static int s_packet_handler_suback(
  218. struct aws_mqtt_client_connection *connection,
  219. struct aws_byte_cursor message_cursor) {
  220. struct aws_mqtt_packet_suback suback;
  221. if (aws_mqtt_packet_suback_init(&suback, connection->allocator, 0 /* fake packet_id */)) {
  222. return AWS_OP_ERR;
  223. }
  224. if (aws_mqtt_packet_suback_decode(&message_cursor, &suback)) {
  225. goto error;
  226. }
  227. AWS_LOGF_TRACE(
  228. AWS_LS_MQTT_CLIENT,
  229. "id=%p: received suback for message id %" PRIu16,
  230. (void *)connection,
  231. suback.packet_identifier);
  232. struct aws_mqtt_request *request = NULL;
  233. { /* BEGIN CRITICAL SECTION */
  234. mqtt_connection_lock_synced_data(connection);
  235. struct aws_hash_element *elem = NULL;
  236. aws_hash_table_find(&connection->synced_data.outstanding_requests_table, &suback.packet_identifier, &elem);
  237. if (elem != NULL) {
  238. request = elem->value;
  239. }
  240. mqtt_connection_unlock_synced_data(connection);
  241. } /* END CRITICAL SECTION */
  242. if (request == NULL) {
  243. /* no corresponding request found */
  244. goto done;
  245. }
  246. struct subscribe_task_arg *task_arg = request->on_complete_ud;
  247. size_t request_topics_len = aws_array_list_length(&task_arg->topics);
  248. size_t suback_return_code_len = aws_array_list_length(&suback.return_codes);
  249. if (request_topics_len != suback_return_code_len) {
  250. goto error;
  251. }
  252. size_t num_filters = aws_array_list_length(&suback.return_codes);
  253. for (size_t i = 0; i < num_filters; ++i) {
  254. uint8_t return_code = 0;
  255. struct subscribe_task_topic *topic = NULL;
  256. aws_array_list_get_at(&suback.return_codes, (void *)&return_code, i);
  257. aws_array_list_get_at(&task_arg->topics, &topic, i);
  258. topic->request.qos = return_code;
  259. }
  260. done:
  261. mqtt_request_complete(connection, AWS_ERROR_SUCCESS, suback.packet_identifier);
  262. aws_mqtt_packet_suback_clean_up(&suback);
  263. return AWS_OP_SUCCESS;
  264. error:
  265. aws_mqtt_packet_suback_clean_up(&suback);
  266. return AWS_OP_ERR;
  267. }
  268. static int s_packet_handler_pubrec(
  269. struct aws_mqtt_client_connection *connection,
  270. struct aws_byte_cursor message_cursor) {
  271. struct aws_mqtt_packet_ack ack;
  272. if (aws_mqtt_packet_ack_decode(&message_cursor, &ack)) {
  273. return AWS_OP_ERR;
  274. }
  275. /* TODO: When sending PUBLISH with QoS 2, we should be storing the data until this packet is received, at which
  276. * point we may discard it. */
  277. /* Send PUBREL */
  278. aws_mqtt_packet_pubrel_init(&ack, ack.packet_identifier);
  279. struct aws_io_message *message = mqtt_get_message_for_packet(connection, &ack.fixed_header);
  280. if (!message) {
  281. return AWS_OP_ERR;
  282. }
  283. if (aws_mqtt_packet_ack_encode(&message->message_data, &ack)) {
  284. goto on_error;
  285. }
  286. if (aws_channel_slot_send_message(connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
  287. goto on_error;
  288. }
  289. return AWS_OP_SUCCESS;
  290. on_error:
  291. if (message) {
  292. aws_mem_release(message->allocator, message);
  293. }
  294. return AWS_OP_ERR;
  295. }
  296. static int s_packet_handler_pubrel(
  297. struct aws_mqtt_client_connection *connection,
  298. struct aws_byte_cursor message_cursor) {
  299. struct aws_mqtt_packet_ack ack;
  300. if (aws_mqtt_packet_ack_decode(&message_cursor, &ack)) {
  301. return AWS_OP_ERR;
  302. }
  303. /* Send PUBCOMP */
  304. aws_mqtt_packet_pubcomp_init(&ack, ack.packet_identifier);
  305. struct aws_io_message *message = mqtt_get_message_for_packet(connection, &ack.fixed_header);
  306. if (!message) {
  307. return AWS_OP_ERR;
  308. }
  309. if (aws_mqtt_packet_ack_encode(&message->message_data, &ack)) {
  310. goto on_error;
  311. }
  312. if (aws_channel_slot_send_message(connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
  313. goto on_error;
  314. }
  315. return AWS_OP_SUCCESS;
  316. on_error:
  317. if (message) {
  318. aws_mem_release(message->allocator, message);
  319. }
  320. return AWS_OP_ERR;
  321. }
  322. static int s_packet_handler_pingresp(
  323. struct aws_mqtt_client_connection *connection,
  324. struct aws_byte_cursor message_cursor) {
  325. (void)message_cursor;
  326. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: PINGRESP received", (void *)connection);
  327. connection->thread_data.waiting_on_ping_response = false;
  328. return AWS_OP_SUCCESS;
  329. }
  330. /* Bake up a big ol' function table just like Gramma used to make */
  331. static packet_handler_fn *s_packet_handlers[] = {
  332. [AWS_MQTT_PACKET_CONNECT] = &s_packet_handler_default,
  333. [AWS_MQTT_PACKET_CONNACK] = &s_packet_handler_connack,
  334. [AWS_MQTT_PACKET_PUBLISH] = &s_packet_handler_publish,
  335. [AWS_MQTT_PACKET_PUBACK] = &s_packet_handler_ack,
  336. [AWS_MQTT_PACKET_PUBREC] = &s_packet_handler_pubrec,
  337. [AWS_MQTT_PACKET_PUBREL] = &s_packet_handler_pubrel,
  338. [AWS_MQTT_PACKET_PUBCOMP] = &s_packet_handler_ack,
  339. [AWS_MQTT_PACKET_SUBSCRIBE] = &s_packet_handler_default,
  340. [AWS_MQTT_PACKET_SUBACK] = &s_packet_handler_suback,
  341. [AWS_MQTT_PACKET_UNSUBSCRIBE] = &s_packet_handler_default,
  342. [AWS_MQTT_PACKET_UNSUBACK] = &s_packet_handler_ack,
  343. [AWS_MQTT_PACKET_PINGREQ] = &s_packet_handler_default,
  344. [AWS_MQTT_PACKET_PINGRESP] = &s_packet_handler_pingresp,
  345. [AWS_MQTT_PACKET_DISCONNECT] = &s_packet_handler_default,
  346. };
  347. /*******************************************************************************
  348. * Channel Handler
  349. ******************************************************************************/
  350. static int s_process_mqtt_packet(
  351. struct aws_mqtt_client_connection *connection,
  352. enum aws_mqtt_packet_type packet_type,
  353. struct aws_byte_cursor packet) {
  354. { /* BEGIN CRITICAL SECTION */
  355. mqtt_connection_lock_synced_data(connection);
  356. /* [MQTT-3.2.0-1] The first packet sent from the Server to the Client MUST be a CONNACK Packet */
  357. if (connection->synced_data.state == AWS_MQTT_CLIENT_STATE_CONNECTING &&
  358. packet_type != AWS_MQTT_PACKET_CONNACK) {
  359. mqtt_connection_unlock_synced_data(connection);
  360. AWS_LOGF_ERROR(
  361. AWS_LS_MQTT_CLIENT,
  362. "id=%p: First message received from the server was not a CONNACK. Terminating connection.",
  363. (void *)connection);
  364. aws_channel_shutdown(connection->slot->channel, AWS_ERROR_MQTT_PROTOCOL_ERROR);
  365. return aws_raise_error(AWS_ERROR_MQTT_PROTOCOL_ERROR);
  366. }
  367. mqtt_connection_unlock_synced_data(connection);
  368. } /* END CRITICAL SECTION */
  369. if (AWS_UNLIKELY(packet_type > AWS_MQTT_PACKET_DISCONNECT || packet_type < AWS_MQTT_PACKET_CONNECT)) {
  370. AWS_LOGF_ERROR(
  371. AWS_LS_MQTT_CLIENT,
  372. "id=%p: Invalid packet type received %d. Terminating connection.",
  373. (void *)connection,
  374. packet_type);
  375. return aws_raise_error(AWS_ERROR_MQTT_INVALID_PACKET_TYPE);
  376. }
  377. /* Handle the packet */
  378. return s_packet_handlers[packet_type](connection, packet);
  379. }
  380. /**
  381. * Handles incoming messages from the server.
  382. */
  383. static int s_process_read_message(
  384. struct aws_channel_handler *handler,
  385. struct aws_channel_slot *slot,
  386. struct aws_io_message *message) {
  387. struct aws_mqtt_client_connection *connection = handler->impl;
  388. if (message->message_type != AWS_IO_MESSAGE_APPLICATION_DATA || message->message_data.len < 1) {
  389. return AWS_OP_ERR;
  390. }
  391. AWS_LOGF_TRACE(
  392. AWS_LS_MQTT_CLIENT,
  393. "id=%p: precessing read message of size %zu",
  394. (void *)connection,
  395. message->message_data.len);
  396. /* This cursor will be updated as we read through the message. */
  397. struct aws_byte_cursor message_cursor = aws_byte_cursor_from_buf(&message->message_data);
  398. /* If there's pending packet left over from last time, attempt to complete it. */
  399. if (connection->thread_data.pending_packet.len) {
  400. int result = AWS_OP_SUCCESS;
  401. /* This determines how much to read from the message (min(expected_remaining, message.len)) */
  402. size_t to_read = connection->thread_data.pending_packet.capacity - connection->thread_data.pending_packet.len;
  403. /* This will be set to false if this message still won't complete the packet object. */
  404. bool packet_complete = true;
  405. if (to_read > message_cursor.len) {
  406. to_read = message_cursor.len;
  407. packet_complete = false;
  408. }
  409. /* Write the chunk to the buffer.
  410. * This will either complete the packet, or be the entirety of message if more data is required. */
  411. struct aws_byte_cursor chunk = aws_byte_cursor_advance(&message_cursor, to_read);
  412. AWS_ASSERT(chunk.ptr); /* Guaranteed to be in bounds */
  413. result = (int)aws_byte_buf_write_from_whole_cursor(&connection->thread_data.pending_packet, chunk) - 1;
  414. if (result) {
  415. goto handle_error;
  416. }
  417. /* If the packet is still incomplete, don't do anything with the data. */
  418. if (!packet_complete) {
  419. AWS_LOGF_TRACE(
  420. AWS_LS_MQTT_CLIENT,
  421. "id=%p: partial message is still incomplete, waiting on another read.",
  422. (void *)connection);
  423. goto cleanup;
  424. }
  425. /* Handle the completed pending packet */
  426. struct aws_byte_cursor packet_data = aws_byte_cursor_from_buf(&connection->thread_data.pending_packet);
  427. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: full mqtt packet re-assembled, dispatching.", (void *)connection);
  428. result = s_process_mqtt_packet(connection, aws_mqtt_get_packet_type(packet_data.ptr), packet_data);
  429. handle_error:
  430. /* Clean up the pending packet */
  431. aws_byte_buf_clean_up(&connection->thread_data.pending_packet);
  432. AWS_ZERO_STRUCT(connection->thread_data.pending_packet);
  433. if (result) {
  434. return AWS_OP_ERR;
  435. }
  436. }
  437. while (message_cursor.len) {
  438. /* Temp byte cursor so we can decode the header without advancing message_cursor. */
  439. struct aws_byte_cursor header_decode = message_cursor;
  440. struct aws_mqtt_fixed_header packet_header;
  441. AWS_ZERO_STRUCT(packet_header);
  442. int result = aws_mqtt_fixed_header_decode(&header_decode, &packet_header);
  443. /* Calculate how much data was read. */
  444. const size_t fixed_header_size = message_cursor.len - header_decode.len;
  445. if (result) {
  446. if (aws_last_error() == AWS_ERROR_SHORT_BUFFER) {
  447. /* Message data too short, store data and come back later. */
  448. AWS_LOGF_TRACE(
  449. AWS_LS_MQTT_CLIENT, "id=%p: message is incomplete, waiting on another read.", (void *)connection);
  450. if (aws_byte_buf_init(
  451. &connection->thread_data.pending_packet,
  452. connection->allocator,
  453. fixed_header_size + packet_header.remaining_length)) {
  454. return AWS_OP_ERR;
  455. }
  456. /* Write the partial packet. */
  457. if (!aws_byte_buf_write_from_whole_cursor(&connection->thread_data.pending_packet, message_cursor)) {
  458. aws_byte_buf_clean_up(&connection->thread_data.pending_packet);
  459. return AWS_OP_ERR;
  460. }
  461. aws_reset_error();
  462. goto cleanup;
  463. } else {
  464. return AWS_OP_ERR;
  465. }
  466. }
  467. struct aws_byte_cursor packet_data =
  468. aws_byte_cursor_advance(&message_cursor, fixed_header_size + packet_header.remaining_length);
  469. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: full mqtt packet read, dispatching.", (void *)connection);
  470. s_process_mqtt_packet(connection, packet_header.packet_type, packet_data);
  471. }
  472. cleanup:
  473. /* Do cleanup */
  474. aws_channel_slot_increment_read_window(slot, message->message_data.len);
  475. aws_mem_release(message->allocator, message);
  476. return AWS_OP_SUCCESS;
  477. }
  478. static int s_shutdown(
  479. struct aws_channel_handler *handler,
  480. struct aws_channel_slot *slot,
  481. enum aws_channel_direction dir,
  482. int error_code,
  483. bool free_scarce_resources_immediately) {
  484. struct aws_mqtt_client_connection *connection = handler->impl;
  485. if (dir == AWS_CHANNEL_DIR_WRITE) {
  486. /* On closing write direction, send out disconnect packet before closing connection. */
  487. if (!free_scarce_resources_immediately) {
  488. if (error_code == AWS_OP_SUCCESS) {
  489. AWS_LOGF_INFO(
  490. AWS_LS_MQTT_CLIENT,
  491. "id=%p: sending disconnect message as part of graceful shutdown.",
  492. (void *)connection);
  493. /* On clean shutdown, send the disconnect message */
  494. struct aws_mqtt_packet_connection disconnect;
  495. aws_mqtt_packet_disconnect_init(&disconnect);
  496. struct aws_io_message *message = mqtt_get_message_for_packet(connection, &disconnect.fixed_header);
  497. if (!message) {
  498. goto done;
  499. }
  500. if (aws_mqtt_packet_connection_encode(&message->message_data, &disconnect)) {
  501. AWS_LOGF_DEBUG(
  502. AWS_LS_MQTT_CLIENT,
  503. "id=%p: failed to encode courteous disconnect io message",
  504. (void *)connection);
  505. aws_mem_release(message->allocator, message);
  506. goto done;
  507. }
  508. if (aws_channel_slot_send_message(slot, message, AWS_CHANNEL_DIR_WRITE)) {
  509. AWS_LOGF_DEBUG(
  510. AWS_LS_MQTT_CLIENT,
  511. "id=%p: failed to send courteous disconnect io message",
  512. (void *)connection);
  513. aws_mem_release(message->allocator, message);
  514. goto done;
  515. }
  516. }
  517. }
  518. }
  519. done:
  520. return aws_channel_slot_on_handler_shutdown_complete(slot, dir, error_code, free_scarce_resources_immediately);
  521. }
  522. static size_t s_initial_window_size(struct aws_channel_handler *handler) {
  523. (void)handler;
  524. return SIZE_MAX;
  525. }
  526. static void s_destroy(struct aws_channel_handler *handler) {
  527. struct aws_mqtt_client_connection *connection = handler->impl;
  528. (void)connection;
  529. }
  530. static size_t s_message_overhead(struct aws_channel_handler *handler) {
  531. (void)handler;
  532. return 0;
  533. }
  534. struct aws_channel_handler_vtable *aws_mqtt_get_client_channel_vtable(void) {
  535. static struct aws_channel_handler_vtable s_vtable = {
  536. .process_read_message = &s_process_read_message,
  537. .process_write_message = NULL,
  538. .increment_read_window = NULL,
  539. .shutdown = &s_shutdown,
  540. .initial_window_size = &s_initial_window_size,
  541. .message_overhead = &s_message_overhead,
  542. .destroy = &s_destroy,
  543. };
  544. return &s_vtable;
  545. }
  546. /*******************************************************************************
  547. * Helpers
  548. ******************************************************************************/
  549. struct aws_io_message *mqtt_get_message_for_packet(
  550. struct aws_mqtt_client_connection *connection,
  551. struct aws_mqtt_fixed_header *header) {
  552. const size_t required_length = 3 + header->remaining_length;
  553. struct aws_io_message *message = aws_channel_acquire_message_from_pool(
  554. connection->slot->channel, AWS_IO_MESSAGE_APPLICATION_DATA, required_length);
  555. AWS_LOGF_TRACE(
  556. AWS_LS_MQTT_CLIENT,
  557. "id=%p: Acquiring memory from pool of required_length %zu",
  558. (void *)connection,
  559. required_length);
  560. return message;
  561. }
  562. /*******************************************************************************
  563. * Requests
  564. ******************************************************************************/
  565. /* Send the request */
  566. static void s_request_outgoing_task(struct aws_channel_task *task, void *arg, enum aws_task_status status) {
  567. struct aws_mqtt_request *request = arg;
  568. struct aws_mqtt_client_connection *connection = request->connection;
  569. if (status == AWS_TASK_STATUS_CANCELED) {
  570. /* Connection lost before the request ever get send, check the request needs to be retried or not */
  571. if (request->retryable) {
  572. AWS_LOGF_DEBUG(
  573. AWS_LS_MQTT_CLIENT,
  574. "static: task id %p, was canceled due to the channel shutting down. Request for packet id "
  575. "%" PRIu16 ". will be retried",
  576. (void *)task,
  577. request->packet_id);
  578. /* put it into the offline queue. */
  579. { /* BEGIN CRITICAL SECTION */
  580. mqtt_connection_lock_synced_data(connection);
  581. /* Set the status as incomplete */
  582. aws_mqtt_connection_statistics_change_operation_statistic_state(
  583. connection, request, AWS_MQTT_OSS_INCOMPLETE);
  584. aws_linked_list_push_back(&connection->synced_data.pending_requests_list, &request->list_node);
  585. mqtt_connection_unlock_synced_data(connection);
  586. } /* END CRITICAL SECTION */
  587. } else {
  588. AWS_LOGF_DEBUG(
  589. AWS_LS_MQTT_CLIENT,
  590. "static: task id %p, was canceled due to the channel shutting down. Request for packet id "
  591. "%" PRIu16 ". will NOT be retried, will be cancelled",
  592. (void *)task,
  593. request->packet_id);
  594. /* Fire the callback and clean up the memory, as the connection get destroyed. */
  595. if (request->on_complete) {
  596. request->on_complete(
  597. connection, request->packet_id, AWS_ERROR_MQTT_NOT_CONNECTED, request->on_complete_ud);
  598. }
  599. { /* BEGIN CRITICAL SECTION */
  600. mqtt_connection_lock_synced_data(connection);
  601. /* Cancel the request in the operation statistics */
  602. aws_mqtt_connection_statistics_change_operation_statistic_state(connection, request, AWS_MQTT_OSS_NONE);
  603. aws_hash_table_remove(
  604. &connection->synced_data.outstanding_requests_table, &request->packet_id, NULL, NULL);
  605. aws_memory_pool_release(&connection->synced_data.requests_pool, request);
  606. mqtt_connection_unlock_synced_data(connection);
  607. } /* END CRITICAL SECTION */
  608. }
  609. return;
  610. }
  611. /* Send the request */
  612. enum aws_mqtt_client_request_state state =
  613. request->send_request(request->packet_id, !request->initiated, request->send_request_ud);
  614. request->initiated = true;
  615. int error_code = AWS_ERROR_SUCCESS;
  616. switch (state) {
  617. case AWS_MQTT_CLIENT_REQUEST_ERROR:
  618. error_code = aws_last_error();
  619. AWS_LOGF_ERROR(
  620. AWS_LS_MQTT_CLIENT,
  621. "id=%p: sending request %" PRIu16 " failed with error %d.",
  622. (void *)request->connection,
  623. request->packet_id,
  624. error_code);
  625. /* fall-thru */
  626. case AWS_MQTT_CLIENT_REQUEST_COMPLETE:
  627. AWS_LOGF_TRACE(
  628. AWS_LS_MQTT_CLIENT,
  629. "id=%p: sending request %" PRIu16 " complete, invoking on_complete callback.",
  630. (void *)request->connection,
  631. request->packet_id);
  632. /* If the send_request function reports the request is complete,
  633. * remove from the hash table and call the callback. */
  634. if (request->on_complete) {
  635. request->on_complete(connection, request->packet_id, error_code, request->on_complete_ud);
  636. }
  637. { /* BEGIN CRITICAL SECTION */
  638. mqtt_connection_lock_synced_data(connection);
  639. /* Set the request as complete in the operation statistics */
  640. aws_mqtt_connection_statistics_change_operation_statistic_state(
  641. request->connection, request, AWS_MQTT_OSS_NONE);
  642. aws_hash_table_remove(
  643. &connection->synced_data.outstanding_requests_table, &request->packet_id, NULL, NULL);
  644. aws_memory_pool_release(&connection->synced_data.requests_pool, request);
  645. mqtt_connection_unlock_synced_data(connection);
  646. } /* END CRITICAL SECTION */
  647. break;
  648. case AWS_MQTT_CLIENT_REQUEST_ONGOING:
  649. AWS_LOGF_TRACE(
  650. AWS_LS_MQTT_CLIENT,
  651. "id=%p: request %" PRIu16 " sent, but waiting on an acknowledgement from peer.",
  652. (void *)request->connection,
  653. request->packet_id);
  654. { /* BEGIN CRITICAL SECTION */
  655. mqtt_connection_lock_synced_data(connection);
  656. /* Set the request as incomplete and un-acked in the operation statistics */
  657. aws_mqtt_connection_statistics_change_operation_statistic_state(
  658. request->connection, request, AWS_MQTT_OSS_INCOMPLETE | AWS_MQTT_OSS_UNACKED);
  659. mqtt_connection_unlock_synced_data(connection);
  660. } /* END CRITICAL SECTION */
  661. /* Put the request into the ongoing list */
  662. aws_linked_list_push_back(&connection->thread_data.ongoing_requests_list, &request->list_node);
  663. break;
  664. }
  665. }
  666. uint16_t mqtt_create_request(
  667. struct aws_mqtt_client_connection *connection,
  668. aws_mqtt_send_request_fn *send_request,
  669. void *send_request_ud,
  670. aws_mqtt_op_complete_fn *on_complete,
  671. void *on_complete_ud,
  672. bool noRetry,
  673. uint64_t packet_size) {
  674. AWS_ASSERT(connection);
  675. AWS_ASSERT(send_request);
  676. struct aws_mqtt_request *next_request = NULL;
  677. bool should_schedule_task = false;
  678. struct aws_channel *channel = NULL;
  679. { /* BEGIN CRITICAL SECTION */
  680. mqtt_connection_lock_synced_data(connection);
  681. if (connection->synced_data.state == AWS_MQTT_CLIENT_STATE_DISCONNECTING) {
  682. mqtt_connection_unlock_synced_data(connection);
  683. /* User requested disconnecting, ensure no new requests are made until the channel finished shutting
  684. * down. */
  685. AWS_LOGF_ERROR(
  686. AWS_LS_MQTT_CLIENT,
  687. "id=%p: Disconnect requested, stop creating any new request until disconnect process finishes.",
  688. (void *)connection);
  689. aws_raise_error(AWS_ERROR_MQTT_CONNECTION_DISCONNECTING);
  690. return 0;
  691. }
  692. if (noRetry && connection->synced_data.state != AWS_MQTT_CLIENT_STATE_CONNECTED) {
  693. mqtt_connection_unlock_synced_data(connection);
  694. /* Not offline queueing QoS 0 publish or PINGREQ. Fail the call. */
  695. AWS_LOGF_DEBUG(
  696. AWS_LS_MQTT_CLIENT,
  697. "id=%p: Not currently connected. No offline queueing for QoS 0 publish or pingreq.",
  698. (void *)connection);
  699. aws_raise_error(AWS_ERROR_MQTT_NOT_CONNECTED);
  700. return 0;
  701. }
  702. /**
  703. * Find a free packet ID.
  704. * QoS 0 PUBLISH packets don't actually need an ID on the wire,
  705. * but we assign them internally anyway just so everything has a unique ID.
  706. *
  707. * Yes, this is an O(N) search.
  708. * We remember the last ID we assigned, so it's O(1) in the common case.
  709. * But it's theoretically possible to reach O(N) where N is just above 64000
  710. * if the user is letting a ton of un-ack'd messages queue up
  711. */
  712. uint16_t search_start = connection->synced_data.packet_id;
  713. struct aws_hash_element *elem = NULL;
  714. while (true) {
  715. /* Increment ID, watch out for overflow, ID cannot be 0 */
  716. if (connection->synced_data.packet_id == UINT16_MAX) {
  717. connection->synced_data.packet_id = 1;
  718. } else {
  719. connection->synced_data.packet_id++;
  720. }
  721. /* Is there already an outstanding request using this ID? */
  722. aws_hash_table_find(
  723. &connection->synced_data.outstanding_requests_table, &connection->synced_data.packet_id, &elem);
  724. if (elem == NULL) {
  725. /* Found a free ID! Break out of loop */
  726. break;
  727. } else if (connection->synced_data.packet_id == search_start) {
  728. /* Every ID is taken */
  729. mqtt_connection_unlock_synced_data(connection);
  730. AWS_LOGF_ERROR(
  731. AWS_LS_MQTT_CLIENT,
  732. "id=%p: Queue is full. No more packet IDs are available at this time.",
  733. (void *)connection);
  734. aws_raise_error(AWS_ERROR_MQTT_QUEUE_FULL);
  735. return 0;
  736. }
  737. }
  738. next_request = aws_memory_pool_acquire(&connection->synced_data.requests_pool);
  739. if (!next_request) {
  740. mqtt_connection_unlock_synced_data(connection);
  741. return 0;
  742. }
  743. memset(next_request, 0, sizeof(struct aws_mqtt_request));
  744. next_request->packet_id = connection->synced_data.packet_id;
  745. if (aws_hash_table_put(
  746. &connection->synced_data.outstanding_requests_table, &next_request->packet_id, next_request, NULL)) {
  747. /* failed to put the next request into the table */
  748. aws_memory_pool_release(&connection->synced_data.requests_pool, next_request);
  749. mqtt_connection_unlock_synced_data(connection);
  750. return 0;
  751. }
  752. /* Store the request by packet_id */
  753. next_request->allocator = connection->allocator;
  754. next_request->connection = connection;
  755. next_request->initiated = false;
  756. next_request->retryable = !noRetry;
  757. next_request->send_request = send_request;
  758. next_request->send_request_ud = send_request_ud;
  759. next_request->on_complete = on_complete;
  760. next_request->on_complete_ud = on_complete_ud;
  761. next_request->packet_size = packet_size;
  762. aws_channel_task_init(
  763. &next_request->outgoing_task, s_request_outgoing_task, next_request, "mqtt_outgoing_request_task");
  764. if (connection->synced_data.state != AWS_MQTT_CLIENT_STATE_CONNECTED) {
  765. aws_linked_list_push_back(&connection->synced_data.pending_requests_list, &next_request->list_node);
  766. } else {
  767. AWS_ASSERT(connection->slot);
  768. AWS_ASSERT(connection->slot->channel);
  769. should_schedule_task = true;
  770. channel = connection->slot->channel;
  771. /* keep the channel alive until the task is scheduled */
  772. aws_channel_acquire_hold(channel);
  773. }
  774. if (next_request && next_request->packet_size > 0) {
  775. /* Set the status as incomplete */
  776. aws_mqtt_connection_statistics_change_operation_statistic_state(
  777. next_request->connection, next_request, AWS_MQTT_OSS_INCOMPLETE);
  778. }
  779. mqtt_connection_unlock_synced_data(connection);
  780. } /* END CRITICAL SECTION */
  781. if (should_schedule_task) {
  782. AWS_LOGF_TRACE(
  783. AWS_LS_MQTT_CLIENT,
  784. "id=%p: Currently not in the event-loop thread, scheduling a task to send message id %" PRIu16 ".",
  785. (void *)connection,
  786. next_request->packet_id);
  787. aws_channel_schedule_task_now(channel, &next_request->outgoing_task);
  788. /* release the refcount we hold with the protection of lock */
  789. aws_channel_release_hold(channel);
  790. }
  791. return next_request->packet_id;
  792. }
  793. void mqtt_request_complete(struct aws_mqtt_client_connection *connection, int error_code, uint16_t packet_id) {
  794. AWS_LOGF_TRACE(
  795. AWS_LS_MQTT_CLIENT,
  796. "id=%p: message id %" PRIu16 " completed with error code %d, removing from outstanding requests list.",
  797. (void *)connection,
  798. packet_id,
  799. error_code);
  800. bool found_request = false;
  801. aws_mqtt_op_complete_fn *on_complete = NULL;
  802. void *on_complete_ud = NULL;
  803. { /* BEGIN CRITICAL SECTION */
  804. mqtt_connection_lock_synced_data(connection);
  805. struct aws_hash_element *elem = NULL;
  806. aws_hash_table_find(&connection->synced_data.outstanding_requests_table, &packet_id, &elem);
  807. if (elem != NULL) {
  808. found_request = true;
  809. struct aws_mqtt_request *request = elem->value;
  810. on_complete = request->on_complete;
  811. on_complete_ud = request->on_complete_ud;
  812. /* Set the status as complete */
  813. aws_mqtt_connection_statistics_change_operation_statistic_state(
  814. request->connection, request, AWS_MQTT_OSS_NONE);
  815. /* clean up request resources */
  816. aws_hash_table_remove_element(&connection->synced_data.outstanding_requests_table, elem);
  817. /* remove the request from the list, which is thread_data.ongoing_requests_list */
  818. aws_linked_list_remove(&request->list_node);
  819. aws_memory_pool_release(&connection->synced_data.requests_pool, request);
  820. }
  821. mqtt_connection_unlock_synced_data(connection);
  822. } /* END CRITICAL SECTION */
  823. if (!found_request) {
  824. AWS_LOGF_DEBUG(
  825. AWS_LS_MQTT_CLIENT,
  826. "id=%p: received completion for message id %" PRIu16
  827. " but no outstanding request exists. Assuming this is an ack of a resend when the first request has "
  828. "already completed.",
  829. (void *)connection,
  830. packet_id);
  831. return;
  832. }
  833. /* Invoke the complete callback. */
  834. if (on_complete) {
  835. on_complete(connection, packet_id, error_code, on_complete_ud);
  836. }
  837. }
  838. struct mqtt_shutdown_task {
  839. int error_code;
  840. struct aws_channel_task task;
  841. };
  842. static void s_mqtt_disconnect_task(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status) {
  843. (void)status;
  844. struct mqtt_shutdown_task *task = AWS_CONTAINER_OF(channel_task, struct mqtt_shutdown_task, task);
  845. struct aws_mqtt_client_connection *connection = arg;
  846. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Doing disconnect", (void *)connection);
  847. { /* BEGIN CRITICAL SECTION */
  848. mqtt_connection_lock_synced_data(connection);
  849. /* If there is an outstanding reconnect task, cancel it */
  850. if (connection->synced_data.state == AWS_MQTT_CLIENT_STATE_DISCONNECTING && connection->reconnect_task) {
  851. aws_atomic_store_ptr(&connection->reconnect_task->connection_ptr, NULL);
  852. /* If the reconnect_task isn't scheduled, free it */
  853. if (connection->reconnect_task && !connection->reconnect_task->task.timestamp) {
  854. aws_mem_release(connection->reconnect_task->allocator, connection->reconnect_task);
  855. }
  856. connection->reconnect_task = NULL;
  857. }
  858. mqtt_connection_unlock_synced_data(connection);
  859. } /* END CRITICAL SECTION */
  860. if (connection->slot && connection->slot->channel) {
  861. aws_channel_shutdown(connection->slot->channel, task->error_code);
  862. }
  863. aws_mem_release(connection->allocator, task);
  864. }
  865. void mqtt_disconnect_impl(struct aws_mqtt_client_connection *connection, int error_code) {
  866. if (connection->slot) {
  867. struct mqtt_shutdown_task *shutdown_task =
  868. aws_mem_calloc(connection->allocator, 1, sizeof(struct mqtt_shutdown_task));
  869. shutdown_task->error_code = error_code;
  870. aws_channel_task_init(&shutdown_task->task, s_mqtt_disconnect_task, connection, "mqtt_disconnect");
  871. aws_channel_schedule_task_now(connection->slot->channel, &shutdown_task->task);
  872. }
  873. }