client.c 122 KB


  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/mqtt/client.h>
  6. #include <aws/mqtt/private/client_impl.h>
  7. #include <aws/mqtt/private/mqtt_client_test_helper.h>
  8. #include <aws/mqtt/private/packets.h>
  9. #include <aws/mqtt/private/shared_constants.h>
  10. #include <aws/mqtt/private/topic_tree.h>
  11. #include <aws/http/proxy.h>
  12. #include <aws/io/channel_bootstrap.h>
  13. #include <aws/io/event_loop.h>
  14. #include <aws/io/socket.h>
  15. #include <aws/io/tls_channel_handler.h>
  16. #include <aws/io/uri.h>
  17. #include <aws/common/clock.h>
  18. #include <aws/common/task_scheduler.h>
  19. #include <inttypes.h>
  20. #ifdef AWS_MQTT_WITH_WEBSOCKETS
  21. # include <aws/http/request_response.h>
  22. # include <aws/http/websocket.h>
  23. #endif
  24. #ifdef _MSC_VER
  25. # pragma warning(disable : 4204)
  26. #endif
/* 3 seconds, expressed in nanoseconds */
static const uint64_t s_default_ping_timeout_ns = 3000000000;

/* 20 minutes - This is the default (and max) for AWS IoT as of 2020.02.18 */
static const uint16_t s_default_keep_alive_sec = 1200;

/*
 * Forward declaration: the reconnect task (s_attempt_reconnect) calls this before its definition appears.
 * A non-zero return is treated by that caller as a failed connection attempt.
 */
static int s_mqtt_client_connect(
    struct aws_mqtt_client_connection *connection,
    aws_mqtt_client_on_connection_complete_fn *on_connection_complete,
    void *userdata);
  35. /*******************************************************************************
  36. * Helper functions
  37. ******************************************************************************/
  38. void mqtt_connection_lock_synced_data(struct aws_mqtt_client_connection *connection) {
  39. int err = aws_mutex_lock(&connection->synced_data.lock);
  40. AWS_ASSERT(!err);
  41. (void)err;
  42. }
  43. void mqtt_connection_unlock_synced_data(struct aws_mqtt_client_connection *connection) {
  44. ASSERT_SYNCED_DATA_LOCK_HELD(connection);
  45. int err = aws_mutex_unlock(&connection->synced_data.lock);
  46. AWS_ASSERT(!err);
  47. (void)err;
  48. }
  49. static void s_aws_mqtt_schedule_reconnect_task(struct aws_mqtt_client_connection *connection) {
  50. uint64_t next_attempt_ns = 0;
  51. aws_high_res_clock_get_ticks(&next_attempt_ns);
  52. next_attempt_ns += aws_timestamp_convert(
  53. connection->reconnect_timeouts.current_sec, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL);
  54. aws_event_loop_schedule_task_future(connection->loop, &connection->reconnect_task->task, next_attempt_ns);
  55. AWS_LOGF_TRACE(
  56. AWS_LS_MQTT_CLIENT,
  57. "id=%p: Scheduling reconnect, for %" PRIu64 " on event-loop %p",
  58. (void *)connection,
  59. next_attempt_ns,
  60. (void *)connection->loop);
  61. }
  62. static void s_aws_mqtt_client_destroy(struct aws_mqtt_client *client) {
  63. AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "client=%p: Cleaning up MQTT client", (void *)client);
  64. aws_client_bootstrap_release(client->bootstrap);
  65. aws_mem_release(client->allocator, client);
  66. }
  67. void mqtt_connection_set_state(
  68. struct aws_mqtt_client_connection *connection,
  69. enum aws_mqtt_client_connection_state state) {
  70. ASSERT_SYNCED_DATA_LOCK_HELD(connection);
  71. if (connection->synced_data.state == state) {
  72. AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: MQTT connection already in state %d", (void *)connection, state);
  73. return;
  74. }
  75. connection->synced_data.state = state;
  76. }
/* Forward declaration so request_timeout_task_arg can point back at its wrapper. */
struct request_timeout_wrapper;

/* used for timeout task */
struct request_timeout_task_arg {
    /* Packet id handed to mqtt_request_complete() when the timeout fires. */
    uint16_t packet_id;
    /* Connection the request belongs to; its allocator is used to free this struct. */
    struct aws_mqtt_client_connection *connection;
    /*
     * Back pointer to the wrapper embedded in the operation's task arg.
     * NULL means the operation no longer references this timeout.
     */
    struct request_timeout_wrapper *task_arg_wrapper;
};

/*
 * We want the timeout task to be able to destroy the forward reference from the operation's task arg structure
 * to the timeout task. But the operation task arg structures don't have any data structure in common. So to allow
 * the timeout to refer back to a zero-able forward pointer, we wrap a pointer to the timeout task and embed it
 * in every operation's task arg that needs to create a timeout.
 */
struct request_timeout_wrapper {
    /* Forward pointer to the timeout task's argument; s_request_timeout sets it to NULL before freeing itself. */
    struct request_timeout_task_arg *timeout_task_arg;
};
  93. static void s_request_timeout(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status) {
  94. (void)channel_task;
  95. struct request_timeout_task_arg *timeout_task_arg = arg;
  96. struct aws_mqtt_client_connection *connection = timeout_task_arg->connection;
  97. if (status == AWS_TASK_STATUS_RUN_READY) {
  98. if (timeout_task_arg->task_arg_wrapper != NULL) {
  99. mqtt_request_complete(connection, AWS_ERROR_MQTT_TIMEOUT, timeout_task_arg->packet_id);
  100. }
  101. }
  102. /*
  103. * Whether cancelled or run, if we have a back pointer to the operation's task arg, we must zero it out
  104. * so that when it completes it does not try to cancel us, because we will already be freed.
  105. *
  106. * If we don't have a back pointer to the operation's task arg, that means it already ran and completed.
  107. */
  108. if (timeout_task_arg->task_arg_wrapper != NULL) {
  109. timeout_task_arg->task_arg_wrapper->timeout_task_arg = NULL;
  110. timeout_task_arg->task_arg_wrapper = NULL;
  111. }
  112. aws_mem_release(connection->allocator, timeout_task_arg);
  113. }
  114. static struct request_timeout_task_arg *s_schedule_timeout_task(
  115. struct aws_mqtt_client_connection *connection,
  116. uint16_t packet_id) {
  117. /* schedule a timeout task to run, in case server consider the publish is not received */
  118. struct aws_channel_task *request_timeout_task = NULL;
  119. struct request_timeout_task_arg *timeout_task_arg = NULL;
  120. if (!aws_mem_acquire_many(
  121. connection->allocator,
  122. 2,
  123. &timeout_task_arg,
  124. sizeof(struct request_timeout_task_arg),
  125. &request_timeout_task,
  126. sizeof(struct aws_channel_task))) {
  127. return NULL;
  128. }
  129. aws_channel_task_init(request_timeout_task, s_request_timeout, timeout_task_arg, "mqtt_request_timeout");
  130. AWS_ZERO_STRUCT(*timeout_task_arg);
  131. timeout_task_arg->connection = connection;
  132. timeout_task_arg->packet_id = packet_id;
  133. uint64_t timestamp = 0;
  134. if (aws_channel_current_clock_time(connection->slot->channel, &timestamp)) {
  135. aws_mem_release(connection->allocator, timeout_task_arg);
  136. return NULL;
  137. }
  138. timestamp = aws_add_u64_saturating(timestamp, connection->operation_timeout_ns);
  139. aws_channel_schedule_task_future(connection->slot->channel, request_timeout_task, timestamp);
  140. return timeout_task_arg;
  141. }
  142. static void s_init_statistics(struct aws_mqtt_connection_operation_statistics_impl *stats) {
  143. aws_atomic_store_int(&stats->incomplete_operation_count_atomic, 0);
  144. aws_atomic_store_int(&stats->incomplete_operation_size_atomic, 0);
  145. aws_atomic_store_int(&stats->unacked_operation_count_atomic, 0);
  146. aws_atomic_store_int(&stats->unacked_operation_size_atomic, 0);
  147. }
  148. /*******************************************************************************
  149. * Client Init
  150. ******************************************************************************/
  151. struct aws_mqtt_client *aws_mqtt_client_new(struct aws_allocator *allocator, struct aws_client_bootstrap *bootstrap) {
  152. aws_mqtt_fatal_assert_library_initialized();
  153. struct aws_mqtt_client *client = aws_mem_calloc(allocator, 1, sizeof(struct aws_mqtt_client));
  154. if (client == NULL) {
  155. return NULL;
  156. }
  157. AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "client=%p: Initalizing MQTT client", (void *)client);
  158. client->allocator = allocator;
  159. client->bootstrap = aws_client_bootstrap_acquire(bootstrap);
  160. aws_ref_count_init(&client->ref_count, client, (aws_simple_completion_callback *)s_aws_mqtt_client_destroy);
  161. return client;
  162. }
  163. struct aws_mqtt_client *aws_mqtt_client_acquire(struct aws_mqtt_client *client) {
  164. if (client != NULL) {
  165. aws_ref_count_acquire(&client->ref_count);
  166. }
  167. return client;
  168. }
  169. void aws_mqtt_client_release(struct aws_mqtt_client *client) {
  170. if (client != NULL) {
  171. aws_ref_count_release(&client->ref_count);
  172. }
  173. }
/* Minimum lifetime (in seconds) of a successful connection before its loss resets the reconnect backoff. */
#define AWS_RESET_RECONNECT_BACKOFF_DELAY_SECONDS 10

/*
 * Channel shutdown callback. At this point, the channel for the MQTT connection has completed its shutdown.
 *
 * It is also called directly by s_mqtt_client_init when channel setup failed (channel is NULL in that case).
 *
 * Summary of what happens, in order:
 *  - under the lock: possibly reset the reconnect backoff, move or cancel in-flight requests depending on
 *    clean_session, update the state, and remove the channel slot;
 *  - outside the lock: fire completion callbacks for cancelled requests, then return them to the pool;
 *  - depending on the state seen at entry: schedule a reconnect, notify on_interrupted, or finish the
 *    disconnect (user callbacks, then release a connection reference).
 *
 * bootstrap and channel are unused; user_data is the aws_mqtt_client_connection.
 */
static void s_mqtt_client_shutdown(
    struct aws_client_bootstrap *bootstrap,
    int error_code,
    struct aws_channel *channel,
    void *user_data) {

    (void)bootstrap;
    (void)channel;

    struct aws_mqtt_client_connection *connection = user_data;

    AWS_LOGF_TRACE(
        AWS_LS_MQTT_CLIENT, "id=%p: Channel has been shutdown with error code %d", (void *)connection, error_code);

    /* State observed at entry; it drives every decision made after the critical section. */
    enum aws_mqtt_client_connection_state prev_state;
    /* Requests to fail with AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION (only filled for clean sessions). */
    struct aws_linked_list cancelling_requests;
    aws_linked_list_init(&cancelling_requests);
    /* Set when this shutdown should end in the DISCONNECTED state. */
    bool disconnected_state = false;
    { /* BEGIN CRITICAL SECTION */
        mqtt_connection_lock_synced_data(connection);

        /*
         * On a channel that represents a valid connection (successful connack received),
         * channel_successful_connack_timestamp_ns will be the time the connack was received. Otherwise it will be
         * zero.
         *
         * Use that fact to determine whether or not we should reset the current reconnect backoff delay.
         *
         * We reset the reconnect backoff if either of:
         * 1) the user called disconnect()
         * 2) a successful connection had lasted longer than our minimum reset time (10s at the moment)
         */
        uint64_t now = 0;
        aws_high_res_clock_get_ticks(&now);
        uint64_t time_diff = now - connection->reconnect_timeouts.channel_successful_connack_timestamp_ns;

        bool was_user_disconnect = connection->synced_data.state == AWS_MQTT_CLIENT_STATE_DISCONNECTING;
        bool was_sufficiently_long_connection =
            (connection->reconnect_timeouts.channel_successful_connack_timestamp_ns != 0) &&
            (time_diff >=
             aws_timestamp_convert(
                 AWS_RESET_RECONNECT_BACKOFF_DELAY_SECONDS, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_NANOS, NULL));

        if (was_user_disconnect || was_sufficiently_long_connection) {
            connection->reconnect_timeouts.current_sec = connection->reconnect_timeouts.min_sec;
        }
        /* This channel is gone, so it no longer has a successful connack to remember. */
        connection->reconnect_timeouts.channel_successful_connack_timestamp_ns = 0;

        /* Move all the ongoing requests to the pending requests list, because the response they are waiting for
         * will never arrive. Sad. But, we will retry. */
        if (connection->clean_session) {
            /* For a clean session, the Session lasts as long as the Network Connection. Thus, discard the previous
             * session */
            AWS_LOGF_TRACE(
                AWS_LS_MQTT_CLIENT,
                "id=%p: Discard ongoing requests and pending requests when a clean session connection lost.",
                (void *)connection);
            aws_linked_list_move_all_back(&cancelling_requests, &connection->thread_data.ongoing_requests_list);
            aws_linked_list_move_all_back(&cancelling_requests, &connection->synced_data.pending_requests_list);
        } else {
            aws_linked_list_move_all_back(
                &connection->synced_data.pending_requests_list, &connection->thread_data.ongoing_requests_list);
            AWS_LOGF_TRACE(
                AWS_LS_MQTT_CLIENT,
                "id=%p: All subscribe/unsubscribe and publish QoS>0 have been move to pending list",
                (void *)connection);
        }
        prev_state = connection->synced_data.state;
        switch (connection->synced_data.state) {
            case AWS_MQTT_CLIENT_STATE_CONNECTED:
                /* unexpected hangup from broker, try to reconnect */
                mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_RECONNECTING);
                AWS_LOGF_DEBUG(
                    AWS_LS_MQTT_CLIENT,
                    "id=%p: connection was unexpected interrupted, switch state to RECONNECTING.",
                    (void *)connection);
                break;
            case AWS_MQTT_CLIENT_STATE_DISCONNECTING:
                /* disconnect requested by user */
                /* Successfully shutdown, if cleansession is set, ongoing and pending requests will be cleared */
                /* The state itself is only changed to DISCONNECTED further down, outside this critical section. */
                disconnected_state = true;
                AWS_LOGF_DEBUG(
                    AWS_LS_MQTT_CLIENT,
                    "id=%p: disconnect finished, switch state to DISCONNECTED.",
                    (void *)connection);
                break;
            case AWS_MQTT_CLIENT_STATE_CONNECTING:
                /* failed to connect */
                disconnected_state = true;
                break;
            case AWS_MQTT_CLIENT_STATE_RECONNECTING:
                /* reconnect failed, schedule the next attempt later, no need to change the state. */
                break;
            default:
                /* AWS_MQTT_CLIENT_STATE_DISCONNECTED */
                break;
        }
        AWS_LOGF_TRACE(
            AWS_LS_MQTT_CLIENT, "id=%p: current state is %d", (void *)connection, (int)connection->synced_data.state);
        /* Always clear slot, as that's what's been shutdown */
        if (connection->slot) {
            aws_channel_slot_remove(connection->slot);
            AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: slot is removed successfully", (void *)connection);
            connection->slot = NULL;
        }

        mqtt_connection_unlock_synced_data(connection);
    } /* END CRITICAL SECTION */

    if (!aws_linked_list_empty(&cancelling_requests)) {

        /* First pass, without the lock held: tell each request's owner that it was cancelled. */
        struct aws_linked_list_node *current = aws_linked_list_front(&cancelling_requests);
        const struct aws_linked_list_node *end = aws_linked_list_end(&cancelling_requests);
        while (current != end) {
            struct aws_mqtt_request *request = AWS_CONTAINER_OF(current, struct aws_mqtt_request, list_node);
            if (request->on_complete) {
                request->on_complete(
                    connection,
                    request->packet_id,
                    AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION,
                    request->on_complete_ud);
            }
            current = current->next;
        }
        /* Second pass, with the lock held: forget the packet ids and return the requests to the pool. */
        { /* BEGIN CRITICAL SECTION */
            mqtt_connection_lock_synced_data(connection);
            while (!aws_linked_list_empty(&cancelling_requests)) {
                struct aws_linked_list_node *node = aws_linked_list_pop_front(&cancelling_requests);
                struct aws_mqtt_request *request = AWS_CONTAINER_OF(node, struct aws_mqtt_request, list_node);
                aws_hash_table_remove(
                    &connection->synced_data.outstanding_requests_table, &request->packet_id, NULL, NULL);
                aws_memory_pool_release(&connection->synced_data.requests_pool, request);
            }
            mqtt_connection_unlock_synced_data(connection);
        } /* END CRITICAL SECTION */
    }

    /* If there's no error code and this wasn't user-requested, set the error code to something useful */
    if (error_code == AWS_ERROR_SUCCESS) {
        if (prev_state != AWS_MQTT_CLIENT_STATE_DISCONNECTING && prev_state != AWS_MQTT_CLIENT_STATE_DISCONNECTED) {
            error_code = AWS_ERROR_MQTT_UNEXPECTED_HANGUP;
        }
    }
    switch (prev_state) {
        case AWS_MQTT_CLIENT_STATE_RECONNECTING: {
            /* If reconnect attempt failed, schedule the next attempt */
            AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Reconnect failed, retrying", (void *)connection);
            s_aws_mqtt_schedule_reconnect_task(connection);
            break;
        }
        case AWS_MQTT_CLIENT_STATE_CONNECTED: {
            AWS_LOGF_DEBUG(
                AWS_LS_MQTT_CLIENT,
                "id=%p: Connection interrupted, calling callback and attempting reconnect",
                (void *)connection);
            MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_interrupted, error_code);

            /* In case user called disconnect from the on_interrupted callback */
            bool stop_reconnect;
            { /* BEGIN CRITICAL SECTION */
                mqtt_connection_lock_synced_data(connection);
                stop_reconnect = connection->synced_data.state == AWS_MQTT_CLIENT_STATE_DISCONNECTING;
                if (stop_reconnect) {
                    disconnected_state = true;
                    AWS_LOGF_DEBUG(
                        AWS_LS_MQTT_CLIENT,
                        "id=%p: disconnect finished, switch state to DISCONNECTED.",
                        (void *)connection);
                }
                mqtt_connection_unlock_synced_data(connection);
            } /* END CRITICAL SECTION */

            if (!stop_reconnect) {
                s_aws_mqtt_schedule_reconnect_task(connection);
            }
            break;
        }
        default:
            break;
    }
    if (disconnected_state) {
        { /* BEGIN CRITICAL SECTION */
            mqtt_connection_lock_synced_data(connection);
            mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_DISCONNECTED);
            mqtt_connection_unlock_synced_data(connection);
        } /* END CRITICAL SECTION */
        /* Pick the user notification that matches how we got here. */
        switch (prev_state) {
            case AWS_MQTT_CLIENT_STATE_CONNECTED:
                AWS_LOGF_TRACE(
                    AWS_LS_MQTT_CLIENT,
                    "id=%p: Caller requested disconnect from on_interrupted callback, aborting reconnect",
                    (void *)connection);
                MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect);
                MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_closed, NULL);
                break;
            case AWS_MQTT_CLIENT_STATE_DISCONNECTING:
                AWS_LOGF_DEBUG(
                    AWS_LS_MQTT_CLIENT,
                    "id=%p: Disconnect completed, clearing request queue and calling callback",
                    (void *)connection);
                MQTT_CLIENT_CALL_CALLBACK(connection, on_disconnect);
                MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_closed, NULL);
                break;
            case AWS_MQTT_CLIENT_STATE_CONNECTING:
                AWS_LOGF_TRACE(
                    AWS_LS_MQTT_CLIENT,
                    "id=%p: Initial connection attempt failed, calling callback",
                    (void *)connection);
                MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_connection_complete, error_code, 0, false);
                break;
            default:
                break;
        }
        /* The connection can die now. Release the refcount */
        aws_mqtt_client_connection_release(connection);
    }
}
  379. /*******************************************************************************
  380. * Connection New
  381. ******************************************************************************/
  382. /* The assumption here is that a connection always outlives its channels, and the channel this task was scheduled on
  383. * always outlives this task, so all we need to do is check the connection state. If we are in a state that waits
  384. * for a CONNACK, kill it off. In the case that the connection died between scheduling this task and it being executed
  385. * the status will always be CANCELED because this task will be canceled when the owning channel goes away. */
  386. static void s_connack_received_timeout(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status) {
  387. struct aws_mqtt_client_connection *connection = arg;
  388. if (status == AWS_TASK_STATUS_RUN_READY) {
  389. bool time_out = false;
  390. { /* BEGIN CRITICAL SECTION */
  391. mqtt_connection_lock_synced_data(connection);
  392. time_out =
  393. (connection->synced_data.state == AWS_MQTT_CLIENT_STATE_CONNECTING ||
  394. connection->synced_data.state == AWS_MQTT_CLIENT_STATE_RECONNECTING);
  395. mqtt_connection_unlock_synced_data(connection);
  396. } /* END CRITICAL SECTION */
  397. if (time_out) {
  398. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: mqtt CONNACK response timeout detected", (void *)connection);
  399. aws_channel_shutdown(connection->slot->channel, AWS_ERROR_MQTT_TIMEOUT);
  400. }
  401. }
  402. aws_mem_release(connection->allocator, channel_task);
  403. }
  404. /**
  405. * Channel has been initialized callback. Sets up channel handler and sends out CONNECT packet.
  406. * The on_connack callback is called with the CONNACK packet is received from the server.
  407. */
  408. static void s_mqtt_client_init(
  409. struct aws_client_bootstrap *bootstrap,
  410. int error_code,
  411. struct aws_channel *channel,
  412. void *user_data) {
  413. (void)bootstrap;
  414. struct aws_io_message *message = NULL;
  415. /* Setup callback contract is: if error_code is non-zero then channel is NULL. */
  416. AWS_FATAL_ASSERT((error_code != 0) == (channel == NULL));
  417. struct aws_mqtt_client_connection *connection = user_data;
  418. if (error_code != AWS_OP_SUCCESS) {
  419. /* client shutdown already handles this case, so just call that. */
  420. s_mqtt_client_shutdown(bootstrap, error_code, channel, user_data);
  421. return;
  422. }
  423. AWS_FATAL_ASSERT(aws_channel_get_event_loop(channel) == connection->loop);
  424. /* user requested disconnect before the channel has been set up. Stop installing the slot and sending CONNECT. */
  425. bool failed_create_slot = false;
  426. { /* BEGIN CRITICAL SECTION */
  427. mqtt_connection_lock_synced_data(connection);
  428. if (connection->synced_data.state == AWS_MQTT_CLIENT_STATE_DISCONNECTING) {
  429. /* It only happens when the user request disconnect during reconnecting, we don't need to fire any callback.
  430. * The on_disconnect will be invoked as channel finish shutting down. */
  431. mqtt_connection_unlock_synced_data(connection);
  432. aws_channel_shutdown(channel, AWS_ERROR_SUCCESS);
  433. return;
  434. }
  435. /* Create the slot */
  436. connection->slot = aws_channel_slot_new(channel);
  437. if (!connection->slot) {
  438. failed_create_slot = true;
  439. }
  440. mqtt_connection_unlock_synced_data(connection);
  441. } /* END CRITICAL SECTION */
  442. /* intall the slot and handler */
  443. if (failed_create_slot) {
  444. AWS_LOGF_ERROR(
  445. AWS_LS_MQTT_CLIENT,
  446. "id=%p: Failed to create new slot, something has gone horribly wrong, error %d (%s).",
  447. (void *)connection,
  448. aws_last_error(),
  449. aws_error_name(aws_last_error()));
  450. goto handle_error;
  451. }
  452. if (aws_channel_slot_insert_end(channel, connection->slot)) {
  453. AWS_LOGF_ERROR(
  454. AWS_LS_MQTT_CLIENT,
  455. "id=%p: Failed to insert slot into channel %p, error %d (%s).",
  456. (void *)connection,
  457. (void *)channel,
  458. aws_last_error(),
  459. aws_error_name(aws_last_error()));
  460. goto handle_error;
  461. }
  462. if (aws_channel_slot_set_handler(connection->slot, &connection->handler)) {
  463. AWS_LOGF_ERROR(
  464. AWS_LS_MQTT_CLIENT,
  465. "id=%p: Failed to set MQTT handler into slot on channel %p, error %d (%s).",
  466. (void *)connection,
  467. (void *)channel,
  468. aws_last_error(),
  469. aws_error_name(aws_last_error()));
  470. goto handle_error;
  471. }
  472. AWS_LOGF_DEBUG(
  473. AWS_LS_MQTT_CLIENT, "id=%p: Connection successfully opened, sending CONNECT packet", (void *)connection);
  474. struct aws_channel_task *connack_task = aws_mem_calloc(connection->allocator, 1, sizeof(struct aws_channel_task));
  475. if (!connack_task) {
  476. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to allocate timeout task.", (void *)connection);
  477. goto handle_error;
  478. }
  479. aws_channel_task_init(connack_task, s_connack_received_timeout, connection, "mqtt_connack_timeout");
  480. uint64_t now = 0;
  481. if (aws_channel_current_clock_time(channel, &now)) {
  482. AWS_LOGF_ERROR(
  483. AWS_LS_MQTT_CLIENT,
  484. "static: Failed to setting MQTT handler into slot on channel %p, error %d (%s).",
  485. (void *)channel,
  486. aws_last_error(),
  487. aws_error_name(aws_last_error()));
  488. goto handle_error;
  489. }
  490. now += connection->ping_timeout_ns;
  491. aws_channel_schedule_task_future(channel, connack_task, now);
  492. struct aws_byte_cursor client_id_cursor = aws_byte_cursor_from_buf(&connection->client_id);
  493. AWS_LOGF_DEBUG(
  494. AWS_LS_MQTT_CLIENT,
  495. "id=%p: MQTT Connection initializing CONNECT packet for client-id '" PRInSTR "'",
  496. (void *)connection,
  497. AWS_BYTE_CURSOR_PRI(client_id_cursor));
  498. /* Send the connect packet */
  499. struct aws_mqtt_packet_connect connect;
  500. aws_mqtt_packet_connect_init(
  501. &connect, client_id_cursor, connection->clean_session, connection->keep_alive_time_secs);
  502. if (connection->will.topic.buffer) {
  503. /* Add will if present */
  504. struct aws_byte_cursor topic_cur = aws_byte_cursor_from_buf(&connection->will.topic);
  505. struct aws_byte_cursor payload_cur = aws_byte_cursor_from_buf(&connection->will.payload);
  506. AWS_LOGF_DEBUG(
  507. AWS_LS_MQTT_CLIENT,
  508. "id=%p: Adding will to connection on " PRInSTR " with payload " PRInSTR,
  509. (void *)connection,
  510. AWS_BYTE_CURSOR_PRI(topic_cur),
  511. AWS_BYTE_CURSOR_PRI(payload_cur));
  512. aws_mqtt_packet_connect_add_will(
  513. &connect, topic_cur, connection->will.qos, connection->will.retain, payload_cur);
  514. }
  515. if (connection->username) {
  516. struct aws_byte_cursor username_cur = aws_byte_cursor_from_string(connection->username);
  517. AWS_LOGF_DEBUG(
  518. AWS_LS_MQTT_CLIENT,
  519. "id=%p: Adding username " PRInSTR " to connection",
  520. (void *)connection,
  521. AWS_BYTE_CURSOR_PRI(username_cur));
  522. struct aws_byte_cursor password_cur = {
  523. .ptr = NULL,
  524. .len = 0,
  525. };
  526. if (connection->password) {
  527. password_cur = aws_byte_cursor_from_string(connection->password);
  528. }
  529. aws_mqtt_packet_connect_add_credentials(&connect, username_cur, password_cur);
  530. }
  531. message = mqtt_get_message_for_packet(connection, &connect.fixed_header);
  532. if (!message) {
  533. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to get message from pool", (void *)connection);
  534. goto handle_error;
  535. }
  536. if (aws_mqtt_packet_connect_encode(&message->message_data, &connect)) {
  537. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to encode CONNECT packet", (void *)connection);
  538. goto handle_error;
  539. }
  540. if (aws_channel_slot_send_message(connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
  541. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to send encoded CONNECT packet upstream", (void *)connection);
  542. goto handle_error;
  543. }
  544. return;
  545. handle_error:
  546. MQTT_CLIENT_CALL_CALLBACK_ARGS(connection, on_connection_complete, aws_last_error(), 0, false);
  547. aws_channel_shutdown(channel, aws_last_error());
  548. if (message) {
  549. aws_mem_release(message->allocator, message);
  550. }
  551. }
  552. static void s_attempt_reconnect(struct aws_task *task, void *userdata, enum aws_task_status status) {
  553. (void)task;
  554. struct aws_mqtt_reconnect_task *reconnect = userdata;
  555. struct aws_mqtt_client_connection *connection = aws_atomic_load_ptr(&reconnect->connection_ptr);
  556. if (status == AWS_TASK_STATUS_RUN_READY && connection) {
  557. /* If the task is not cancelled and a connection has not succeeded, attempt reconnect */
  558. mqtt_connection_lock_synced_data(connection);
  559. /* Check before multiplying to avoid potential overflow */
  560. if (connection->reconnect_timeouts.current_sec > connection->reconnect_timeouts.max_sec / 2) {
  561. connection->reconnect_timeouts.current_sec = connection->reconnect_timeouts.max_sec;
  562. } else {
  563. connection->reconnect_timeouts.current_sec *= 2;
  564. }
  565. AWS_LOGF_TRACE(
  566. AWS_LS_MQTT_CLIENT,
  567. "id=%p: Attempting reconnect, if it fails next attempt will be in %" PRIu64 " seconds",
  568. (void *)connection,
  569. connection->reconnect_timeouts.current_sec);
  570. mqtt_connection_unlock_synced_data(connection);
  571. if (s_mqtt_client_connect(
  572. connection, connection->on_connection_complete, connection->on_connection_complete_ud)) {
  573. /* If reconnect attempt failed, schedule the next attempt */
  574. s_aws_mqtt_schedule_reconnect_task(connection);
  575. } else {
  576. /* Ideally, it would be nice to move this inside the lock, but I'm unsure of the correctness */
  577. connection->reconnect_task->task.timestamp = 0;
  578. }
  579. } else {
  580. aws_mem_release(reconnect->allocator, reconnect);
  581. }
  582. }
  583. void aws_create_reconnect_task(struct aws_mqtt_client_connection *connection) {
  584. if (connection->reconnect_task == NULL) {
  585. connection->reconnect_task = aws_mem_calloc(connection->allocator, 1, sizeof(struct aws_mqtt_reconnect_task));
  586. AWS_FATAL_ASSERT(connection->reconnect_task != NULL);
  587. aws_atomic_init_ptr(&connection->reconnect_task->connection_ptr, connection);
  588. connection->reconnect_task->allocator = connection->allocator;
  589. aws_task_init(
  590. &connection->reconnect_task->task, s_attempt_reconnect, connection->reconnect_task, "mqtt_reconnect");
  591. }
  592. }
  593. static uint64_t s_hash_uint16_t(const void *item) {
  594. return *(uint16_t *)item;
  595. }
  596. static bool s_uint16_t_eq(const void *a, const void *b) {
  597. return *(uint16_t *)a == *(uint16_t *)b;
  598. }
  599. static void s_mqtt_client_connection_destroy_final(struct aws_mqtt_client_connection *connection) {
  600. AWS_PRECONDITION(!connection || connection->allocator);
  601. if (!connection) {
  602. return;
  603. }
  604. /* If the slot is not NULL, the connection is still connected, which should be prevented from calling this function
  605. */
  606. AWS_ASSERT(!connection->slot);
  607. AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Destroying connection", (void *)connection);
  608. /* If the reconnect_task isn't freed, free it */
  609. if (connection->reconnect_task) {
  610. aws_mem_release(connection->reconnect_task->allocator, connection->reconnect_task);
  611. }
  612. aws_string_destroy(connection->host_name);
  613. /* Clear the credentials */
  614. if (connection->username) {
  615. aws_string_destroy_secure(connection->username);
  616. }
  617. if (connection->password) {
  618. aws_string_destroy_secure(connection->password);
  619. }
  620. /* Clean up the will */
  621. aws_byte_buf_clean_up(&connection->will.topic);
  622. aws_byte_buf_clean_up(&connection->will.payload);
  623. /* Clear the client_id */
  624. aws_byte_buf_clean_up(&connection->client_id);
  625. /* Free all of the active subscriptions */
  626. aws_mqtt_topic_tree_clean_up(&connection->thread_data.subscriptions);
  627. aws_hash_table_clean_up(&connection->synced_data.outstanding_requests_table);
  628. /* clean up the pending_requests if it's not empty */
  629. while (!aws_linked_list_empty(&connection->synced_data.pending_requests_list)) {
  630. struct aws_linked_list_node *node = aws_linked_list_pop_front(&connection->synced_data.pending_requests_list);
  631. struct aws_mqtt_request *request = AWS_CONTAINER_OF(node, struct aws_mqtt_request, list_node);
  632. /* Fire the callback and clean up the memory, as the connection get destroyed. */
  633. if (request->on_complete) {
  634. request->on_complete(
  635. connection, request->packet_id, AWS_ERROR_MQTT_CONNECTION_DESTROYED, request->on_complete_ud);
  636. }
  637. aws_memory_pool_release(&connection->synced_data.requests_pool, request);
  638. }
  639. aws_memory_pool_clean_up(&connection->synced_data.requests_pool);
  640. aws_mutex_clean_up(&connection->synced_data.lock);
  641. aws_tls_connection_options_clean_up(&connection->tls_options);
  642. /* Clean up the websocket proxy options */
  643. if (connection->http_proxy_config) {
  644. aws_http_proxy_config_destroy(connection->http_proxy_config);
  645. connection->http_proxy_config = NULL;
  646. }
  647. aws_mqtt_client_release(connection->client);
  648. /* Frees all allocated memory */
  649. aws_mem_release(connection->allocator, connection);
  650. }
  651. static void s_on_final_disconnect(struct aws_mqtt_client_connection *connection, void *userdata) {
  652. (void)userdata;
  653. s_mqtt_client_connection_destroy_final(connection);
  654. }
  655. static void s_mqtt_client_connection_start_destroy(struct aws_mqtt_client_connection *connection) {
  656. bool call_destroy_final = false;
  657. AWS_LOGF_DEBUG(
  658. AWS_LS_MQTT_CLIENT,
  659. "id=%p: Last refcount on connection has been released, start destroying the connection.",
  660. (void *)connection);
  661. { /* BEGIN CRITICAL SECTION */
  662. mqtt_connection_lock_synced_data(connection);
  663. if (connection->synced_data.state != AWS_MQTT_CLIENT_STATE_DISCONNECTED) {
  664. /*
  665. * We don't call the on_disconnect callback until we've transitioned to the DISCONNECTED state. So it's
  666. * safe to change it now while we hold the lock since we know we're not DISCONNECTED yet.
  667. */
  668. connection->on_disconnect = s_on_final_disconnect;
  669. if (connection->synced_data.state != AWS_MQTT_CLIENT_STATE_DISCONNECTING) {
  670. mqtt_disconnect_impl(connection, AWS_ERROR_SUCCESS);
  671. AWS_LOGF_DEBUG(
  672. AWS_LS_MQTT_CLIENT,
  673. "id=%p: final refcount has been released, switch state to DISCONNECTING.",
  674. (void *)connection);
  675. mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_DISCONNECTING);
  676. }
  677. } else {
  678. call_destroy_final = true;
  679. }
  680. mqtt_connection_unlock_synced_data(connection);
  681. } /* END CRITICAL SECTION */
  682. if (call_destroy_final) {
  683. s_mqtt_client_connection_destroy_final(connection);
  684. }
  685. }
  686. struct aws_mqtt_client_connection *aws_mqtt_client_connection_new(struct aws_mqtt_client *client) {
  687. AWS_PRECONDITION(client);
  688. struct aws_mqtt_client_connection *connection =
  689. aws_mem_calloc(client->allocator, 1, sizeof(struct aws_mqtt_client_connection));
  690. if (!connection) {
  691. return NULL;
  692. }
  693. AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Creating new connection", (void *)connection);
  694. /* Initialize the client */
  695. connection->allocator = client->allocator;
  696. aws_ref_count_init(
  697. &connection->ref_count, connection, (aws_simple_completion_callback *)s_mqtt_client_connection_start_destroy);
  698. connection->client = aws_mqtt_client_acquire(client);
  699. AWS_ZERO_STRUCT(connection->synced_data);
  700. connection->synced_data.state = AWS_MQTT_CLIENT_STATE_DISCONNECTED;
  701. connection->reconnect_timeouts.min_sec = 1;
  702. connection->reconnect_timeouts.current_sec = 1;
  703. connection->reconnect_timeouts.max_sec = 128;
  704. aws_linked_list_init(&connection->synced_data.pending_requests_list);
  705. aws_linked_list_init(&connection->thread_data.ongoing_requests_list);
  706. s_init_statistics(&connection->operation_statistics_impl);
  707. if (aws_mutex_init(&connection->synced_data.lock)) {
  708. AWS_LOGF_ERROR(
  709. AWS_LS_MQTT_CLIENT,
  710. "id=%p: Failed to initialize mutex, error %d (%s)",
  711. (void *)connection,
  712. aws_last_error(),
  713. aws_error_name(aws_last_error()));
  714. goto failed_init_mutex;
  715. }
  716. if (aws_mqtt_topic_tree_init(&connection->thread_data.subscriptions, connection->allocator)) {
  717. AWS_LOGF_ERROR(
  718. AWS_LS_MQTT_CLIENT,
  719. "id=%p: Failed to initialize subscriptions topic_tree, error %d (%s)",
  720. (void *)connection,
  721. aws_last_error(),
  722. aws_error_name(aws_last_error()));
  723. goto failed_init_subscriptions;
  724. }
  725. if (aws_memory_pool_init(
  726. &connection->synced_data.requests_pool, connection->allocator, 32, sizeof(struct aws_mqtt_request))) {
  727. AWS_LOGF_ERROR(
  728. AWS_LS_MQTT_CLIENT,
  729. "id=%p: Failed to initialize request pool, error %d (%s)",
  730. (void *)connection,
  731. aws_last_error(),
  732. aws_error_name(aws_last_error()));
  733. goto failed_init_requests_pool;
  734. }
  735. if (aws_hash_table_init(
  736. &connection->synced_data.outstanding_requests_table,
  737. connection->allocator,
  738. sizeof(struct aws_mqtt_request *),
  739. s_hash_uint16_t,
  740. s_uint16_t_eq,
  741. NULL,
  742. NULL)) {
  743. AWS_LOGF_ERROR(
  744. AWS_LS_MQTT_CLIENT,
  745. "id=%p: Failed to initialize outstanding requests table, error %d (%s)",
  746. (void *)connection,
  747. aws_last_error(),
  748. aws_error_name(aws_last_error()));
  749. goto failed_init_outstanding_requests_table;
  750. }
  751. connection->loop = aws_event_loop_group_get_next_loop(client->bootstrap->event_loop_group);
  752. /* Initialize the handler */
  753. connection->handler.alloc = connection->allocator;
  754. connection->handler.vtable = aws_mqtt_get_client_channel_vtable();
  755. connection->handler.impl = connection;
  756. return connection;
  757. failed_init_outstanding_requests_table:
  758. aws_memory_pool_clean_up(&connection->synced_data.requests_pool);
  759. failed_init_requests_pool:
  760. aws_mqtt_topic_tree_clean_up(&connection->thread_data.subscriptions);
  761. failed_init_subscriptions:
  762. aws_mutex_clean_up(&connection->synced_data.lock);
  763. failed_init_mutex:
  764. aws_mem_release(client->allocator, connection);
  765. return NULL;
  766. }
  767. struct aws_mqtt_client_connection *aws_mqtt_client_connection_acquire(struct aws_mqtt_client_connection *connection) {
  768. if (connection != NULL) {
  769. aws_ref_count_acquire(&connection->ref_count);
  770. }
  771. return connection;
  772. }
  773. void aws_mqtt_client_connection_release(struct aws_mqtt_client_connection *connection) {
  774. if (connection != NULL) {
  775. aws_ref_count_release(&connection->ref_count);
  776. }
  777. }
  778. /*******************************************************************************
  779. * Connection Configuration
  780. ******************************************************************************/
  781. /* To configure the connection, ensure the state is DISCONNECTED or CONNECTED */
  782. static int s_check_connection_state_for_configuration(struct aws_mqtt_client_connection *connection) {
  783. int result = AWS_OP_SUCCESS;
  784. { /* BEGIN CRITICAL SECTION */
  785. mqtt_connection_lock_synced_data(connection);
  786. if (connection->synced_data.state != AWS_MQTT_CLIENT_STATE_DISCONNECTED &&
  787. connection->synced_data.state != AWS_MQTT_CLIENT_STATE_CONNECTED) {
  788. AWS_LOGF_ERROR(
  789. AWS_LS_MQTT_CLIENT,
  790. "id=%p: Connection is currently pending connect/disconnect. Unable to make configuration changes until "
  791. "pending operation completes.",
  792. (void *)connection);
  793. result = AWS_OP_ERR;
  794. }
  795. mqtt_connection_unlock_synced_data(connection);
  796. } /* END CRITICAL SECTION */
  797. return result;
  798. }
  799. int aws_mqtt_client_connection_set_will(
  800. struct aws_mqtt_client_connection *connection,
  801. const struct aws_byte_cursor *topic,
  802. enum aws_mqtt_qos qos,
  803. bool retain,
  804. const struct aws_byte_cursor *payload) {
  805. AWS_PRECONDITION(connection);
  806. AWS_PRECONDITION(topic);
  807. if (s_check_connection_state_for_configuration(connection)) {
  808. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  809. }
  810. int result = AWS_OP_ERR;
  811. AWS_LOGF_TRACE(
  812. AWS_LS_MQTT_CLIENT,
  813. "id=%p: Setting last will with topic \"" PRInSTR "\"",
  814. (void *)connection,
  815. AWS_BYTE_CURSOR_PRI(*topic));
  816. if (!aws_mqtt_is_valid_topic(topic)) {
  817. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Will topic is invalid", (void *)connection);
  818. return aws_raise_error(AWS_ERROR_MQTT_INVALID_TOPIC);
  819. }
  820. struct aws_byte_buf local_topic_buf;
  821. struct aws_byte_buf local_payload_buf;
  822. AWS_ZERO_STRUCT(local_topic_buf);
  823. AWS_ZERO_STRUCT(local_payload_buf);
  824. struct aws_byte_buf topic_buf = aws_byte_buf_from_array(topic->ptr, topic->len);
  825. if (aws_byte_buf_init_copy(&local_topic_buf, connection->allocator, &topic_buf)) {
  826. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy will topic", (void *)connection);
  827. goto cleanup;
  828. }
  829. connection->will.qos = qos;
  830. connection->will.retain = retain;
  831. struct aws_byte_buf payload_buf = aws_byte_buf_from_array(payload->ptr, payload->len);
  832. if (aws_byte_buf_init_copy(&local_payload_buf, connection->allocator, &payload_buf)) {
  833. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy will body", (void *)connection);
  834. goto cleanup;
  835. }
  836. if (connection->will.topic.len) {
  837. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Will has been set before, resetting it.", (void *)connection);
  838. }
  839. /* Succeed. */
  840. result = AWS_OP_SUCCESS;
  841. /* swap the local buffer with connection */
  842. struct aws_byte_buf temp = local_topic_buf;
  843. local_topic_buf = connection->will.topic;
  844. connection->will.topic = temp;
  845. temp = local_payload_buf;
  846. local_payload_buf = connection->will.payload;
  847. connection->will.payload = temp;
  848. cleanup:
  849. aws_byte_buf_clean_up(&local_topic_buf);
  850. aws_byte_buf_clean_up(&local_payload_buf);
  851. return result;
  852. }
  853. int aws_mqtt_client_connection_set_login(
  854. struct aws_mqtt_client_connection *connection,
  855. const struct aws_byte_cursor *username,
  856. const struct aws_byte_cursor *password) {
  857. AWS_PRECONDITION(connection);
  858. AWS_PRECONDITION(username);
  859. if (s_check_connection_state_for_configuration(connection)) {
  860. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  861. }
  862. int result = AWS_OP_ERR;
  863. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Setting username and password", (void *)connection);
  864. struct aws_string *username_string = NULL;
  865. struct aws_string *password_string = NULL;
  866. username_string = aws_string_new_from_array(connection->allocator, username->ptr, username->len);
  867. if (!username_string) {
  868. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy username", (void *)connection);
  869. goto cleanup;
  870. }
  871. if (password) {
  872. password_string = aws_string_new_from_array(connection->allocator, password->ptr, password->len);
  873. if (!password_string) {
  874. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy password", (void *)connection);
  875. goto cleanup;
  876. }
  877. }
  878. if (connection->username) {
  879. AWS_LOGF_TRACE(
  880. AWS_LS_MQTT_CLIENT, "id=%p: Login information has been set before, resetting it.", (void *)connection);
  881. }
  882. /* Succeed. */
  883. result = AWS_OP_SUCCESS;
  884. /* swap the local string with connection */
  885. struct aws_string *temp = username_string;
  886. username_string = connection->username;
  887. connection->username = temp;
  888. temp = password_string;
  889. password_string = connection->password;
  890. connection->password = temp;
  891. cleanup:
  892. aws_string_destroy_secure(username_string);
  893. aws_string_destroy_secure(password_string);
  894. return result;
  895. }
  896. int aws_mqtt_client_connection_set_reconnect_timeout(
  897. struct aws_mqtt_client_connection *connection,
  898. uint64_t min_timeout,
  899. uint64_t max_timeout) {
  900. AWS_PRECONDITION(connection);
  901. if (s_check_connection_state_for_configuration(connection)) {
  902. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  903. }
  904. AWS_LOGF_TRACE(
  905. AWS_LS_MQTT_CLIENT,
  906. "id=%p: Setting reconnect timeouts min: %" PRIu64 " max: %" PRIu64,
  907. (void *)connection,
  908. min_timeout,
  909. max_timeout);
  910. connection->reconnect_timeouts.min_sec = min_timeout;
  911. connection->reconnect_timeouts.max_sec = max_timeout;
  912. connection->reconnect_timeouts.current_sec = min_timeout;
  913. return AWS_OP_SUCCESS;
  914. }
  915. int aws_mqtt_client_connection_set_connection_interruption_handlers(
  916. struct aws_mqtt_client_connection *connection,
  917. aws_mqtt_client_on_connection_interrupted_fn *on_interrupted,
  918. void *on_interrupted_ud,
  919. aws_mqtt_client_on_connection_resumed_fn *on_resumed,
  920. void *on_resumed_ud) {
  921. AWS_PRECONDITION(connection);
  922. if (s_check_connection_state_for_configuration(connection)) {
  923. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  924. }
  925. AWS_LOGF_TRACE(
  926. AWS_LS_MQTT_CLIENT, "id=%p: Setting connection interrupted and resumed handlers", (void *)connection);
  927. connection->on_interrupted = on_interrupted;
  928. connection->on_interrupted_ud = on_interrupted_ud;
  929. connection->on_resumed = on_resumed;
  930. connection->on_resumed_ud = on_resumed_ud;
  931. return AWS_OP_SUCCESS;
  932. }
  933. int aws_mqtt_client_connection_set_connection_closed_handler(
  934. struct aws_mqtt_client_connection *connection,
  935. aws_mqtt_client_on_connection_closed_fn *on_closed,
  936. void *on_closed_ud) {
  937. AWS_PRECONDITION(connection);
  938. if (s_check_connection_state_for_configuration(connection)) {
  939. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  940. }
  941. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Setting connection closed handler", (void *)connection);
  942. connection->on_closed = on_closed;
  943. connection->on_closed_ud = on_closed_ud;
  944. return AWS_OP_SUCCESS;
  945. }
  946. int aws_mqtt_client_connection_set_on_any_publish_handler(
  947. struct aws_mqtt_client_connection *connection,
  948. aws_mqtt_client_publish_received_fn *on_any_publish,
  949. void *on_any_publish_ud) {
  950. AWS_PRECONDITION(connection);
  951. { /* BEGIN CRITICAL SECTION */
  952. mqtt_connection_lock_synced_data(connection);
  953. if (connection->synced_data.state == AWS_MQTT_CLIENT_STATE_CONNECTED) {
  954. mqtt_connection_unlock_synced_data(connection);
  955. AWS_LOGF_ERROR(
  956. AWS_LS_MQTT_CLIENT,
  957. "id=%p: Connection is connected, publishes may arrive anytime. Unable to set publish handler until "
  958. "offline.",
  959. (void *)connection);
  960. return aws_raise_error(AWS_ERROR_INVALID_STATE);
  961. }
  962. mqtt_connection_unlock_synced_data(connection);
  963. } /* END CRITICAL SECTION */
  964. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Setting on_any_publish handler", (void *)connection);
  965. connection->on_any_publish = on_any_publish;
  966. connection->on_any_publish_ud = on_any_publish_ud;
  967. return AWS_OP_SUCCESS;
  968. }
  969. /*******************************************************************************
  970. * Websockets
  971. ******************************************************************************/
  972. #ifdef AWS_MQTT_WITH_WEBSOCKETS
  973. int aws_mqtt_client_connection_use_websockets(
  974. struct aws_mqtt_client_connection *connection,
  975. aws_mqtt_transform_websocket_handshake_fn *transformer,
  976. void *transformer_ud,
  977. aws_mqtt_validate_websocket_handshake_fn *validator,
  978. void *validator_ud) {
  979. connection->websocket.handshake_transformer = transformer;
  980. connection->websocket.handshake_transformer_ud = transformer_ud;
  981. connection->websocket.handshake_validator = validator;
  982. connection->websocket.handshake_validator_ud = validator_ud;
  983. connection->websocket.enabled = true;
  984. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Using websockets", (void *)connection);
  985. return AWS_OP_SUCCESS;
  986. }
  987. int aws_mqtt_client_connection_set_http_proxy_options(
  988. struct aws_mqtt_client_connection *connection,
  989. struct aws_http_proxy_options *proxy_options) {
  990. /* If there is existing proxy options, nuke em */
  991. if (connection->http_proxy_config) {
  992. aws_http_proxy_config_destroy(connection->http_proxy_config);
  993. connection->http_proxy_config = NULL;
  994. }
  995. connection->http_proxy_config =
  996. aws_http_proxy_config_new_tunneling_from_proxy_options(connection->allocator, proxy_options);
  997. return connection->http_proxy_config != NULL ? AWS_OP_SUCCESS : AWS_OP_ERR;
  998. }
  999. static void s_on_websocket_shutdown(struct aws_websocket *websocket, int error_code, void *user_data) {
  1000. struct aws_mqtt_client_connection *connection = user_data;
  1001. struct aws_channel *channel = connection->slot ? connection->slot->channel : NULL;
  1002. s_mqtt_client_shutdown(connection->client->bootstrap, error_code, channel, connection);
  1003. if (websocket) {
  1004. aws_websocket_release(websocket);
  1005. }
  1006. }
  1007. static void s_on_websocket_setup(const struct aws_websocket_on_connection_setup_data *setup, void *user_data) {
  1008. /* Setup callback contract is: if error_code is non-zero then websocket is NULL. */
  1009. AWS_FATAL_ASSERT((setup->error_code != 0) == (setup->websocket == NULL));
  1010. struct aws_mqtt_client_connection *connection = user_data;
  1011. struct aws_channel *channel = NULL;
  1012. if (connection->websocket.handshake_request) {
  1013. aws_http_message_release(connection->websocket.handshake_request);
  1014. connection->websocket.handshake_request = NULL;
  1015. }
  1016. if (setup->websocket) {
  1017. channel = aws_websocket_get_channel(setup->websocket);
  1018. AWS_FATAL_ASSERT(channel);
  1019. AWS_FATAL_ASSERT(aws_channel_get_event_loop(channel) == connection->loop);
  1020. /* Websocket must be "converted" before the MQTT handler can be installed next to it. */
  1021. if (aws_websocket_convert_to_midchannel_handler(setup->websocket)) {
  1022. AWS_LOGF_ERROR(
  1023. AWS_LS_MQTT_CLIENT,
  1024. "id=%p: Failed converting websocket, error %d (%s)",
  1025. (void *)connection,
  1026. aws_last_error(),
  1027. aws_error_name(aws_last_error()));
  1028. aws_channel_shutdown(channel, aws_last_error());
  1029. return;
  1030. }
  1031. /* If validation callback is set, let the user accept/reject the handshake */
  1032. if (connection->websocket.handshake_validator) {
  1033. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Validating websocket handshake response.", (void *)connection);
  1034. if (connection->websocket.handshake_validator(
  1035. connection,
  1036. setup->handshake_response_header_array,
  1037. setup->num_handshake_response_headers,
  1038. connection->websocket.handshake_validator_ud)) {
  1039. AWS_LOGF_ERROR(
  1040. AWS_LS_MQTT_CLIENT,
  1041. "id=%p: Failure reported by websocket handshake validator callback, error %d (%s)",
  1042. (void *)connection,
  1043. aws_last_error(),
  1044. aws_error_name(aws_last_error()));
  1045. aws_channel_shutdown(channel, aws_last_error());
  1046. return;
  1047. }
  1048. AWS_LOGF_TRACE(
  1049. AWS_LS_MQTT_CLIENT, "id=%p: Done validating websocket handshake response.", (void *)connection);
  1050. }
  1051. }
  1052. /* Call into the channel-setup callback, the rest of the logic is the same. */
  1053. s_mqtt_client_init(connection->client->bootstrap, setup->error_code, channel, connection);
  1054. }
  1055. static aws_mqtt_transform_websocket_handshake_complete_fn s_websocket_handshake_transform_complete; /* fwd declare */
  1056. static int s_websocket_connect(struct aws_mqtt_client_connection *connection) {
  1057. AWS_ASSERT(connection->websocket.enabled);
  1058. /* Build websocket handshake request */
  1059. connection->websocket.handshake_request = aws_http_message_new_websocket_handshake_request(
  1060. connection->allocator, *g_websocket_handshake_default_path, aws_byte_cursor_from_string(connection->host_name));
  1061. if (!connection->websocket.handshake_request) {
  1062. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to generate websocket handshake request", (void *)connection);
  1063. goto error;
  1064. }
  1065. if (aws_http_message_add_header(
  1066. connection->websocket.handshake_request, *g_websocket_handshake_default_protocol_header)) {
  1067. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to generate websocket handshake request", (void *)connection);
  1068. goto error;
  1069. }
  1070. /* If user registered a transform callback, call it and wait for transform_complete() to be called.
  1071. * If no callback registered, call the transform_complete() function ourselves. */
  1072. if (connection->websocket.handshake_transformer) {
  1073. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Transforming websocket handshake request.", (void *)connection);
  1074. connection->websocket.handshake_transformer(
  1075. connection->websocket.handshake_request,
  1076. connection->websocket.handshake_transformer_ud,
  1077. s_websocket_handshake_transform_complete,
  1078. connection);
  1079. } else {
  1080. s_websocket_handshake_transform_complete(
  1081. connection->websocket.handshake_request, AWS_ERROR_SUCCESS, connection);
  1082. }
  1083. return AWS_OP_SUCCESS;
  1084. error:
  1085. aws_http_message_release(connection->websocket.handshake_request);
  1086. connection->websocket.handshake_request = NULL;
  1087. return AWS_OP_ERR;
  1088. }
  1089. static void s_websocket_handshake_transform_complete(
  1090. struct aws_http_message *handshake_request,
  1091. int error_code,
  1092. void *complete_ctx) {
  1093. struct aws_mqtt_client_connection *connection = complete_ctx;
  1094. if (error_code) {
  1095. AWS_LOGF_ERROR(
  1096. AWS_LS_MQTT_CLIENT,
  1097. "id=%p: Failure reported by websocket handshake transform callback.",
  1098. (void *)connection);
  1099. goto error;
  1100. }
  1101. if (connection->websocket.handshake_transformer) {
  1102. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Done transforming websocket handshake request.", (void *)connection);
  1103. }
  1104. /* Call websocket connect() */
  1105. struct aws_websocket_client_connection_options websocket_options = {
  1106. .allocator = connection->allocator,
  1107. .bootstrap = connection->client->bootstrap,
  1108. .socket_options = &connection->socket_options,
  1109. .tls_options = connection->tls_options.ctx ? &connection->tls_options : NULL,
  1110. .host = aws_byte_cursor_from_string(connection->host_name),
  1111. .port = connection->port,
  1112. .handshake_request = handshake_request,
  1113. .initial_window_size = 0, /* Prevent websocket data from arriving before the MQTT handler is installed */
  1114. .user_data = connection,
  1115. .on_connection_setup = s_on_websocket_setup,
  1116. .on_connection_shutdown = s_on_websocket_shutdown,
  1117. .requested_event_loop = connection->loop,
  1118. };
  1119. struct aws_http_proxy_options proxy_options;
  1120. AWS_ZERO_STRUCT(proxy_options);
  1121. if (connection->http_proxy_config != NULL) {
  1122. aws_http_proxy_options_init_from_config(&proxy_options, connection->http_proxy_config);
  1123. websocket_options.proxy_options = &proxy_options;
  1124. }
  1125. if (aws_websocket_client_connect(&websocket_options)) {
  1126. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to initiate websocket connection.", (void *)connection);
  1127. error_code = aws_last_error();
  1128. goto error;
  1129. }
  1130. /* Success */
  1131. return;
  1132. error:;
  1133. /* Proceed to next step, telling it that we failed. */
  1134. struct aws_websocket_on_connection_setup_data websocket_setup = {.error_code = error_code};
  1135. s_on_websocket_setup(&websocket_setup, connection);
  1136. }
  1137. #else /* AWS_MQTT_WITH_WEBSOCKETS */
  1138. int aws_mqtt_client_connection_use_websockets(
  1139. struct aws_mqtt_client_connection *connection,
  1140. aws_mqtt_transform_websocket_handshake_fn *transformer,
  1141. void *transformer_ud,
  1142. aws_mqtt_validate_websocket_handshake_fn *validator,
  1143. void *validator_ud) {
  1144. (void)connection;
  1145. (void)transformer;
  1146. (void)transformer_ud;
  1147. (void)validator;
  1148. (void)validator_ud;
  1149. AWS_LOGF_ERROR(
  1150. AWS_LS_MQTT_CLIENT,
  1151. "id=%p: Cannot use websockets unless library is built with MQTT_WITH_WEBSOCKETS option.",
  1152. (void *)connection);
  1153. return aws_raise_error(AWS_ERROR_MQTT_BUILT_WITHOUT_WEBSOCKETS);
  1154. }
  1155. int aws_mqtt_client_connection_set_websocket_proxy_options(
  1156. struct aws_mqtt_client_connection *connection,
  1157. struct aws_http_proxy_options *proxy_options) {
  1158. (void)connection;
  1159. (void)proxy_options;
  1160. AWS_LOGF_ERROR(
  1161. AWS_LS_MQTT_CLIENT,
  1162. "id=%p: Cannot use websockets unless library is built with MQTT_WITH_WEBSOCKETS option.",
  1163. (void *)connection);
  1164. return aws_raise_error(AWS_ERROR_MQTT_BUILT_WITHOUT_WEBSOCKETS);
  1165. }
  1166. #endif /* AWS_MQTT_WITH_WEBSOCKETS */
  1167. /*******************************************************************************
  1168. * Connect
  1169. ******************************************************************************/
  1170. int aws_mqtt_client_connection_connect(
  1171. struct aws_mqtt_client_connection *connection,
  1172. const struct aws_mqtt_connection_options *connection_options) {
  1173. /* TODO: Do we need to support resuming the connection if user connect to the same connection & endpoint and the
  1174. * clean_session is false?
  1175. * If not, the broker will resume the connection in this case, and we pretend we are making a new connection, which
  1176. * may cause some confusing behavior. This is basically what we have now. NOTE: The topic_tree is living with the
  1177. * connection right now, which is really confusing.
  1178. * If yes, an edge case will be: User disconnected from the connection with clean_session
  1179. * being false, then connect to another endpoint with the same connection object, we probably need to clear all
  1180. * those states from last connection and create a new "connection". Problem is what if user finish the second
  1181. * connection and reconnect to the first endpoint. There is no way for us to resume the connection in this case. */
  1182. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Opening connection", (void *)connection);
  1183. { /* BEGIN CRITICAL SECTION */
  1184. mqtt_connection_lock_synced_data(connection);
  1185. if (connection->synced_data.state != AWS_MQTT_CLIENT_STATE_DISCONNECTED) {
  1186. mqtt_connection_unlock_synced_data(connection);
  1187. return aws_raise_error(AWS_ERROR_MQTT_ALREADY_CONNECTED);
  1188. }
  1189. mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_CONNECTING);
  1190. AWS_LOGF_DEBUG(
  1191. AWS_LS_MQTT_CLIENT, "id=%p: Begin connecting process, switch state to CONNECTING.", (void *)connection);
  1192. mqtt_connection_unlock_synced_data(connection);
  1193. } /* END CRITICAL SECTION */
  1194. if (connection->host_name) {
  1195. aws_string_destroy(connection->host_name);
  1196. }
  1197. connection->host_name = aws_string_new_from_array(
  1198. connection->allocator, connection_options->host_name.ptr, connection_options->host_name.len);
  1199. connection->port = connection_options->port;
  1200. connection->socket_options = *connection_options->socket_options;
  1201. connection->clean_session = connection_options->clean_session;
  1202. connection->keep_alive_time_secs = connection_options->keep_alive_time_secs;
  1203. connection->connection_count = 0;
  1204. if (!connection->keep_alive_time_secs) {
  1205. connection->keep_alive_time_secs = s_default_keep_alive_sec;
  1206. }
  1207. if (!connection_options->protocol_operation_timeout_ms) {
  1208. connection->operation_timeout_ns = UINT64_MAX;
  1209. } else {
  1210. connection->operation_timeout_ns = aws_timestamp_convert(
  1211. (uint64_t)connection_options->protocol_operation_timeout_ms,
  1212. AWS_TIMESTAMP_MILLIS,
  1213. AWS_TIMESTAMP_NANOS,
  1214. NULL);
  1215. }
  1216. if (!connection_options->ping_timeout_ms) {
  1217. connection->ping_timeout_ns = s_default_ping_timeout_ns;
  1218. } else {
  1219. connection->ping_timeout_ns = aws_timestamp_convert(
  1220. (uint64_t)connection_options->ping_timeout_ms, AWS_TIMESTAMP_MILLIS, AWS_TIMESTAMP_NANOS, NULL);
  1221. }
  1222. /* Keep alive time should always be greater than the timeouts. */
  1223. if (AWS_UNLIKELY(connection->keep_alive_time_secs * (uint64_t)AWS_TIMESTAMP_NANOS <= connection->ping_timeout_ns)) {
  1224. AWS_LOGF_FATAL(
  1225. AWS_LS_MQTT_CLIENT,
  1226. "id=%p: Illegal configuration, Connection keep alive %" PRIu64
  1227. "ns must be greater than the request timeouts %" PRIu64 "ns.",
  1228. (void *)connection,
  1229. (uint64_t)connection->keep_alive_time_secs * (uint64_t)AWS_TIMESTAMP_NANOS,
  1230. connection->ping_timeout_ns);
  1231. AWS_FATAL_ASSERT(
  1232. connection->keep_alive_time_secs * (uint64_t)AWS_TIMESTAMP_NANOS > connection->ping_timeout_ns);
  1233. }
  1234. AWS_LOGF_INFO(
  1235. AWS_LS_MQTT_CLIENT,
  1236. "id=%p: using ping timeout of %" PRIu64 " ns",
  1237. (void *)connection,
  1238. connection->ping_timeout_ns);
  1239. /* Cheat and set the tls_options host_name to our copy if they're the same */
  1240. if (connection_options->tls_options) {
  1241. connection->use_tls = true;
  1242. if (aws_tls_connection_options_copy(&connection->tls_options, connection_options->tls_options)) {
  1243. AWS_LOGF_ERROR(
  1244. AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy TLS Connection Options into connection", (void *)connection);
  1245. return AWS_OP_ERR;
  1246. }
  1247. if (!connection_options->tls_options->server_name) {
  1248. struct aws_byte_cursor host_name_cur = aws_byte_cursor_from_string(connection->host_name);
  1249. if (aws_tls_connection_options_set_server_name(
  1250. &connection->tls_options, connection->allocator, &host_name_cur)) {
  1251. AWS_LOGF_ERROR(
  1252. AWS_LS_MQTT_CLIENT, "id=%p: Failed to set TLS Connection Options server name", (void *)connection);
  1253. goto error;
  1254. }
  1255. }
  1256. } else {
  1257. AWS_ZERO_STRUCT(connection->tls_options);
  1258. }
  1259. /* Clean up old client_id */
  1260. if (connection->client_id.buffer) {
  1261. aws_byte_buf_clean_up(&connection->client_id);
  1262. }
  1263. /* Only set connection->client_id if a new one was provided */
  1264. struct aws_byte_buf client_id_buf =
  1265. aws_byte_buf_from_array(connection_options->client_id.ptr, connection_options->client_id.len);
  1266. if (aws_byte_buf_init_copy(&connection->client_id, connection->allocator, &client_id_buf)) {
  1267. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: Failed to copy client_id into connection", (void *)connection);
  1268. goto error;
  1269. }
  1270. struct aws_linked_list cancelling_requests;
  1271. aws_linked_list_init(&cancelling_requests);
  1272. { /* BEGIN CRITICAL SECTION */
  1273. mqtt_connection_lock_synced_data(connection);
  1274. if (connection->clean_session) {
  1275. AWS_LOGF_TRACE(
  1276. AWS_LS_MQTT_CLIENT,
  1277. "id=%p: a clean session connection requested, all the previous requests will fail",
  1278. (void *)connection);
  1279. aws_linked_list_swap_contents(&connection->synced_data.pending_requests_list, &cancelling_requests);
  1280. }
  1281. mqtt_connection_unlock_synced_data(connection);
  1282. } /* END CRITICAL SECTION */
  1283. if (!aws_linked_list_empty(&cancelling_requests)) {
  1284. struct aws_linked_list_node *current = aws_linked_list_front(&cancelling_requests);
  1285. const struct aws_linked_list_node *end = aws_linked_list_end(&cancelling_requests);
  1286. /* invoke all the complete callback for requests from previous session */
  1287. while (current != end) {
  1288. struct aws_mqtt_request *request = AWS_CONTAINER_OF(current, struct aws_mqtt_request, list_node);
  1289. AWS_LOGF_TRACE(
  1290. AWS_LS_MQTT_CLIENT,
  1291. "id=%p: Establishing a new clean session connection, discard the previous request %" PRIu16,
  1292. (void *)connection,
  1293. request->packet_id);
  1294. if (request->on_complete) {
  1295. request->on_complete(
  1296. connection,
  1297. request->packet_id,
  1298. AWS_ERROR_MQTT_CANCELLED_FOR_CLEAN_SESSION,
  1299. request->on_complete_ud);
  1300. }
  1301. current = current->next;
  1302. }
  1303. /* free the resource */
  1304. { /* BEGIN CRITICAL SECTION */
  1305. mqtt_connection_lock_synced_data(connection);
  1306. while (!aws_linked_list_empty(&cancelling_requests)) {
  1307. struct aws_linked_list_node *node = aws_linked_list_pop_front(&cancelling_requests);
  1308. struct aws_mqtt_request *request = AWS_CONTAINER_OF(node, struct aws_mqtt_request, list_node);
  1309. aws_hash_table_remove(
  1310. &connection->synced_data.outstanding_requests_table, &request->packet_id, NULL, NULL);
  1311. aws_memory_pool_release(&connection->synced_data.requests_pool, request);
  1312. }
  1313. mqtt_connection_unlock_synced_data(connection);
  1314. } /* END CRITICAL SECTION */
  1315. }
  1316. /* Begin the connecting process, acquire the connection to keep it alive until we disconnected */
  1317. aws_mqtt_client_connection_acquire(connection);
  1318. if (s_mqtt_client_connect(connection, connection_options->on_connection_complete, connection_options->user_data)) {
  1319. /*
  1320. * An error calling s_mqtt_client_connect should (must) be mutually exclusive with s_mqtt_client_shutdown().
  1321. * So it should be safe and correct to call release now to undo the pinning we did a few lines above.
  1322. */
  1323. aws_mqtt_client_connection_release(connection);
  1324. /* client_id has been updated with something but it will get cleaned up when the connection gets cleaned up
  1325. * so we don't need to worry about it here*/
  1326. if (connection->clean_session) {
  1327. AWS_LOGF_WARN(
  1328. AWS_LS_MQTT_CLIENT, "id=%p: The previous session has been cleaned up and losted!", (void *)connection);
  1329. }
  1330. goto error;
  1331. }
  1332. return AWS_OP_SUCCESS;
  1333. error:
  1334. aws_tls_connection_options_clean_up(&connection->tls_options);
  1335. AWS_ZERO_STRUCT(connection->tls_options);
  1336. { /* BEGIN CRITICAL SECTION */
  1337. mqtt_connection_lock_synced_data(connection);
  1338. mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_DISCONNECTED);
  1339. mqtt_connection_unlock_synced_data(connection);
  1340. } /* END CRITICAL SECTION */
  1341. return AWS_OP_ERR;
  1342. }
  1343. static int s_mqtt_client_connect(
  1344. struct aws_mqtt_client_connection *connection,
  1345. aws_mqtt_client_on_connection_complete_fn *on_connection_complete,
  1346. void *userdata) {
  1347. connection->on_connection_complete = on_connection_complete;
  1348. connection->on_connection_complete_ud = userdata;
  1349. int result = 0;
  1350. #ifdef AWS_MQTT_WITH_WEBSOCKETS
  1351. if (connection->websocket.enabled) {
  1352. result = s_websocket_connect(connection);
  1353. } else
  1354. #endif /* AWS_MQTT_WITH_WEBSOCKETS */
  1355. {
  1356. struct aws_socket_channel_bootstrap_options channel_options;
  1357. AWS_ZERO_STRUCT(channel_options);
  1358. channel_options.bootstrap = connection->client->bootstrap;
  1359. channel_options.host_name = aws_string_c_str(connection->host_name);
  1360. channel_options.port = connection->port;
  1361. channel_options.socket_options = &connection->socket_options;
  1362. channel_options.tls_options = connection->use_tls ? &connection->tls_options : NULL;
  1363. channel_options.setup_callback = &s_mqtt_client_init;
  1364. channel_options.shutdown_callback = &s_mqtt_client_shutdown;
  1365. channel_options.user_data = connection;
  1366. channel_options.requested_event_loop = connection->loop;
  1367. if (connection->http_proxy_config == NULL) {
  1368. result = aws_client_bootstrap_new_socket_channel(&channel_options);
  1369. } else {
  1370. struct aws_http_proxy_options proxy_options;
  1371. AWS_ZERO_STRUCT(proxy_options);
  1372. aws_http_proxy_options_init_from_config(&proxy_options, connection->http_proxy_config);
  1373. result = aws_http_proxy_new_socket_channel(&channel_options, &proxy_options);
  1374. }
  1375. }
  1376. if (result) {
  1377. /* Connection attempt failed */
  1378. AWS_LOGF_ERROR(
  1379. AWS_LS_MQTT_CLIENT,
  1380. "id=%p: Failed to begin connection routine, error %d (%s).",
  1381. (void *)connection,
  1382. aws_last_error(),
  1383. aws_error_name(aws_last_error()));
  1384. return AWS_OP_ERR;
  1385. }
  1386. return AWS_OP_SUCCESS;
  1387. }
  1388. /*******************************************************************************
  1389. * Reconnect DEPRECATED
  1390. ******************************************************************************/
  1391. int aws_mqtt_client_connection_reconnect(
  1392. struct aws_mqtt_client_connection *connection,
  1393. aws_mqtt_client_on_connection_complete_fn *on_connection_complete,
  1394. void *userdata) {
  1395. (void)connection;
  1396. (void)on_connection_complete;
  1397. (void)userdata;
  1398. /* DEPRECATED, connection will reconnect automatically now. */
  1399. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "aws_mqtt_client_connection_reconnect has been DEPRECATED.");
  1400. return aws_raise_error(AWS_ERROR_UNSUPPORTED_OPERATION);
  1401. }
  1402. /*******************************************************************************
  1403. * Disconnect
  1404. ******************************************************************************/
  1405. int aws_mqtt_client_connection_disconnect(
  1406. struct aws_mqtt_client_connection *connection,
  1407. aws_mqtt_client_on_disconnect_fn *on_disconnect,
  1408. void *userdata) {
  1409. AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: user called disconnect.", (void *)connection);
  1410. { /* BEGIN CRITICAL SECTION */
  1411. mqtt_connection_lock_synced_data(connection);
  1412. if (connection->synced_data.state != AWS_MQTT_CLIENT_STATE_CONNECTED &&
  1413. connection->synced_data.state != AWS_MQTT_CLIENT_STATE_RECONNECTING) {
  1414. mqtt_connection_unlock_synced_data(connection);
  1415. AWS_LOGF_ERROR(
  1416. AWS_LS_MQTT_CLIENT, "id=%p: Connection is not open, and may not be closed", (void *)connection);
  1417. aws_raise_error(AWS_ERROR_MQTT_NOT_CONNECTED);
  1418. return AWS_OP_ERR;
  1419. }
  1420. mqtt_connection_set_state(connection, AWS_MQTT_CLIENT_STATE_DISCONNECTING);
  1421. AWS_LOGF_DEBUG(
  1422. AWS_LS_MQTT_CLIENT,
  1423. "id=%p: User requests disconnecting, switch state to DISCONNECTING.",
  1424. (void *)connection);
  1425. connection->on_disconnect = on_disconnect;
  1426. connection->on_disconnect_ud = userdata;
  1427. mqtt_connection_unlock_synced_data(connection);
  1428. } /* END CRITICAL SECTION */
  1429. AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Closing connection", (void *)connection);
  1430. mqtt_disconnect_impl(connection, AWS_OP_SUCCESS);
  1431. return AWS_OP_SUCCESS;
  1432. }
  1433. /*******************************************************************************
  1434. * Subscribe
  1435. ******************************************************************************/
  1436. static void s_on_publish_client_wrapper(
  1437. const struct aws_byte_cursor *topic,
  1438. const struct aws_byte_cursor *payload,
  1439. bool dup,
  1440. enum aws_mqtt_qos qos,
  1441. bool retain,
  1442. void *userdata) {
  1443. struct subscribe_task_topic *task_topic = userdata;
  1444. /* Call out to the user callback */
  1445. if (task_topic->request.on_publish) {
  1446. task_topic->request.on_publish(
  1447. task_topic->connection, topic, payload, dup, qos, retain, task_topic->request.on_publish_ud);
  1448. }
  1449. }
  1450. static void s_task_topic_release(void *userdata) {
  1451. struct subscribe_task_topic *task_topic = userdata;
  1452. if (task_topic != NULL) {
  1453. aws_ref_count_release(&task_topic->ref_count);
  1454. }
  1455. }
  1456. static void s_task_topic_clean_up(void *userdata) {
  1457. struct subscribe_task_topic *task_topic = userdata;
  1458. if (task_topic->request.on_cleanup) {
  1459. task_topic->request.on_cleanup(task_topic->request.on_publish_ud);
  1460. }
  1461. aws_string_destroy(task_topic->filter);
  1462. aws_mem_release(task_topic->connection->allocator, task_topic);
  1463. }
  1464. static enum aws_mqtt_client_request_state s_subscribe_send(uint16_t packet_id, bool is_first_attempt, void *userdata) {
  1465. (void)is_first_attempt;
  1466. struct subscribe_task_arg *task_arg = userdata;
  1467. bool initing_packet = task_arg->subscribe.fixed_header.packet_type == 0;
  1468. struct aws_io_message *message = NULL;
  1469. AWS_LOGF_TRACE(
  1470. AWS_LS_MQTT_CLIENT,
  1471. "id=%p: Attempting send of subscribe %" PRIu16 " (%s)",
  1472. (void *)task_arg->connection,
  1473. packet_id,
  1474. is_first_attempt ? "first attempt" : "resend");
  1475. if (initing_packet) {
  1476. /* Init the subscribe packet */
  1477. if (aws_mqtt_packet_subscribe_init(&task_arg->subscribe, task_arg->connection->allocator, packet_id)) {
  1478. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  1479. }
  1480. }
  1481. const size_t num_topics = aws_array_list_length(&task_arg->topics);
  1482. if (num_topics <= 0) {
  1483. aws_raise_error(AWS_ERROR_MQTT_INVALID_TOPIC);
  1484. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  1485. }
  1486. AWS_VARIABLE_LENGTH_ARRAY(uint8_t, transaction_buf, num_topics * aws_mqtt_topic_tree_action_size);
  1487. struct aws_array_list transaction;
  1488. aws_array_list_init_static(&transaction, transaction_buf, num_topics, aws_mqtt_topic_tree_action_size);
  1489. for (size_t i = 0; i < num_topics; ++i) {
  1490. struct subscribe_task_topic *topic = NULL;
  1491. aws_array_list_get_at(&task_arg->topics, &topic, i);
  1492. AWS_ASSUME(topic); /* We know we're within bounds */
  1493. if (initing_packet) {
  1494. if (aws_mqtt_packet_subscribe_add_topic(&task_arg->subscribe, topic->request.topic, topic->request.qos)) {
  1495. goto handle_error;
  1496. }
  1497. }
  1498. if (!task_arg->tree_updated) {
  1499. if (aws_mqtt_topic_tree_transaction_insert(
  1500. &task_arg->connection->thread_data.subscriptions,
  1501. &transaction,
  1502. topic->filter,
  1503. topic->request.qos,
  1504. s_on_publish_client_wrapper,
  1505. s_task_topic_release,
  1506. topic)) {
  1507. goto handle_error;
  1508. }
  1509. /* If insert succeed, acquire the refcount */
  1510. aws_ref_count_acquire(&topic->ref_count);
  1511. }
  1512. }
  1513. message = mqtt_get_message_for_packet(task_arg->connection, &task_arg->subscribe.fixed_header);
  1514. if (!message) {
  1515. goto handle_error;
  1516. }
  1517. if (aws_mqtt_packet_subscribe_encode(&message->message_data, &task_arg->subscribe)) {
  1518. goto handle_error;
  1519. }
  1520. /* This is not necessarily a fatal error; if the subscribe fails, it'll just retry. Still need to clean up though.
  1521. */
  1522. if (aws_channel_slot_send_message(task_arg->connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
  1523. aws_mem_release(message->allocator, message);
  1524. }
  1525. if (!task_arg->tree_updated) {
  1526. aws_mqtt_topic_tree_transaction_commit(&task_arg->connection->thread_data.subscriptions, &transaction);
  1527. task_arg->tree_updated = true;
  1528. }
  1529. aws_array_list_clean_up(&transaction);
  1530. return AWS_MQTT_CLIENT_REQUEST_ONGOING;
  1531. handle_error:
  1532. if (message) {
  1533. aws_mem_release(message->allocator, message);
  1534. }
  1535. if (!task_arg->tree_updated) {
  1536. aws_mqtt_topic_tree_transaction_roll_back(&task_arg->connection->thread_data.subscriptions, &transaction);
  1537. }
  1538. aws_array_list_clean_up(&transaction);
  1539. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  1540. }
  1541. static void s_subscribe_complete(
  1542. struct aws_mqtt_client_connection *connection,
  1543. uint16_t packet_id,
  1544. int error_code,
  1545. void *userdata) {
  1546. struct subscribe_task_arg *task_arg = userdata;
  1547. struct subscribe_task_topic *topic = NULL;
  1548. aws_array_list_get_at(&task_arg->topics, &topic, 0);
  1549. AWS_ASSUME(topic);
  1550. AWS_LOGF_DEBUG(
  1551. AWS_LS_MQTT_CLIENT,
  1552. "id=%p: Subscribe %" PRIu16 " completed with error_code %d",
  1553. (void *)connection,
  1554. packet_id,
  1555. error_code);
  1556. size_t list_len = aws_array_list_length(&task_arg->topics);
  1557. if (task_arg->on_suback.multi) {
  1558. /* create a list of aws_mqtt_topic_subscription pointers from topics for the callback */
  1559. AWS_VARIABLE_LENGTH_ARRAY(uint8_t, cb_list_buf, list_len * sizeof(void *));
  1560. struct aws_array_list cb_list;
  1561. aws_array_list_init_static(&cb_list, cb_list_buf, list_len, sizeof(void *));
  1562. int err = 0;
  1563. for (size_t i = 0; i < list_len; i++) {
  1564. err |= aws_array_list_get_at(&task_arg->topics, &topic, i);
  1565. struct aws_mqtt_topic_subscription *subscription = &topic->request;
  1566. err |= aws_array_list_push_back(&cb_list, &subscription);
  1567. }
  1568. AWS_ASSUME(!err);
  1569. task_arg->on_suback.multi(connection, packet_id, &cb_list, error_code, task_arg->on_suback_ud);
  1570. aws_array_list_clean_up(&cb_list);
  1571. } else if (task_arg->on_suback.single) {
  1572. task_arg->on_suback.single(
  1573. connection, packet_id, &topic->request.topic, topic->request.qos, error_code, task_arg->on_suback_ud);
  1574. }
  1575. for (size_t i = 0; i < list_len; i++) {
  1576. aws_array_list_get_at(&task_arg->topics, &topic, i);
  1577. s_task_topic_release(topic);
  1578. }
  1579. aws_array_list_clean_up(&task_arg->topics);
  1580. aws_mqtt_packet_subscribe_clean_up(&task_arg->subscribe);
  1581. aws_mem_release(task_arg->connection->allocator, task_arg);
  1582. }
  1583. uint16_t aws_mqtt_client_connection_subscribe_multiple(
  1584. struct aws_mqtt_client_connection *connection,
  1585. const struct aws_array_list *topic_filters,
  1586. aws_mqtt_suback_multi_fn *on_suback,
  1587. void *on_suback_ud) {
  1588. AWS_PRECONDITION(connection);
  1589. struct subscribe_task_arg *task_arg = aws_mem_calloc(connection->allocator, 1, sizeof(struct subscribe_task_arg));
  1590. if (!task_arg) {
  1591. return 0;
  1592. }
  1593. task_arg->connection = connection;
  1594. task_arg->on_suback.multi = on_suback;
  1595. task_arg->on_suback_ud = on_suback_ud;
  1596. const size_t num_topics = aws_array_list_length(topic_filters);
  1597. if (aws_array_list_init_dynamic(&task_arg->topics, connection->allocator, num_topics, sizeof(void *))) {
  1598. goto handle_error;
  1599. }
  1600. AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Starting multi-topic subscribe", (void *)connection);
  1601. /* Calculate the size of the subscribe packet
  1602. * The fixed header is 2 bytes and the packet ID is 2 bytes.
  1603. * Note: The size of the topic filter(s) are calculated in the loop below */
  1604. uint64_t subscribe_packet_size = 4;
  1605. for (size_t i = 0; i < num_topics; ++i) {
  1606. struct aws_mqtt_topic_subscription *request = NULL;
  1607. aws_array_list_get_at_ptr(topic_filters, (void **)&request, i);
  1608. if (!aws_mqtt_is_valid_topic_filter(&request->topic)) {
  1609. aws_raise_error(AWS_ERROR_MQTT_INVALID_TOPIC);
  1610. goto handle_error;
  1611. }
  1612. struct subscribe_task_topic *task_topic =
  1613. aws_mem_calloc(connection->allocator, 1, sizeof(struct subscribe_task_topic));
  1614. if (!task_topic) {
  1615. goto handle_error;
  1616. }
  1617. aws_ref_count_init(&task_topic->ref_count, task_topic, (aws_simple_completion_callback *)s_task_topic_clean_up);
  1618. task_topic->connection = connection;
  1619. task_topic->request = *request;
  1620. task_topic->filter = aws_string_new_from_array(
  1621. connection->allocator, task_topic->request.topic.ptr, task_topic->request.topic.len);
  1622. if (!task_topic->filter) {
  1623. aws_mem_release(connection->allocator, task_topic);
  1624. goto handle_error;
  1625. }
  1626. /* Update request topic cursor to refer to owned string */
  1627. task_topic->request.topic = aws_byte_cursor_from_string(task_topic->filter);
  1628. AWS_LOGF_DEBUG(
  1629. AWS_LS_MQTT_CLIENT,
  1630. "id=%p: Adding topic \"" PRInSTR "\"",
  1631. (void *)connection,
  1632. AWS_BYTE_CURSOR_PRI(task_topic->request.topic));
  1633. /* Push into the list */
  1634. aws_array_list_push_back(&task_arg->topics, &task_topic);
  1635. /* Subscribe topic filter is: always 3 bytes (1 for QoS, 2 for Length MSB/LSB) + the size of the topic filter */
  1636. subscribe_packet_size += 3 + task_topic->request.topic.len;
  1637. }
  1638. uint16_t packet_id = mqtt_create_request(
  1639. task_arg->connection,
  1640. &s_subscribe_send,
  1641. task_arg,
  1642. &s_subscribe_complete,
  1643. task_arg,
  1644. false, /* noRetry */
  1645. subscribe_packet_size);
  1646. if (packet_id == 0) {
  1647. AWS_LOGF_ERROR(
  1648. AWS_LS_MQTT_CLIENT,
  1649. "id=%p: Failed to kick off multi-topic subscribe, with error %s",
  1650. (void *)connection,
  1651. aws_error_debug_str(aws_last_error()));
  1652. goto handle_error;
  1653. }
  1654. AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Sending multi-topic subscribe %" PRIu16, (void *)connection, packet_id);
  1655. return packet_id;
  1656. handle_error:
  1657. if (task_arg) {
  1658. if (task_arg->topics.data) {
  1659. const size_t num_added_topics = aws_array_list_length(&task_arg->topics);
  1660. for (size_t i = 0; i < num_added_topics; ++i) {
  1661. struct subscribe_task_topic *task_topic = NULL;
  1662. aws_array_list_get_at(&task_arg->topics, (void **)&task_topic, i);
  1663. AWS_ASSUME(task_topic);
  1664. aws_string_destroy(task_topic->filter);
  1665. aws_mem_release(connection->allocator, task_topic);
  1666. }
  1667. aws_array_list_clean_up(&task_arg->topics);
  1668. }
  1669. aws_mem_release(connection->allocator, task_arg);
  1670. }
  1671. return 0;
  1672. }
  1673. /*******************************************************************************
  1674. * Subscribe Single
  1675. ******************************************************************************/
  1676. static void s_subscribe_single_complete(
  1677. struct aws_mqtt_client_connection *connection,
  1678. uint16_t packet_id,
  1679. int error_code,
  1680. void *userdata) {
  1681. struct subscribe_task_arg *task_arg = userdata;
  1682. AWS_LOGF_DEBUG(
  1683. AWS_LS_MQTT_CLIENT,
  1684. "id=%p: Subscribe %" PRIu16 " completed with error code %d",
  1685. (void *)connection,
  1686. packet_id,
  1687. error_code);
  1688. AWS_ASSERT(aws_array_list_length(&task_arg->topics) == 1);
  1689. struct subscribe_task_topic *topic = NULL;
  1690. aws_array_list_get_at(&task_arg->topics, &topic, 0);
  1691. AWS_ASSUME(topic); /* There needs to be exactly 1 topic in this list */
  1692. if (task_arg->on_suback.single) {
  1693. AWS_ASSUME(aws_string_is_valid(topic->filter));
  1694. aws_mqtt_suback_fn *suback = task_arg->on_suback.single;
  1695. suback(connection, packet_id, &topic->request.topic, topic->request.qos, error_code, task_arg->on_suback_ud);
  1696. }
  1697. s_task_topic_release(topic);
  1698. aws_array_list_clean_up(&task_arg->topics);
  1699. aws_mqtt_packet_subscribe_clean_up(&task_arg->subscribe);
  1700. aws_mem_release(task_arg->connection->allocator, task_arg);
  1701. }
  1702. uint16_t aws_mqtt_client_connection_subscribe(
  1703. struct aws_mqtt_client_connection *connection,
  1704. const struct aws_byte_cursor *topic_filter,
  1705. enum aws_mqtt_qos qos,
  1706. aws_mqtt_client_publish_received_fn *on_publish,
  1707. void *on_publish_ud,
  1708. aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
  1709. aws_mqtt_suback_fn *on_suback,
  1710. void *on_suback_ud) {
  1711. AWS_PRECONDITION(connection);
  1712. if (!aws_mqtt_is_valid_topic_filter(topic_filter)) {
  1713. aws_raise_error(AWS_ERROR_MQTT_INVALID_TOPIC);
  1714. return 0;
  1715. }
  1716. /* Because we know we're only going to have 1 topic, we can cheat and allocate the array_list in the same block as
  1717. * the task argument. */
  1718. void *task_topic_storage = NULL;
  1719. struct subscribe_task_topic *task_topic = NULL;
  1720. struct subscribe_task_arg *task_arg = aws_mem_acquire_many(
  1721. connection->allocator,
  1722. 2,
  1723. &task_arg,
  1724. sizeof(struct subscribe_task_arg),
  1725. &task_topic_storage,
  1726. sizeof(struct subscribe_task_topic *));
  1727. if (!task_arg) {
  1728. goto handle_error;
  1729. }
  1730. AWS_ZERO_STRUCT(*task_arg);
  1731. task_arg->connection = connection;
  1732. task_arg->on_suback.single = on_suback;
  1733. task_arg->on_suback_ud = on_suback_ud;
  1734. /* It stores the pointer */
  1735. aws_array_list_init_static(&task_arg->topics, task_topic_storage, 1, sizeof(void *));
  1736. /* Allocate the topic and push into the list */
  1737. task_topic = aws_mem_calloc(connection->allocator, 1, sizeof(struct subscribe_task_topic));
  1738. if (!task_topic) {
  1739. goto handle_error;
  1740. }
  1741. aws_ref_count_init(&task_topic->ref_count, task_topic, (aws_simple_completion_callback *)s_task_topic_clean_up);
  1742. aws_array_list_push_back(&task_arg->topics, &task_topic);
  1743. task_topic->filter = aws_string_new_from_array(connection->allocator, topic_filter->ptr, topic_filter->len);
  1744. if (!task_topic->filter) {
  1745. goto handle_error;
  1746. }
  1747. task_topic->connection = connection;
  1748. task_topic->request.topic = aws_byte_cursor_from_string(task_topic->filter);
  1749. task_topic->request.qos = qos;
  1750. task_topic->request.on_publish = on_publish;
  1751. task_topic->request.on_cleanup = on_ud_cleanup;
  1752. task_topic->request.on_publish_ud = on_publish_ud;
  1753. /* Calculate the size of the (single) subscribe packet
  1754. * The fixed header is 2 bytes,
  1755. * the topic filter is always at least 3 bytes (1 for QoS, 2 for Length MSB/LSB)
  1756. * - plus the size of the topic filter
  1757. * and finally the packet ID is 2 bytes */
  1758. uint64_t subscribe_packet_size = 7 + topic_filter->len;
  1759. uint16_t packet_id = mqtt_create_request(
  1760. task_arg->connection,
  1761. &s_subscribe_send,
  1762. task_arg,
  1763. &s_subscribe_single_complete,
  1764. task_arg,
  1765. false, /* noRetry */
  1766. subscribe_packet_size);
  1767. if (packet_id == 0) {
  1768. AWS_LOGF_ERROR(
  1769. AWS_LS_MQTT_CLIENT,
  1770. "id=%p: Failed to start subscribe on topic " PRInSTR " with error %s",
  1771. (void *)connection,
  1772. AWS_BYTE_CURSOR_PRI(task_topic->request.topic),
  1773. aws_error_debug_str(aws_last_error()));
  1774. goto handle_error;
  1775. }
  1776. AWS_LOGF_DEBUG(
  1777. AWS_LS_MQTT_CLIENT,
  1778. "id=%p: Starting subscribe %" PRIu16 " on topic " PRInSTR,
  1779. (void *)connection,
  1780. packet_id,
  1781. AWS_BYTE_CURSOR_PRI(task_topic->request.topic));
  1782. return packet_id;
  1783. handle_error:
  1784. if (task_topic) {
  1785. if (task_topic->filter) {
  1786. aws_string_destroy(task_topic->filter);
  1787. }
  1788. aws_mem_release(connection->allocator, task_topic);
  1789. }
  1790. if (task_arg) {
  1791. aws_mem_release(connection->allocator, task_arg);
  1792. }
  1793. return 0;
  1794. }
  1795. /*******************************************************************************
  1796. * Subscribe Local
  1797. ******************************************************************************/
  1798. /* The lifetime of this struct is from subscribe -> suback */
  1799. struct subscribe_local_task_arg {
  1800. struct aws_mqtt_client_connection *connection;
  1801. struct subscribe_task_topic *task_topic;
  1802. aws_mqtt_suback_fn *on_suback;
  1803. void *on_suback_ud;
  1804. };
  1805. static enum aws_mqtt_client_request_state s_subscribe_local_send(
  1806. uint16_t packet_id,
  1807. bool is_first_attempt,
  1808. void *userdata) {
  1809. (void)is_first_attempt;
  1810. struct subscribe_local_task_arg *task_arg = userdata;
  1811. AWS_LOGF_TRACE(
  1812. AWS_LS_MQTT_CLIENT,
  1813. "id=%p: Attempting save of local subscribe %" PRIu16 " (%s)",
  1814. (void *)task_arg->connection,
  1815. packet_id,
  1816. is_first_attempt ? "first attempt" : "redo");
  1817. struct subscribe_task_topic *topic = task_arg->task_topic;
  1818. if (aws_mqtt_topic_tree_insert(
  1819. &task_arg->connection->thread_data.subscriptions,
  1820. topic->filter,
  1821. topic->request.qos,
  1822. s_on_publish_client_wrapper,
  1823. s_task_topic_release,
  1824. topic)) {
  1825. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  1826. }
  1827. aws_ref_count_acquire(&topic->ref_count);
  1828. return AWS_MQTT_CLIENT_REQUEST_COMPLETE;
  1829. }
  1830. static void s_subscribe_local_complete(
  1831. struct aws_mqtt_client_connection *connection,
  1832. uint16_t packet_id,
  1833. int error_code,
  1834. void *userdata) {
  1835. struct subscribe_local_task_arg *task_arg = userdata;
  1836. AWS_LOGF_DEBUG(
  1837. AWS_LS_MQTT_CLIENT,
  1838. "id=%p: Local subscribe %" PRIu16 " completed with error code %d",
  1839. (void *)connection,
  1840. packet_id,
  1841. error_code);
  1842. struct subscribe_task_topic *topic = task_arg->task_topic;
  1843. if (task_arg->on_suback) {
  1844. aws_mqtt_suback_fn *suback = task_arg->on_suback;
  1845. suback(connection, packet_id, &topic->request.topic, topic->request.qos, error_code, task_arg->on_suback_ud);
  1846. }
  1847. s_task_topic_release(topic);
  1848. aws_mem_release(task_arg->connection->allocator, task_arg);
  1849. }
  1850. uint16_t aws_mqtt_client_connection_subscribe_local(
  1851. struct aws_mqtt_client_connection *connection,
  1852. const struct aws_byte_cursor *topic_filter,
  1853. aws_mqtt_client_publish_received_fn *on_publish,
  1854. void *on_publish_ud,
  1855. aws_mqtt_userdata_cleanup_fn *on_ud_cleanup,
  1856. aws_mqtt_suback_fn *on_suback,
  1857. void *on_suback_ud) {
  1858. AWS_PRECONDITION(connection);
  1859. if (!aws_mqtt_is_valid_topic_filter(topic_filter)) {
  1860. aws_raise_error(AWS_ERROR_MQTT_INVALID_TOPIC);
  1861. return 0;
  1862. }
  1863. struct subscribe_task_topic *task_topic = NULL;
  1864. struct subscribe_local_task_arg *task_arg =
  1865. aws_mem_calloc(connection->allocator, 1, sizeof(struct subscribe_local_task_arg));
  1866. if (!task_arg) {
  1867. goto handle_error;
  1868. }
  1869. AWS_ZERO_STRUCT(*task_arg);
  1870. task_arg->connection = connection;
  1871. task_arg->on_suback = on_suback;
  1872. task_arg->on_suback_ud = on_suback_ud;
  1873. task_topic = aws_mem_calloc(connection->allocator, 1, sizeof(struct subscribe_task_topic));
  1874. if (!task_topic) {
  1875. goto handle_error;
  1876. }
  1877. aws_ref_count_init(&task_topic->ref_count, task_topic, (aws_simple_completion_callback *)s_task_topic_clean_up);
  1878. task_arg->task_topic = task_topic;
  1879. task_topic->filter = aws_string_new_from_array(connection->allocator, topic_filter->ptr, topic_filter->len);
  1880. if (!task_topic->filter) {
  1881. goto handle_error;
  1882. }
  1883. task_topic->connection = connection;
  1884. task_topic->is_local = true;
  1885. task_topic->request.topic = aws_byte_cursor_from_string(task_topic->filter);
  1886. task_topic->request.on_publish = on_publish;
  1887. task_topic->request.on_cleanup = on_ud_cleanup;
  1888. task_topic->request.on_publish_ud = on_publish_ud;
  1889. /* Calculate the size of the (local) subscribe packet
  1890. * The fixed header is 2 bytes, the packet ID is 2 bytes
  1891. * the topic filter is always 3 bytes (1 for QoS, 2 for Length MSB/LSB)
  1892. * - plus the size of the topic filter */
  1893. uint64_t subscribe_packet_size = 7 + topic_filter->len;
  1894. uint16_t packet_id = mqtt_create_request(
  1895. task_arg->connection,
  1896. s_subscribe_local_send,
  1897. task_arg,
  1898. &s_subscribe_local_complete,
  1899. task_arg,
  1900. false, /* noRetry */
  1901. subscribe_packet_size);
  1902. if (packet_id == 0) {
  1903. AWS_LOGF_ERROR(
  1904. AWS_LS_MQTT_CLIENT,
  1905. "id=%p: Failed to start local subscribe on topic " PRInSTR " with error %s",
  1906. (void *)connection,
  1907. AWS_BYTE_CURSOR_PRI(task_topic->request.topic),
  1908. aws_error_debug_str(aws_last_error()));
  1909. goto handle_error;
  1910. }
  1911. AWS_LOGF_DEBUG(
  1912. AWS_LS_MQTT_CLIENT,
  1913. "id=%p: Starting local subscribe %" PRIu16 " on topic " PRInSTR,
  1914. (void *)connection,
  1915. packet_id,
  1916. AWS_BYTE_CURSOR_PRI(task_topic->request.topic));
  1917. return packet_id;
  1918. handle_error:
  1919. if (task_topic) {
  1920. if (task_topic->filter) {
  1921. aws_string_destroy(task_topic->filter);
  1922. }
  1923. aws_mem_release(connection->allocator, task_topic);
  1924. }
  1925. if (task_arg) {
  1926. aws_mem_release(connection->allocator, task_arg);
  1927. }
  1928. return 0;
  1929. }
  1930. /*******************************************************************************
  1931. * Resubscribe
  1932. ******************************************************************************/
  1933. static bool s_reconnect_resub_iterator(const struct aws_byte_cursor *topic, enum aws_mqtt_qos qos, void *user_data) {
  1934. struct subscribe_task_arg *task_arg = user_data;
  1935. struct subscribe_task_topic *task_topic =
  1936. aws_mem_calloc(task_arg->connection->allocator, 1, sizeof(struct subscribe_task_topic));
  1937. struct aws_mqtt_topic_subscription sub;
  1938. AWS_ZERO_STRUCT(sub);
  1939. sub.topic = *topic;
  1940. sub.qos = qos;
  1941. task_topic->request = sub;
  1942. task_topic->connection = task_arg->connection;
  1943. aws_array_list_push_back(&task_arg->topics, &task_topic);
  1944. aws_ref_count_init(&task_topic->ref_count, task_topic, (aws_simple_completion_callback *)s_task_topic_clean_up);
  1945. return true;
  1946. }
  1947. static bool s_reconnect_resub_operation_statistics_iterator(
  1948. const struct aws_byte_cursor *topic,
  1949. enum aws_mqtt_qos qos,
  1950. void *user_data) {
  1951. (void)qos;
  1952. uint64_t *packet_size = user_data;
  1953. /* Always 3 bytes (1 for QoS, 2 for length MSB and LSB respectively) */
  1954. *packet_size += 3;
  1955. /* The size of the topic filter */
  1956. *packet_size += topic->len;
  1957. return true;
  1958. }
  1959. static enum aws_mqtt_client_request_state s_resubscribe_send(
  1960. uint16_t packet_id,
  1961. bool is_first_attempt,
  1962. void *userdata) {
  1963. struct subscribe_task_arg *task_arg = userdata;
  1964. bool initing_packet = task_arg->subscribe.fixed_header.packet_type == 0;
  1965. struct aws_io_message *message = NULL;
  1966. const size_t sub_count = aws_mqtt_topic_tree_get_sub_count(&task_arg->connection->thread_data.subscriptions);
  1967. /* Init the topics list even if there are no topics because the s_resubscribe_complete callback will always run. */
  1968. if (aws_array_list_init_dynamic(&task_arg->topics, task_arg->connection->allocator, sub_count, sizeof(void *))) {
  1969. goto handle_error;
  1970. }
  1971. if (sub_count == 0) {
  1972. AWS_LOGF_TRACE(
  1973. AWS_LS_MQTT_CLIENT,
  1974. "id=%p: Not subscribed to any topics. Resubscribe is unnecessary, no packet will be sent.",
  1975. (void *)task_arg->connection);
  1976. return AWS_MQTT_CLIENT_REQUEST_COMPLETE;
  1977. }
  1978. aws_mqtt_topic_tree_iterate(&task_arg->connection->thread_data.subscriptions, s_reconnect_resub_iterator, task_arg);
  1979. AWS_LOGF_TRACE(
  1980. AWS_LS_MQTT_CLIENT,
  1981. "id=%p: Attempting send of resubscribe %" PRIu16 " (%s)",
  1982. (void *)task_arg->connection,
  1983. packet_id,
  1984. is_first_attempt ? "first attempt" : "resend");
  1985. if (initing_packet) {
  1986. /* Init the subscribe packet */
  1987. if (aws_mqtt_packet_subscribe_init(&task_arg->subscribe, task_arg->connection->allocator, packet_id)) {
  1988. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  1989. }
  1990. const size_t num_topics = aws_array_list_length(&task_arg->topics);
  1991. if (num_topics <= 0) {
  1992. aws_raise_error(AWS_ERROR_MQTT_INVALID_TOPIC);
  1993. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  1994. }
  1995. for (size_t i = 0; i < num_topics; ++i) {
  1996. struct subscribe_task_topic *topic = NULL;
  1997. aws_array_list_get_at(&task_arg->topics, &topic, i);
  1998. AWS_ASSUME(topic); /* We know we're within bounds */
  1999. if (aws_mqtt_packet_subscribe_add_topic(&task_arg->subscribe, topic->request.topic, topic->request.qos)) {
  2000. goto handle_error;
  2001. }
  2002. }
  2003. }
  2004. message = mqtt_get_message_for_packet(task_arg->connection, &task_arg->subscribe.fixed_header);
  2005. if (!message) {
  2006. goto handle_error;
  2007. }
  2008. if (aws_mqtt_packet_subscribe_encode(&message->message_data, &task_arg->subscribe)) {
  2009. goto handle_error;
  2010. }
  2011. /* This is not necessarily a fatal error; if the send fails, it'll just retry. Still need to clean up though. */
  2012. if (aws_channel_slot_send_message(task_arg->connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
  2013. aws_mem_release(message->allocator, message);
  2014. }
  2015. return AWS_MQTT_CLIENT_REQUEST_ONGOING;
  2016. handle_error:
  2017. if (message) {
  2018. aws_mem_release(message->allocator, message);
  2019. }
  2020. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  2021. }
  2022. static void s_resubscribe_complete(
  2023. struct aws_mqtt_client_connection *connection,
  2024. uint16_t packet_id,
  2025. int error_code,
  2026. void *userdata) {
  2027. struct subscribe_task_arg *task_arg = userdata;
  2028. const size_t list_len = aws_array_list_length(&task_arg->topics);
  2029. if (list_len <= 0) {
  2030. goto clean_up;
  2031. }
  2032. struct subscribe_task_topic *topic = NULL;
  2033. aws_array_list_get_at(&task_arg->topics, &topic, 0);
  2034. AWS_ASSUME(topic);
  2035. AWS_LOGF_DEBUG(
  2036. AWS_LS_MQTT_CLIENT,
  2037. "id=%p: Subscribe %" PRIu16 " completed with error_code %d",
  2038. (void *)connection,
  2039. packet_id,
  2040. error_code);
  2041. if (task_arg->on_suback.multi) {
  2042. /* create a list of aws_mqtt_topic_subscription pointers from topics for the callback */
  2043. AWS_VARIABLE_LENGTH_ARRAY(uint8_t, cb_list_buf, list_len * sizeof(void *));
  2044. struct aws_array_list cb_list;
  2045. aws_array_list_init_static(&cb_list, cb_list_buf, list_len, sizeof(void *));
  2046. int err = 0;
  2047. for (size_t i = 0; i < list_len; i++) {
  2048. err |= aws_array_list_get_at(&task_arg->topics, &topic, i);
  2049. struct aws_mqtt_topic_subscription *subscription = &topic->request;
  2050. err |= aws_array_list_push_back(&cb_list, &subscription);
  2051. }
  2052. AWS_ASSUME(!err);
  2053. task_arg->on_suback.multi(connection, packet_id, &cb_list, error_code, task_arg->on_suback_ud);
  2054. aws_array_list_clean_up(&cb_list);
  2055. } else if (task_arg->on_suback.single) {
  2056. task_arg->on_suback.single(
  2057. connection, packet_id, &topic->request.topic, topic->request.qos, error_code, task_arg->on_suback_ud);
  2058. }
  2059. clean_up:
  2060. /* We need to cleanup the subscribe_task_topics, since they are not inserted into the topic tree by resubscribe. We
  2061. * take the ownership to clean it up */
  2062. for (size_t i = 0; i < list_len; i++) {
  2063. aws_array_list_get_at(&task_arg->topics, &topic, i);
  2064. s_task_topic_release(topic);
  2065. }
  2066. aws_array_list_clean_up(&task_arg->topics);
  2067. aws_mqtt_packet_subscribe_clean_up(&task_arg->subscribe);
  2068. aws_mem_release(task_arg->connection->allocator, task_arg);
  2069. }
  2070. uint16_t aws_mqtt_resubscribe_existing_topics(
  2071. struct aws_mqtt_client_connection *connection,
  2072. aws_mqtt_suback_multi_fn *on_suback,
  2073. void *on_suback_ud) {
  2074. struct subscribe_task_arg *task_arg = aws_mem_calloc(connection->allocator, 1, sizeof(struct subscribe_task_arg));
  2075. if (!task_arg) {
  2076. AWS_LOGF_ERROR(
  2077. AWS_LS_MQTT_CLIENT, "id=%p: failed to allocate storage for resubscribe arguments", (void *)connection);
  2078. return 0;
  2079. }
  2080. AWS_ZERO_STRUCT(*task_arg);
  2081. task_arg->connection = connection;
  2082. task_arg->on_suback.multi = on_suback;
  2083. task_arg->on_suback_ud = on_suback_ud;
  2084. /* Calculate the size of the packet.
  2085. * The fixed header is 2 bytes and the packet ID is 2 bytes
  2086. * plus the size of each topic in the topic tree */
  2087. uint64_t resubscribe_packet_size = 4;
  2088. /* Get the length of each subscription we are going to resubscribe with */
  2089. aws_mqtt_topic_tree_iterate(
  2090. &connection->thread_data.subscriptions,
  2091. s_reconnect_resub_operation_statistics_iterator,
  2092. &resubscribe_packet_size);
  2093. uint16_t packet_id = mqtt_create_request(
  2094. task_arg->connection,
  2095. &s_resubscribe_send,
  2096. task_arg,
  2097. &s_resubscribe_complete,
  2098. task_arg,
  2099. false, /* noRetry */
  2100. resubscribe_packet_size);
  2101. if (packet_id == 0) {
  2102. AWS_LOGF_ERROR(
  2103. AWS_LS_MQTT_CLIENT,
  2104. "id=%p: Failed to send multi-topic resubscribe with error %s",
  2105. (void *)connection,
  2106. aws_error_name(aws_last_error()));
  2107. goto handle_error;
  2108. }
  2109. AWS_LOGF_DEBUG(
  2110. AWS_LS_MQTT_CLIENT, "id=%p: Sending multi-topic resubscribe %" PRIu16, (void *)connection, packet_id);
  2111. return packet_id;
  2112. handle_error:
  2113. aws_mem_release(connection->allocator, task_arg);
  2114. return 0;
  2115. }
  2116. /*******************************************************************************
  2117. * Unsubscribe
  2118. ******************************************************************************/
  2119. struct unsubscribe_task_arg {
  2120. struct aws_mqtt_client_connection *connection;
  2121. struct aws_string *filter_string;
  2122. struct aws_byte_cursor filter;
  2123. bool is_local;
  2124. /* Packet to populate */
  2125. struct aws_mqtt_packet_unsubscribe unsubscribe;
  2126. /* true if transaction was committed to the topic tree, false requires a retry */
  2127. bool tree_updated;
  2128. aws_mqtt_op_complete_fn *on_unsuback;
  2129. void *on_unsuback_ud;
  2130. struct request_timeout_wrapper timeout_wrapper;
  2131. };
  2132. static enum aws_mqtt_client_request_state s_unsubscribe_send(
  2133. uint16_t packet_id,
  2134. bool is_first_attempt,
  2135. void *userdata) {
  2136. (void)is_first_attempt;
  2137. struct unsubscribe_task_arg *task_arg = userdata;
  2138. struct aws_io_message *message = NULL;
  2139. AWS_LOGF_TRACE(
  2140. AWS_LS_MQTT_CLIENT,
  2141. "id=%p: Attempting send of unsubscribe %" PRIu16 " %s",
  2142. (void *)task_arg->connection,
  2143. packet_id,
  2144. is_first_attempt ? "first attempt" : "resend");
  2145. static const size_t num_topics = 1;
  2146. AWS_VARIABLE_LENGTH_ARRAY(uint8_t, transaction_buf, num_topics * aws_mqtt_topic_tree_action_size);
  2147. struct aws_array_list transaction;
  2148. aws_array_list_init_static(&transaction, transaction_buf, num_topics, aws_mqtt_topic_tree_action_size);
  2149. if (!task_arg->tree_updated) {
  2150. struct subscribe_task_topic *topic;
  2151. if (aws_mqtt_topic_tree_transaction_remove(
  2152. &task_arg->connection->thread_data.subscriptions, &transaction, &task_arg->filter, (void **)&topic)) {
  2153. goto handle_error;
  2154. }
  2155. task_arg->is_local = topic ? topic->is_local : false;
  2156. }
  2157. if (!task_arg->is_local) {
  2158. if (task_arg->unsubscribe.fixed_header.packet_type == 0) {
  2159. /* If unsubscribe packet is uninitialized, init it */
  2160. if (aws_mqtt_packet_unsubscribe_init(&task_arg->unsubscribe, task_arg->connection->allocator, packet_id)) {
  2161. goto handle_error;
  2162. }
  2163. if (aws_mqtt_packet_unsubscribe_add_topic(&task_arg->unsubscribe, task_arg->filter)) {
  2164. goto handle_error;
  2165. }
  2166. }
  2167. message = mqtt_get_message_for_packet(task_arg->connection, &task_arg->unsubscribe.fixed_header);
  2168. if (!message) {
  2169. goto handle_error;
  2170. }
  2171. if (aws_mqtt_packet_unsubscribe_encode(&message->message_data, &task_arg->unsubscribe)) {
  2172. goto handle_error;
  2173. }
  2174. if (aws_channel_slot_send_message(task_arg->connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
  2175. goto handle_error;
  2176. }
  2177. /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion
  2178. * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly
  2179. * fire the on_completion callbacks. */
  2180. struct request_timeout_task_arg *timeout_task_arg = s_schedule_timeout_task(task_arg->connection, packet_id);
  2181. if (!timeout_task_arg) {
  2182. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  2183. }
  2184. /*
  2185. * Set up mutual references between the operation task args and the timeout task args. Whoever runs first
  2186. * "wins", does its logic, and then breaks the connection between the two.
  2187. */
  2188. task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg;
  2189. timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper;
  2190. }
  2191. if (!task_arg->tree_updated) {
  2192. aws_mqtt_topic_tree_transaction_commit(&task_arg->connection->thread_data.subscriptions, &transaction);
  2193. task_arg->tree_updated = true;
  2194. }
  2195. aws_array_list_clean_up(&transaction);
  2196. /* If the subscribe is local-only, don't wait for a SUBACK to come back. */
  2197. return task_arg->is_local ? AWS_MQTT_CLIENT_REQUEST_COMPLETE : AWS_MQTT_CLIENT_REQUEST_ONGOING;
  2198. handle_error:
  2199. if (message) {
  2200. aws_mem_release(message->allocator, message);
  2201. }
  2202. if (!task_arg->tree_updated) {
  2203. aws_mqtt_topic_tree_transaction_roll_back(&task_arg->connection->thread_data.subscriptions, &transaction);
  2204. }
  2205. aws_array_list_clean_up(&transaction);
  2206. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  2207. }
  2208. static void s_unsubscribe_complete(
  2209. struct aws_mqtt_client_connection *connection,
  2210. uint16_t packet_id,
  2211. int error_code,
  2212. void *userdata) {
  2213. struct unsubscribe_task_arg *task_arg = userdata;
  2214. AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Unsubscribe %" PRIu16 " complete", (void *)connection, packet_id);
  2215. /*
  2216. * If we have a forward pointer to a timeout task, then that means the timeout task has not run yet. So we should
  2217. * follow it and zero out the back pointer to us, because we're going away now. The timeout task will run later
  2218. * and be harmless (even vs. future operations with the same packet id) because it only cancels if it has a back
  2219. * pointer.
  2220. */
  2221. if (task_arg->timeout_wrapper.timeout_task_arg) {
  2222. task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL;
  2223. task_arg->timeout_wrapper.timeout_task_arg = NULL;
  2224. }
  2225. if (task_arg->on_unsuback) {
  2226. task_arg->on_unsuback(connection, packet_id, error_code, task_arg->on_unsuback_ud);
  2227. }
  2228. aws_string_destroy(task_arg->filter_string);
  2229. aws_mqtt_packet_unsubscribe_clean_up(&task_arg->unsubscribe);
  2230. aws_mem_release(task_arg->connection->allocator, task_arg);
  2231. }
  2232. uint16_t aws_mqtt_client_connection_unsubscribe(
  2233. struct aws_mqtt_client_connection *connection,
  2234. const struct aws_byte_cursor *topic_filter,
  2235. aws_mqtt_op_complete_fn *on_unsuback,
  2236. void *on_unsuback_ud) {
  2237. AWS_PRECONDITION(connection);
  2238. if (!aws_mqtt_is_valid_topic_filter(topic_filter)) {
  2239. aws_raise_error(AWS_ERROR_MQTT_INVALID_TOPIC);
  2240. return 0;
  2241. }
  2242. struct unsubscribe_task_arg *task_arg =
  2243. aws_mem_calloc(connection->allocator, 1, sizeof(struct unsubscribe_task_arg));
  2244. if (!task_arg) {
  2245. return 0;
  2246. }
  2247. task_arg->connection = connection;
  2248. task_arg->filter_string = aws_string_new_from_array(connection->allocator, topic_filter->ptr, topic_filter->len);
  2249. task_arg->filter = aws_byte_cursor_from_string(task_arg->filter_string);
  2250. task_arg->on_unsuback = on_unsuback;
  2251. task_arg->on_unsuback_ud = on_unsuback_ud;
  2252. /* Calculate the size of the unsubscribe packet.
  2253. * The fixed header is always 2 bytes, the packet ID is always 2 bytes
  2254. * plus the size of the topic filter */
  2255. uint64_t unsubscribe_packet_size = 4 + task_arg->filter.len;
  2256. uint16_t packet_id = mqtt_create_request(
  2257. connection,
  2258. &s_unsubscribe_send,
  2259. task_arg,
  2260. s_unsubscribe_complete,
  2261. task_arg,
  2262. false, /* noRetry */
  2263. unsubscribe_packet_size);
  2264. if (packet_id == 0) {
  2265. AWS_LOGF_DEBUG(
  2266. AWS_LS_MQTT_CLIENT,
  2267. "id=%p: Failed to start unsubscribe, with error %s",
  2268. (void *)connection,
  2269. aws_error_debug_str(aws_last_error()));
  2270. goto handle_error;
  2271. }
  2272. AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Starting unsubscribe %" PRIu16, (void *)connection, packet_id);
  2273. return packet_id;
  2274. handle_error:
  2275. aws_string_destroy(task_arg->filter_string);
  2276. aws_mem_release(connection->allocator, task_arg);
  2277. return 0;
  2278. }
  2279. /*******************************************************************************
  2280. * Publish
  2281. ******************************************************************************/
  2282. struct publish_task_arg {
  2283. struct aws_mqtt_client_connection *connection;
  2284. struct aws_string *topic_string;
  2285. struct aws_byte_cursor topic;
  2286. enum aws_mqtt_qos qos;
  2287. bool retain;
  2288. struct aws_byte_cursor payload;
  2289. struct aws_byte_buf payload_buf;
  2290. /* Packet to populate */
  2291. struct aws_mqtt_packet_publish publish;
  2292. aws_mqtt_op_complete_fn *on_complete;
  2293. void *userdata;
  2294. struct request_timeout_wrapper timeout_wrapper;
  2295. };
  2296. /* should only be called by tests */
  2297. static int s_get_stuff_from_outstanding_requests_table(
  2298. struct aws_mqtt_client_connection *connection,
  2299. uint16_t packet_id,
  2300. struct aws_allocator *allocator,
  2301. struct aws_byte_buf *result_buf,
  2302. struct aws_string **result_string) {
  2303. int err = AWS_OP_SUCCESS;
  2304. aws_mutex_lock(&connection->synced_data.lock);
  2305. struct aws_hash_element *elem = NULL;
  2306. aws_hash_table_find(&connection->synced_data.outstanding_requests_table, &packet_id, &elem);
  2307. if (elem) {
  2308. struct aws_mqtt_request *request = elem->value;
  2309. struct publish_task_arg *pub = (struct publish_task_arg *)request->send_request_ud;
  2310. if (result_buf != NULL) {
  2311. if (aws_byte_buf_init_copy(result_buf, allocator, &pub->payload_buf)) {
  2312. err = AWS_OP_ERR;
  2313. }
  2314. } else if (result_string != NULL) {
  2315. *result_string = aws_string_new_from_string(allocator, pub->topic_string);
  2316. if (*result_string == NULL) {
  2317. err = AWS_OP_ERR;
  2318. }
  2319. }
  2320. } else {
  2321. /* So lovely that this error is defined, but hashtable never actually raises it */
  2322. err = aws_raise_error(AWS_ERROR_HASHTBL_ITEM_NOT_FOUND);
  2323. }
  2324. aws_mutex_unlock(&connection->synced_data.lock);
  2325. return err;
  2326. }
  2327. /* should only be called by tests */
  2328. int aws_mqtt_client_get_payload_for_outstanding_publish_packet(
  2329. struct aws_mqtt_client_connection *connection,
  2330. uint16_t packet_id,
  2331. struct aws_allocator *allocator,
  2332. struct aws_byte_buf *result) {
  2333. AWS_ZERO_STRUCT(*result);
  2334. return s_get_stuff_from_outstanding_requests_table(connection, packet_id, allocator, result, NULL);
  2335. }
  /* should only be called by tests */
  /**
   * Store a newly allocated copy of the topic of the outstanding publish identified by packet_id in *result.
   * *result is reset to NULL first, so it stays NULL when the lookup fails.
   */
  int aws_mqtt_client_get_topic_for_outstanding_publish_packet(
      struct aws_mqtt_client_connection *connection,
      uint16_t packet_id,
      struct aws_allocator *allocator,
      struct aws_string **result) {

      struct aws_byte_buf *no_payload_wanted = NULL;

      *result = NULL;

      return s_get_stuff_from_outstanding_requests_table(connection, packet_id, allocator, no_payload_wanted, result);
  }
  2345. static enum aws_mqtt_client_request_state s_publish_send(uint16_t packet_id, bool is_first_attempt, void *userdata) {
  2346. struct publish_task_arg *task_arg = userdata;
  2347. struct aws_mqtt_client_connection *connection = task_arg->connection;
  2348. AWS_LOGF_TRACE(
  2349. AWS_LS_MQTT_CLIENT,
  2350. "id=%p: Attempting send of publish %" PRIu16 " %s",
  2351. (void *)task_arg->connection,
  2352. packet_id,
  2353. is_first_attempt ? "first attempt" : "resend");
  2354. bool is_qos_0 = task_arg->qos == AWS_MQTT_QOS_AT_MOST_ONCE;
  2355. if (is_qos_0) {
  2356. packet_id = 0;
  2357. }
  2358. if (is_first_attempt) {
  2359. if (aws_mqtt_packet_publish_init(
  2360. &task_arg->publish,
  2361. task_arg->retain,
  2362. task_arg->qos,
  2363. !is_first_attempt,
  2364. task_arg->topic,
  2365. packet_id,
  2366. task_arg->payload)) {
  2367. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  2368. }
  2369. }
  2370. struct aws_io_message *message = mqtt_get_message_for_packet(task_arg->connection, &task_arg->publish.fixed_header);
  2371. if (!message) {
  2372. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  2373. }
  2374. /* Encode the headers, and everything but the payload */
  2375. if (aws_mqtt_packet_publish_encode_headers(&message->message_data, &task_arg->publish)) {
  2376. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  2377. }
  2378. struct aws_byte_cursor payload_cur = task_arg->payload;
  2379. {
  2380. write_payload_chunk:
  2381. (void)NULL;
  2382. const size_t left_in_message = message->message_data.capacity - message->message_data.len;
  2383. const size_t to_write = payload_cur.len < left_in_message ? payload_cur.len : left_in_message;
  2384. if (to_write) {
  2385. /* Write this chunk */
  2386. struct aws_byte_cursor to_write_cur = aws_byte_cursor_advance(&payload_cur, to_write);
  2387. AWS_ASSERT(to_write_cur.ptr); /* to_write is guaranteed to be inside the bounds of payload_cur */
  2388. if (!aws_byte_buf_write_from_whole_cursor(&message->message_data, to_write_cur)) {
  2389. aws_mem_release(message->allocator, message);
  2390. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  2391. }
  2392. }
  2393. if (aws_channel_slot_send_message(task_arg->connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
  2394. aws_mem_release(message->allocator, message);
  2395. /* If it's QoS 0, telling user that the message haven't been sent, else, the message will be resent once the
  2396. * connection is back */
  2397. return is_qos_0 ? AWS_MQTT_CLIENT_REQUEST_ERROR : AWS_MQTT_CLIENT_REQUEST_ONGOING;
  2398. }
  2399. /* If there's still payload left, get a new message and start again. */
  2400. if (payload_cur.len) {
  2401. message = mqtt_get_message_for_packet(task_arg->connection, &task_arg->publish.fixed_header);
  2402. goto write_payload_chunk;
  2403. }
  2404. }
  2405. if (!is_qos_0 && connection->operation_timeout_ns != UINT64_MAX) {
  2406. /* TODO: timing should start from the message written into the socket, which is aws_io_message->on_completion
  2407. * invoked, but there are bugs in the websocket handler (and maybe also the h1 handler?) where we don't properly
  2408. * fire fire the on_completion callbacks. */
  2409. struct request_timeout_task_arg *timeout_task_arg = s_schedule_timeout_task(connection, packet_id);
  2410. if (!timeout_task_arg) {
  2411. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  2412. }
  2413. /*
  2414. * Set up mutual references between the operation task args and the timeout task args. Whoever runs first
  2415. * "wins", does its logic, and then breaks the connection between the two.
  2416. */
  2417. task_arg->timeout_wrapper.timeout_task_arg = timeout_task_arg;
  2418. timeout_task_arg->task_arg_wrapper = &task_arg->timeout_wrapper;
  2419. }
  2420. /* If QoS == 0, there will be no ack, so consider the request done now. */
  2421. return is_qos_0 ? AWS_MQTT_CLIENT_REQUEST_COMPLETE : AWS_MQTT_CLIENT_REQUEST_ONGOING;
  2422. }
  2423. static void s_publish_complete(
  2424. struct aws_mqtt_client_connection *connection,
  2425. uint16_t packet_id,
  2426. int error_code,
  2427. void *userdata) {
  2428. struct publish_task_arg *task_arg = userdata;
  2429. AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Publish %" PRIu16 " complete", (void *)connection, packet_id);
  2430. if (task_arg->on_complete) {
  2431. task_arg->on_complete(connection, packet_id, error_code, task_arg->userdata);
  2432. }
  2433. /*
  2434. * If we have a forward pointer to a timeout task, then that means the timeout task has not run yet. So we should
  2435. * follow it and zero out the back pointer to us, because we're going away now. The timeout task will run later
  2436. * and be harmless (even vs. future operations with the same packet id) because it only cancels if it has a back
  2437. * pointer.
  2438. */
  2439. if (task_arg->timeout_wrapper.timeout_task_arg != NULL) {
  2440. task_arg->timeout_wrapper.timeout_task_arg->task_arg_wrapper = NULL;
  2441. task_arg->timeout_wrapper.timeout_task_arg = NULL;
  2442. }
  2443. aws_byte_buf_clean_up(&task_arg->payload_buf);
  2444. aws_string_destroy(task_arg->topic_string);
  2445. aws_mem_release(connection->allocator, task_arg);
  2446. }
  2447. uint16_t aws_mqtt_client_connection_publish(
  2448. struct aws_mqtt_client_connection *connection,
  2449. const struct aws_byte_cursor *topic,
  2450. enum aws_mqtt_qos qos,
  2451. bool retain,
  2452. const struct aws_byte_cursor *payload,
  2453. aws_mqtt_op_complete_fn *on_complete,
  2454. void *userdata) {
  2455. AWS_PRECONDITION(connection);
  2456. if (!aws_mqtt_is_valid_topic(topic)) {
  2457. aws_raise_error(AWS_ERROR_MQTT_INVALID_TOPIC);
  2458. return 0;
  2459. }
  2460. struct publish_task_arg *arg = aws_mem_calloc(connection->allocator, 1, sizeof(struct publish_task_arg));
  2461. if (!arg) {
  2462. return 0;
  2463. }
  2464. arg->connection = connection;
  2465. arg->topic_string = aws_string_new_from_array(connection->allocator, topic->ptr, topic->len);
  2466. arg->topic = aws_byte_cursor_from_string(arg->topic_string);
  2467. arg->qos = qos;
  2468. arg->retain = retain;
  2469. if (aws_byte_buf_init_copy_from_cursor(&arg->payload_buf, connection->allocator, *payload)) {
  2470. goto handle_error;
  2471. }
  2472. arg->payload = aws_byte_cursor_from_buf(&arg->payload_buf);
  2473. arg->on_complete = on_complete;
  2474. arg->userdata = userdata;
  2475. /* Calculate the size of the publish packet.
  2476. * The fixed header size is 2 bytes, the packet ID is 2 bytes,
  2477. * plus the size of both the topic name and payload */
  2478. uint64_t publish_packet_size = 4 + arg->topic.len + arg->payload.len;
  2479. bool retry = qos == AWS_MQTT_QOS_AT_MOST_ONCE;
  2480. uint16_t packet_id =
  2481. mqtt_create_request(connection, &s_publish_send, arg, &s_publish_complete, arg, retry, publish_packet_size);
  2482. if (packet_id == 0) {
  2483. /* bummer, we failed to make a new request */
  2484. AWS_LOGF_ERROR(
  2485. AWS_LS_MQTT_CLIENT,
  2486. "id=%p: Failed starting publish to topic " PRInSTR ",error %d (%s)",
  2487. (void *)connection,
  2488. AWS_BYTE_CURSOR_PRI(*topic),
  2489. aws_last_error(),
  2490. aws_error_name(aws_last_error()));
  2491. goto handle_error;
  2492. }
  2493. AWS_LOGF_DEBUG(
  2494. AWS_LS_MQTT_CLIENT,
  2495. "id=%p: Starting publish %" PRIu16 " to topic " PRInSTR,
  2496. (void *)connection,
  2497. packet_id,
  2498. AWS_BYTE_CURSOR_PRI(*topic));
  2499. return packet_id;
  2500. handle_error:
  2501. /* we know arg is valid, topic_string may or may not be valid */
  2502. if (arg->topic_string) {
  2503. aws_string_destroy(arg->topic_string);
  2504. }
  2505. aws_byte_buf_clean_up(&arg->payload_buf);
  2506. aws_mem_release(connection->allocator, arg);
  2507. return 0;
  2508. }
  2509. /*******************************************************************************
  2510. * Ping
  2511. ******************************************************************************/
  2512. static void s_pingresp_received_timeout(struct aws_channel_task *channel_task, void *arg, enum aws_task_status status) {
  2513. struct aws_mqtt_client_connection *connection = arg;
  2514. if (status == AWS_TASK_STATUS_RUN_READY) {
  2515. /* Check that a pingresp has been received since pingreq was sent */
  2516. if (connection->thread_data.waiting_on_ping_response) {
  2517. connection->thread_data.waiting_on_ping_response = false;
  2518. /* It's been too long since the last ping, close the connection */
  2519. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "id=%p: ping timeout detected", (void *)connection);
  2520. aws_channel_shutdown(connection->slot->channel, AWS_ERROR_MQTT_TIMEOUT);
  2521. }
  2522. }
  2523. aws_mem_release(connection->allocator, channel_task);
  2524. }
  2525. static enum aws_mqtt_client_request_state s_pingreq_send(uint16_t packet_id, bool is_first_attempt, void *userdata) {
  2526. (void)packet_id;
  2527. (void)is_first_attempt;
  2528. AWS_PRECONDITION(is_first_attempt);
  2529. struct aws_mqtt_client_connection *connection = userdata;
  2530. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: pingreq send", (void *)connection);
  2531. struct aws_mqtt_packet_connection pingreq;
  2532. aws_mqtt_packet_pingreq_init(&pingreq);
  2533. struct aws_io_message *message = mqtt_get_message_for_packet(connection, &pingreq.fixed_header);
  2534. if (!message) {
  2535. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  2536. }
  2537. if (aws_mqtt_packet_connection_encode(&message->message_data, &pingreq)) {
  2538. aws_mem_release(message->allocator, message);
  2539. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  2540. }
  2541. if (aws_channel_slot_send_message(connection->slot, message, AWS_CHANNEL_DIR_WRITE)) {
  2542. aws_mem_release(message->allocator, message);
  2543. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  2544. }
  2545. /* Mark down that now is when the last pingreq was sent */
  2546. connection->thread_data.waiting_on_ping_response = true;
  2547. struct aws_channel_task *ping_timeout_task =
  2548. aws_mem_calloc(connection->allocator, 1, sizeof(struct aws_channel_task));
  2549. if (!ping_timeout_task) {
  2550. /* allocation failed, no log, just return error. */
  2551. goto error;
  2552. }
  2553. aws_channel_task_init(ping_timeout_task, s_pingresp_received_timeout, connection, "mqtt_pingresp_timeout");
  2554. uint64_t now = 0;
  2555. if (aws_channel_current_clock_time(connection->slot->channel, &now)) {
  2556. goto error;
  2557. }
  2558. now += connection->ping_timeout_ns;
  2559. aws_channel_schedule_task_future(connection->slot->channel, ping_timeout_task, now);
  2560. return AWS_MQTT_CLIENT_REQUEST_COMPLETE;
  2561. error:
  2562. return AWS_MQTT_CLIENT_REQUEST_ERROR;
  2563. }
  2564. int aws_mqtt_client_connection_ping(struct aws_mqtt_client_connection *connection) {
  2565. AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Starting ping", (void *)connection);
  2566. uint16_t packet_id =
  2567. mqtt_create_request(connection, &s_pingreq_send, connection, NULL, NULL, true, /* noRetry */ 0);
  2568. AWS_LOGF_DEBUG(AWS_LS_MQTT_CLIENT, "id=%p: Starting ping with packet id %" PRIu16, (void *)connection, packet_id);
  2569. return (packet_id > 0) ? AWS_OP_SUCCESS : AWS_OP_ERR;
  2570. }
  2571. /*******************************************************************************
  2572. * Operation Statistics
  2573. ******************************************************************************/
  2574. void aws_mqtt_connection_statistics_change_operation_statistic_state(
  2575. struct aws_mqtt_client_connection *connection,
  2576. struct aws_mqtt_request *request,
  2577. enum aws_mqtt_operation_statistic_state_flags new_state_flags) {
  2578. // Error checking
  2579. if (!connection) {
  2580. AWS_LOGF_ERROR(
  2581. AWS_LS_MQTT_CLIENT, "Invalid MQTT311 connection used when trying to change operation statistic state");
  2582. return;
  2583. }
  2584. if (!request) {
  2585. AWS_LOGF_ERROR(
  2586. AWS_LS_MQTT_CLIENT, "Invalid MQTT311 request used when trying to change operation statistic state");
  2587. return;
  2588. }
  2589. uint64_t packet_size = request->packet_size;
  2590. /**
  2591. * If the packet size is zero, then just skip it as we only want to track packets we have intentially
  2592. * calculated the size of and therefore it will be non-zero (zero packets will be ACKs, Pings, etc)
  2593. */
  2594. if (packet_size <= 0) {
  2595. return;
  2596. }
  2597. enum aws_mqtt_operation_statistic_state_flags old_state_flags = request->statistic_state_flags;
  2598. if (new_state_flags == old_state_flags) {
  2599. return;
  2600. }
  2601. struct aws_mqtt_connection_operation_statistics_impl *stats = &connection->operation_statistics_impl;
  2602. if ((old_state_flags & AWS_MQTT_OSS_INCOMPLETE) != (new_state_flags & AWS_MQTT_OSS_INCOMPLETE)) {
  2603. if ((new_state_flags & AWS_MQTT_OSS_INCOMPLETE) != 0) {
  2604. aws_atomic_fetch_add(&stats->incomplete_operation_count_atomic, 1);
  2605. aws_atomic_fetch_add(&stats->incomplete_operation_size_atomic, (size_t)packet_size);
  2606. } else {
  2607. aws_atomic_fetch_sub(&stats->incomplete_operation_count_atomic, 1);
  2608. aws_atomic_fetch_sub(&stats->incomplete_operation_size_atomic, (size_t)packet_size);
  2609. }
  2610. }
  2611. if ((old_state_flags & AWS_MQTT_OSS_UNACKED) != (new_state_flags & AWS_MQTT_OSS_UNACKED)) {
  2612. if ((new_state_flags & AWS_MQTT_OSS_UNACKED) != 0) {
  2613. aws_atomic_fetch_add(&stats->unacked_operation_count_atomic, 1);
  2614. aws_atomic_fetch_add(&stats->unacked_operation_size_atomic, (size_t)packet_size);
  2615. } else {
  2616. aws_atomic_fetch_sub(&stats->unacked_operation_count_atomic, 1);
  2617. aws_atomic_fetch_sub(&stats->unacked_operation_size_atomic, (size_t)packet_size);
  2618. }
  2619. }
  2620. request->statistic_state_flags = new_state_flags;
  2621. // If the callback is defined, then call it
  2622. if (connection && connection->on_any_operation_statistics && connection->on_any_operation_statistics_ud) {
  2623. (*connection->on_any_operation_statistics)(connection, connection->on_any_operation_statistics_ud);
  2624. }
  2625. }
  2626. int aws_mqtt_client_connection_get_stats(
  2627. struct aws_mqtt_client_connection *connection,
  2628. struct aws_mqtt_connection_operation_statistics *stats) {
  2629. // Error checking
  2630. if (!connection) {
  2631. AWS_LOGF_ERROR(AWS_LS_MQTT_CLIENT, "Invalid MQTT311 connection used when trying to get operation statistics");
  2632. return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  2633. }
  2634. if (!stats) {
  2635. AWS_LOGF_ERROR(
  2636. AWS_LS_MQTT_CLIENT,
  2637. "id=%p: Invalid MQTT311 connection statistics struct used when trying to get operation statistics",
  2638. (void *)connection);
  2639. return aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  2640. }
  2641. stats->incomplete_operation_count =
  2642. (uint64_t)aws_atomic_load_int(&connection->operation_statistics_impl.incomplete_operation_count_atomic);
  2643. stats->incomplete_operation_size =
  2644. (uint64_t)aws_atomic_load_int(&connection->operation_statistics_impl.incomplete_operation_size_atomic);
  2645. stats->unacked_operation_count =
  2646. (uint64_t)aws_atomic_load_int(&connection->operation_statistics_impl.unacked_operation_count_atomic);
  2647. stats->unacked_operation_size =
  2648. (uint64_t)aws_atomic_load_int(&connection->operation_statistics_impl.unacked_operation_size_atomic);
  2649. return AWS_OP_SUCCESS;
  2650. }
  2651. int aws_mqtt_client_connection_set_on_operation_statistics_handler(
  2652. struct aws_mqtt_client_connection *connection,
  2653. aws_mqtt_on_operation_statistics_fn *on_operation_statistics,
  2654. void *on_operation_statistics_ud) {
  2655. AWS_LOGF_TRACE(AWS_LS_MQTT_CLIENT, "id=%p: Setting on_operation_statistics handler", (void *)connection);
  2656. connection->on_any_operation_statistics = on_operation_statistics;
  2657. connection->on_any_operation_statistics_ud = on_operation_statistics_ud;
  2658. return AWS_OP_SUCCESS;
  2659. }