Mqtt5Client.cpp 32 KB


  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/crt/mqtt/Mqtt5Client.h>
  6. #include <aws/crt/mqtt/Mqtt5Packets.h>
  7. #include <aws/crt/Api.h>
  8. #include <aws/crt/StlAllocator.h>
  9. #include <aws/crt/http/HttpProxyStrategy.h>
  10. #include <aws/crt/http/HttpRequestResponse.h>
  11. #include <aws/crt/io/Bootstrap.h>
  12. #include <aws/iot/MqttClient.h>
  13. #include <utility>
  14. namespace Aws
  15. {
  16. namespace Crt
  17. {
  18. namespace Mqtt5
  19. {
  20. struct PubAckCallbackData : public std::enable_shared_from_this<PubAckCallbackData>
  21. {
  22. PubAckCallbackData(Allocator *alloc = ApiAllocator()) : client(nullptr), allocator(alloc) {}
  23. std::shared_ptr<Mqtt5Client> client;
  24. OnPublishCompletionHandler onPublishCompletion;
  25. Allocator *allocator;
  26. };
  27. struct SubAckCallbackData
  28. {
  29. SubAckCallbackData(Allocator *alloc = ApiAllocator()) : client(nullptr), allocator(alloc) {}
  30. std::shared_ptr<Mqtt5Client> client;
  31. OnSubscribeCompletionHandler onSubscribeCompletion;
  32. Allocator *allocator;
  33. };
  34. struct UnSubAckCallbackData
  35. {
  36. UnSubAckCallbackData(Allocator *alloc = ApiAllocator()) : client(nullptr), allocator(alloc) {}
  37. std::shared_ptr<Mqtt5Client> client;
  38. OnUnsubscribeCompletionHandler onUnsubscribeCompletion;
  39. Allocator *allocator;
  40. };
  41. void Mqtt5Client::s_lifeCycleEventCallback(const struct aws_mqtt5_client_lifecycle_event *event)
  42. {
  43. Mqtt5Client *client = reinterpret_cast<Mqtt5Client *>(event->user_data);
  44. switch (event->event_type)
  45. {
  46. case AWS_MQTT5_CLET_STOPPED:
  47. AWS_LOGF_INFO(AWS_LS_MQTT5_CLIENT, "Lifecycle event: Client Stopped!");
  48. if (client->onStopped)
  49. {
  50. OnStoppedEventData eventData;
  51. client->onStopped(*client, eventData);
  52. }
  53. break;
  54. case AWS_MQTT5_CLET_ATTEMPTING_CONNECT:
  55. AWS_LOGF_INFO(AWS_LS_MQTT5_CLIENT, "Lifecycle event: Attempting Connect!");
  56. if (client->onAttemptingConnect)
  57. {
  58. OnAttemptingConnectEventData eventData;
  59. client->onAttemptingConnect(*client, eventData);
  60. }
  61. break;
  62. case AWS_MQTT5_CLET_CONNECTION_FAILURE:
  63. AWS_LOGF_INFO(AWS_LS_MQTT5_CLIENT, "Lifecycle event: Connection Failure!");
  64. AWS_LOGF_INFO(
  65. AWS_LS_MQTT5_CLIENT,
  66. " Error Code: %d(%s)",
  67. event->error_code,
  68. aws_error_debug_str(event->error_code));
  69. if (client->onConnectionFailure)
  70. {
  71. OnConnectionFailureEventData eventData;
  72. eventData.errorCode = event->error_code;
  73. std::shared_ptr<ConnAckPacket> packet = nullptr;
  74. if (event->connack_data != NULL)
  75. {
  76. packet = Aws::Crt::MakeShared<ConnAckPacket>(
  77. client->m_allocator, *event->connack_data, client->m_allocator);
  78. eventData.connAckPacket = packet;
  79. }
  80. client->onConnectionFailure(*client, eventData);
  81. }
  82. break;
  83. case AWS_MQTT5_CLET_CONNECTION_SUCCESS:
  84. AWS_LOGF_INFO(AWS_LS_MQTT5_CLIENT, "Lifecycle event: Connection Success!");
  85. if (client->onConnectionSuccess)
  86. {
  87. OnConnectionSuccessEventData eventData;
  88. std::shared_ptr<ConnAckPacket> packet = nullptr;
  89. if (event->connack_data != NULL)
  90. {
  91. packet = Aws::Crt::MakeShared<ConnAckPacket>(ApiAllocator(), *event->connack_data);
  92. }
  93. std::shared_ptr<NegotiatedSettings> neg_settings = nullptr;
  94. if (event->settings != NULL)
  95. {
  96. neg_settings =
  97. Aws::Crt::MakeShared<NegotiatedSettings>(ApiAllocator(), *event->settings);
  98. }
  99. eventData.connAckPacket = packet;
  100. eventData.negotiatedSettings = neg_settings;
  101. client->onConnectionSuccess(*client, eventData);
  102. }
  103. break;
  104. case AWS_MQTT5_CLET_DISCONNECTION:
  105. AWS_LOGF_INFO(
  106. AWS_LS_MQTT5_CLIENT,
  107. " Error Code: %d(%s)",
  108. event->error_code,
  109. aws_error_debug_str(event->error_code));
  110. if (client->onDisconnection)
  111. {
  112. OnDisconnectionEventData eventData;
  113. std::shared_ptr<DisconnectPacket> disconnection = nullptr;
  114. if (event->disconnect_data != nullptr)
  115. {
  116. disconnection = Aws::Crt::MakeShared<DisconnectPacket>(
  117. client->m_allocator, *event->disconnect_data, client->m_allocator);
  118. }
  119. eventData.errorCode = event->error_code;
  120. eventData.disconnectPacket = disconnection;
  121. client->onDisconnection(*client, eventData);
  122. }
  123. break;
  124. }
  125. }
  126. void Mqtt5Client::s_publishReceivedCallback(
  127. const struct aws_mqtt5_packet_publish_view *publish,
  128. void *user_data)
  129. {
  130. AWS_LOGF_INFO(AWS_LS_MQTT5_CLIENT, "on publish recieved callback");
  131. Mqtt5Client *client = reinterpret_cast<Mqtt5Client *>(user_data);
  132. if (client != nullptr && client->onPublishReceived != nullptr)
  133. {
  134. if (publish != NULL)
  135. {
  136. std::shared_ptr<PublishPacket> packet =
  137. std::make_shared<PublishPacket>(*publish, client->m_allocator);
  138. PublishReceivedEventData eventData;
  139. eventData.publishPacket = packet;
  140. client->onPublishReceived(*client, eventData);
  141. }
  142. else
  143. {
  144. AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Failed to access Publish packet view.");
  145. }
  146. }
  147. }
  148. void Mqtt5Client::s_publishCompletionCallback(
  149. enum aws_mqtt5_packet_type packet_type,
  150. const void *publshCompletionPacket,
  151. int error_code,
  152. void *complete_ctx)
  153. {
  154. AWS_LOGF_INFO(AWS_LS_MQTT5_CLIENT, "Publish completion callback triggered.");
  155. auto callbackData = reinterpret_cast<PubAckCallbackData *>(complete_ctx);
  156. if (callbackData)
  157. {
  158. std::shared_ptr<PublishResult> publish = nullptr;
  159. switch (packet_type)
  160. {
  161. case aws_mqtt5_packet_type::AWS_MQTT5_PT_PUBACK:
  162. {
  163. if (publshCompletionPacket != NULL)
  164. {
  165. std::shared_ptr<PubAckPacket> packet = std::make_shared<PubAckPacket>(
  166. *(aws_mqtt5_packet_puback_view *)publshCompletionPacket, callbackData->allocator);
  167. publish = std::make_shared<PublishResult>(std::move(packet));
  168. }
  169. else // This should never happened.
  170. {
  171. AWS_LOGF_INFO(AWS_LS_MQTT5_CLIENT, "The PubAck Packet is invalid.");
  172. publish = std::make_shared<PublishResult>(AWS_ERROR_INVALID_ARGUMENT);
  173. }
  174. break;
  175. }
  176. case aws_mqtt5_packet_type::AWS_MQTT5_PT_NONE:
  177. {
  178. publish = std::make_shared<PublishResult>(error_code);
  179. break;
  180. }
  181. default: // Invalid packet type
  182. {
  183. AWS_LOGF_INFO(AWS_LS_MQTT5_CLIENT, "Invalid Packet Type.");
  184. publish = std::make_shared<PublishResult>(AWS_ERROR_INVALID_ARGUMENT);
  185. break;
  186. }
  187. }
  188. if (callbackData->onPublishCompletion != NULL)
  189. {
  190. callbackData->onPublishCompletion(callbackData->client, error_code, publish);
  191. }
  192. Crt::Delete(callbackData, callbackData->allocator);
  193. }
  194. }
  195. void Mqtt5Client::s_onWebsocketHandshake(
  196. struct aws_http_message *rawRequest,
  197. void *user_data,
  198. aws_mqtt5_transform_websocket_handshake_complete_fn *complete_fn,
  199. void *complete_ctx)
  200. {
  201. auto client = reinterpret_cast<Mqtt5Client *>(user_data);
  202. Allocator *allocator = client->m_allocator;
  203. // we have to do this because of private constructors.
  204. auto toSeat =
  205. reinterpret_cast<Http::HttpRequest *>(aws_mem_acquire(allocator, sizeof(Http::HttpRequest)));
  206. toSeat = new (toSeat) Http::HttpRequest(allocator, rawRequest);
  207. std::shared_ptr<Http::HttpRequest> request = std::shared_ptr<Http::HttpRequest>(
  208. toSeat, [allocator](Http::HttpRequest *ptr) { Crt::Delete(ptr, allocator); });
  209. auto onInterceptComplete =
  210. [complete_fn,
  211. complete_ctx](const std::shared_ptr<Http::HttpRequest> &transformedRequest, int errorCode) {
  212. complete_fn(transformedRequest->GetUnderlyingMessage(), errorCode, complete_ctx);
  213. };
  214. client->websocketInterceptor(request, onInterceptComplete);
  215. }
  216. void Mqtt5Client::s_clientTerminationCompletion(void *complete_ctx)
  217. {
  218. Mqtt5Client *client = reinterpret_cast<Mqtt5Client *>(complete_ctx);
  219. std::unique_lock<std::mutex> lock(client->m_terminationMutex);
  220. client->m_terminationPredicate = true;
  221. client->m_terminationCondition.notify_all();
  222. }
  223. void Mqtt5Client::s_subscribeCompletionCallback(
  224. const aws_mqtt5_packet_suback_view *suback,
  225. int error_code,
  226. void *complete_ctx)
  227. {
  228. SubAckCallbackData *callbackData = reinterpret_cast<SubAckCallbackData *>(complete_ctx);
  229. AWS_ASSERT(callbackData != nullptr);
  230. std::shared_ptr<SubAckPacket> packet = nullptr;
  231. if (suback != nullptr)
  232. {
  233. packet = std::make_shared<SubAckPacket>(*suback, callbackData->allocator);
  234. }
  235. if (error_code != 0)
  236. {
  237. AWS_LOGF_INFO(
  238. AWS_LS_MQTT5_CLIENT,
  239. "SubscribeCompletion Failed with Error Code: %d(%s)",
  240. error_code,
  241. aws_error_debug_str(error_code));
  242. }
  243. if (callbackData->onSubscribeCompletion)
  244. {
  245. callbackData->onSubscribeCompletion(callbackData->client, error_code, packet);
  246. }
  247. Crt::Delete(callbackData, callbackData->allocator);
  248. }
  249. void Mqtt5Client::s_unsubscribeCompletionCallback(
  250. const aws_mqtt5_packet_unsuback_view *unsuback,
  251. int error_code,
  252. void *complete_ctx)
  253. {
  254. UnSubAckCallbackData *callbackData = reinterpret_cast<UnSubAckCallbackData *>(complete_ctx);
  255. AWS_ASSERT(callbackData != nullptr);
  256. std::shared_ptr<UnSubAckPacket> packet = nullptr;
  257. if (unsuback != nullptr)
  258. {
  259. packet = std::make_shared<UnSubAckPacket>(*unsuback, callbackData->allocator);
  260. }
  261. if (error_code != 0)
  262. {
  263. AWS_LOGF_INFO(
  264. AWS_LS_MQTT5_CLIENT,
  265. "UnsubscribeCompletion Failed with Error Code: %d(%s)",
  266. error_code,
  267. aws_error_debug_str(error_code));
  268. }
  269. if (callbackData->onUnsubscribeCompletion != NULL)
  270. {
  271. callbackData->onUnsubscribeCompletion(callbackData->client, error_code, packet);
  272. }
  273. Crt::Delete(callbackData, callbackData->allocator);
  274. }
  275. Mqtt5Client::Mqtt5Client(const Mqtt5ClientOptions &options, Allocator *allocator) noexcept
  276. : m_client(nullptr), m_allocator(allocator)
  277. {
  278. aws_mqtt5_client_options clientOptions;
  279. options.initializeRawOptions(clientOptions);
  280. /* Setup Callbacks */
  281. if (options.websocketHandshakeTransform)
  282. {
  283. this->websocketInterceptor = options.websocketHandshakeTransform;
  284. clientOptions.websocket_handshake_transform = &Mqtt5Client::s_onWebsocketHandshake;
  285. clientOptions.websocket_handshake_transform_user_data = this;
  286. }
  287. if (options.onConnectionFailure)
  288. {
  289. this->onConnectionFailure = options.onConnectionFailure;
  290. }
  291. if (options.onConnectionSuccess)
  292. {
  293. this->onConnectionSuccess = options.onConnectionSuccess;
  294. }
  295. if (options.onDisconnection)
  296. {
  297. this->onDisconnection = options.onDisconnection;
  298. }
  299. if (options.onPublishReceived)
  300. {
  301. this->onPublishReceived = options.onPublishReceived;
  302. }
  303. if (options.onStopped)
  304. {
  305. this->onStopped = options.onStopped;
  306. }
  307. if (options.onAttemptingConnect)
  308. {
  309. this->onAttemptingConnect = options.onAttemptingConnect;
  310. }
  311. clientOptions.publish_received_handler_user_data = this;
  312. clientOptions.publish_received_handler = &Mqtt5Client::s_publishReceivedCallback;
  313. clientOptions.lifecycle_event_handler = &Mqtt5Client::s_lifeCycleEventCallback;
  314. clientOptions.lifecycle_event_handler_user_data = this;
  315. clientOptions.client_termination_handler = &Mqtt5Client::s_clientTerminationCompletion;
  316. clientOptions.client_termination_handler_user_data = this;
  317. m_client = aws_mqtt5_client_new(allocator, &clientOptions);
  318. }
  319. Mqtt5Client::~Mqtt5Client()
  320. {
  321. if (m_client != nullptr)
  322. {
  323. aws_mqtt5_client_release(m_client);
  324. std::unique_lock<std::mutex> lock(m_terminationMutex);
  325. m_terminationCondition.wait(lock, [this] { return m_terminationPredicate == true; });
  326. m_client = nullptr;
  327. }
  328. }
  329. std::shared_ptr<Mqtt5Client> Mqtt5Client::NewMqtt5Client(
  330. const Mqtt5ClientOptions &options,
  331. Allocator *allocator) noexcept
  332. {
  333. /* Copied from MqttClient.cpp:ln754 */
  334. // As the constructor is private, make share would not work here. We do make_share manually.
  335. Mqtt5Client *toSeat = reinterpret_cast<Mqtt5Client *>(aws_mem_acquire(allocator, sizeof(Mqtt5Client)));
  336. if (!toSeat)
  337. {
  338. return nullptr;
  339. }
  340. toSeat = new (toSeat) Mqtt5Client(options, allocator);
  341. return std::shared_ptr<Mqtt5Client>(
  342. toSeat, [allocator](Mqtt5Client *client) { Crt::Delete(client, allocator); });
  343. }
  344. Mqtt5Client::operator bool() const noexcept { return m_client != nullptr; }
  345. int Mqtt5Client::LastError() const noexcept { return aws_last_error(); }
  346. bool Mqtt5Client::Start() const noexcept { return aws_mqtt5_client_start(m_client) == AWS_OP_SUCCESS; }
  347. bool Mqtt5Client::Stop() noexcept { return aws_mqtt5_client_stop(m_client, NULL, NULL) == AWS_OP_SUCCESS; }
  348. bool Mqtt5Client::Stop(std::shared_ptr<DisconnectPacket> disconnectOptions) noexcept
  349. {
  350. if (disconnectOptions == nullptr)
  351. {
  352. return Stop();
  353. }
  354. aws_mqtt5_packet_disconnect_view disconnect_packet;
  355. AWS_ZERO_STRUCT(disconnect_packet);
  356. if (disconnectOptions->initializeRawOptions(disconnect_packet) == false)
  357. {
  358. return false;
  359. }
  360. return aws_mqtt5_client_stop(m_client, &disconnect_packet, NULL) == AWS_OP_SUCCESS;
  361. }
  362. bool Mqtt5Client::Publish(
  363. std::shared_ptr<PublishPacket> publishOptions,
  364. OnPublishCompletionHandler onPublishCmpletionCallback) noexcept
  365. {
  366. if (publishOptions == nullptr)
  367. {
  368. return false;
  369. }
  370. aws_mqtt5_packet_publish_view publish;
  371. publishOptions->initializeRawOptions(publish);
  372. PubAckCallbackData *pubCallbackData = Aws::Crt::New<PubAckCallbackData>(m_allocator);
  373. pubCallbackData->client = this->getptr();
  374. pubCallbackData->allocator = m_allocator;
  375. pubCallbackData->onPublishCompletion = onPublishCmpletionCallback;
  376. aws_mqtt5_publish_completion_options options;
  377. options.completion_callback = Mqtt5Client::s_publishCompletionCallback;
  378. options.completion_user_data = pubCallbackData;
  379. int result = aws_mqtt5_client_publish(m_client, &publish, &options);
  380. if (result != AWS_OP_SUCCESS)
  381. {
  382. Crt::Delete(pubCallbackData, pubCallbackData->allocator);
  383. return false;
  384. }
  385. return true;
  386. }
  387. bool Mqtt5Client::Subscribe(
  388. std::shared_ptr<SubscribePacket> subscribeOptions,
  389. OnSubscribeCompletionHandler onSubscribeCompletionCallback) noexcept
  390. {
  391. if (subscribeOptions == nullptr)
  392. {
  393. return false;
  394. }
  395. /* Setup packet_subscribe */
  396. aws_mqtt5_packet_subscribe_view subscribe;
  397. subscribeOptions->initializeRawOptions(subscribe);
  398. /* Setup subscription Completion callback*/
  399. SubAckCallbackData *subCallbackData = Aws::Crt::New<SubAckCallbackData>(m_allocator);
  400. subCallbackData->client = this->getptr();
  401. subCallbackData->allocator = m_allocator;
  402. subCallbackData->onSubscribeCompletion = onSubscribeCompletionCallback;
  403. aws_mqtt5_subscribe_completion_options options;
  404. options.completion_callback = Mqtt5Client::s_subscribeCompletionCallback;
  405. options.completion_user_data = subCallbackData;
  406. /* Subscribe to topic */
  407. int result = aws_mqtt5_client_subscribe(m_client, &subscribe, &options);
  408. if (result != AWS_OP_SUCCESS)
  409. {
  410. Crt::Delete(subCallbackData, subCallbackData->allocator);
  411. return false;
  412. }
  413. return result == AWS_OP_SUCCESS;
  414. }
  415. bool Mqtt5Client::Unsubscribe(
  416. std::shared_ptr<UnsubscribePacket> unsubscribeOptions,
  417. OnUnsubscribeCompletionHandler onUnsubscribeCompletionCallback) noexcept
  418. {
  419. if (unsubscribeOptions == nullptr)
  420. {
  421. return false;
  422. }
  423. aws_mqtt5_packet_unsubscribe_view unsubscribe;
  424. unsubscribeOptions->initializeRawOptions(unsubscribe);
  425. UnSubAckCallbackData *unSubCallbackData = Aws::Crt::New<UnSubAckCallbackData>(m_allocator);
  426. unSubCallbackData->client = this->getptr();
  427. unSubCallbackData->allocator = m_allocator;
  428. unSubCallbackData->onUnsubscribeCompletion = onUnsubscribeCompletionCallback;
  429. aws_mqtt5_unsubscribe_completion_options options;
  430. options.completion_callback = Mqtt5Client::s_unsubscribeCompletionCallback;
  431. options.completion_user_data = unSubCallbackData;
  432. int result = aws_mqtt5_client_unsubscribe(m_client, &unsubscribe, &options);
  433. if (result != AWS_OP_SUCCESS)
  434. {
  435. Crt::Delete(unSubCallbackData, unSubCallbackData->allocator);
  436. return false;
  437. }
  438. return result == AWS_OP_SUCCESS;
  439. }
  440. const Mqtt5ClientOperationStatistics &Mqtt5Client::GetOperationStatistics() noexcept
  441. {
  442. aws_mqtt5_client_operation_statistics m_operationStatisticsNative = {0, 0, 0, 0};
  443. if (m_client != nullptr)
  444. {
  445. aws_mqtt5_client_get_stats(m_client, &m_operationStatisticsNative);
  446. m_operationStatistics.incompleteOperationCount =
  447. m_operationStatisticsNative.incomplete_operation_count;
  448. m_operationStatistics.incompleteOperationSize =
  449. m_operationStatisticsNative.incomplete_operation_size;
  450. m_operationStatistics.unackedOperationCount = m_operationStatisticsNative.unacked_operation_count;
  451. m_operationStatistics.unackedOperationSize = m_operationStatisticsNative.unacked_operation_size;
  452. }
  453. return m_operationStatistics;
  454. }
  455. /*****************************************************
  456. *
  457. * Mqtt5ClientOptions
  458. *
  459. *****************************************************/
  460. /**
  461. * Mqtt5ClientOptions
  462. */
  463. Mqtt5ClientOptions::Mqtt5ClientOptions(Crt::Allocator *allocator) noexcept
  464. : m_bootstrap(nullptr), m_sessionBehavior(ClientSessionBehaviorType::AWS_MQTT5_CSBT_DEFAULT),
  465. m_extendedValidationAndFlowControlOptions(AWS_MQTT5_EVAFCO_AWS_IOT_CORE_DEFAULTS),
  466. m_offlineQueueBehavior(AWS_MQTT5_COQBT_DEFAULT),
  467. m_reconnectionOptions({AWS_EXPONENTIAL_BACKOFF_JITTER_DEFAULT, 0, 0, 0}), m_pingTimeoutMs(0),
  468. m_connackTimeoutMs(0), m_ackTimeoutSec(0), m_allocator(allocator)
  469. {
  470. m_socketOptions.SetSocketType(Io::SocketType::Stream);
  471. AWS_ZERO_STRUCT(m_packetConnectViewStorage);
  472. AWS_ZERO_STRUCT(m_httpProxyOptionsStorage);
  473. }
  474. bool Mqtt5ClientOptions::initializeRawOptions(aws_mqtt5_client_options &raw_options) const noexcept
  475. {
  476. AWS_ZERO_STRUCT(raw_options);
  477. raw_options.host_name = ByteCursorFromString(m_hostName);
  478. raw_options.port = m_port;
  479. if (m_bootstrap == nullptr)
  480. {
  481. raw_options.bootstrap = ApiHandle::GetOrCreateStaticDefaultClientBootstrap()->GetUnderlyingHandle();
  482. }
  483. else
  484. {
  485. raw_options.bootstrap = m_bootstrap->GetUnderlyingHandle();
  486. }
  487. raw_options.socket_options = &m_socketOptions.GetImpl();
  488. if (m_tlsConnectionOptions.has_value())
  489. {
  490. raw_options.tls_options = m_tlsConnectionOptions.value().GetUnderlyingHandle();
  491. }
  492. if (m_proxyOptions.has_value())
  493. {
  494. raw_options.http_proxy_options = &m_httpProxyOptionsStorage;
  495. }
  496. raw_options.connect_options = &m_packetConnectViewStorage;
  497. raw_options.session_behavior = m_sessionBehavior;
  498. raw_options.extended_validation_and_flow_control_options = m_extendedValidationAndFlowControlOptions;
  499. raw_options.offline_queue_behavior = m_offlineQueueBehavior;
  500. raw_options.retry_jitter_mode = m_reconnectionOptions.m_reconnectMode;
  501. raw_options.max_reconnect_delay_ms = m_reconnectionOptions.m_maxReconnectDelayMs;
  502. raw_options.min_reconnect_delay_ms = m_reconnectionOptions.m_minReconnectDelayMs;
  503. raw_options.min_connected_time_to_reset_reconnect_delay_ms =
  504. m_reconnectionOptions.m_minConnectedTimeToResetReconnectDelayMs;
  505. raw_options.ping_timeout_ms = m_pingTimeoutMs;
  506. raw_options.connack_timeout_ms = m_connackTimeoutMs;
  507. raw_options.ack_timeout_seconds = m_ackTimeoutSec;
  508. return true;
  509. }
  510. Mqtt5ClientOptions::~Mqtt5ClientOptions() {}
  511. Mqtt5ClientOptions &Mqtt5ClientOptions::withHostName(Crt::String hostname)
  512. {
  513. m_hostName = std::move(hostname);
  514. return *this;
  515. }
  516. Mqtt5ClientOptions &Mqtt5ClientOptions::withPort(uint16_t port) noexcept
  517. {
  518. m_port = port;
  519. return *this;
  520. }
  521. Mqtt5ClientOptions &Mqtt5ClientOptions::withBootstrap(Io::ClientBootstrap *bootStrap) noexcept
  522. {
  523. m_bootstrap = bootStrap;
  524. return *this;
  525. }
  526. Mqtt5ClientOptions &Mqtt5ClientOptions::withSocketOptions(Io::SocketOptions socketOptions) noexcept
  527. {
  528. m_socketOptions = std::move(socketOptions);
  529. return *this;
  530. }
  531. Mqtt5ClientOptions &Mqtt5ClientOptions::withTlsConnectionOptions(
  532. const Io::TlsConnectionOptions &tslOptions) noexcept
  533. {
  534. m_tlsConnectionOptions = tslOptions;
  535. return *this;
  536. }
  537. Mqtt5ClientOptions &Mqtt5ClientOptions::withHttpProxyOptions(
  538. const Crt::Http::HttpClientConnectionProxyOptions &proxyOptions) noexcept
  539. {
  540. m_proxyOptions = proxyOptions;
  541. m_proxyOptions->InitializeRawProxyOptions(m_httpProxyOptionsStorage);
  542. return *this;
  543. }
  544. Mqtt5ClientOptions &Mqtt5ClientOptions::withConnectOptions(
  545. std::shared_ptr<ConnectPacket> packetConnect) noexcept
  546. {
  547. m_connectOptions = packetConnect;
  548. m_connectOptions->initializeRawOptions(m_packetConnectViewStorage, m_allocator);
  549. return *this;
  550. }
  551. Mqtt5ClientOptions &Mqtt5ClientOptions::withSessionBehavior(
  552. ClientSessionBehaviorType sessionBehavior) noexcept
  553. {
  554. m_sessionBehavior = sessionBehavior;
  555. return *this;
  556. }
  557. Mqtt5ClientOptions &Mqtt5ClientOptions::withClientExtendedValidationAndFlowControl(
  558. ClientExtendedValidationAndFlowControl clientExtendedValidationAndFlowControl) noexcept
  559. {
  560. m_extendedValidationAndFlowControlOptions = clientExtendedValidationAndFlowControl;
  561. return *this;
  562. }
  563. Mqtt5ClientOptions &Mqtt5ClientOptions::withOfflineQueueBehavior(
  564. ClientOperationQueueBehaviorType offlineQueueBehavior) noexcept
  565. {
  566. m_offlineQueueBehavior = offlineQueueBehavior;
  567. return *this;
  568. }
  569. Mqtt5ClientOptions &Mqtt5ClientOptions::withReconnectOptions(ReconnectOptions reconnectOptions) noexcept
  570. {
  571. m_reconnectionOptions = reconnectOptions;
  572. return *this;
  573. }
  574. Mqtt5ClientOptions &Mqtt5ClientOptions::withPingTimeoutMs(uint32_t pingTimeoutMs) noexcept
  575. {
  576. m_pingTimeoutMs = pingTimeoutMs;
  577. return *this;
  578. }
  579. Mqtt5ClientOptions &Mqtt5ClientOptions::withConnackTimeoutMs(uint32_t connackTimeoutMs) noexcept
  580. {
  581. m_connackTimeoutMs = connackTimeoutMs;
  582. return *this;
  583. }
  584. Mqtt5ClientOptions &Mqtt5ClientOptions::withAckTimeoutSeconds(uint32_t ackTimeoutSeconds) noexcept
  585. {
  586. m_ackTimeoutSec = ackTimeoutSeconds;
  587. return *this;
  588. }
  589. Mqtt5ClientOptions &Mqtt5ClientOptions::withWebsocketHandshakeTransformCallback(
  590. OnWebSocketHandshakeIntercept callback) noexcept
  591. {
  592. websocketHandshakeTransform = std::move(callback);
  593. return *this;
  594. }
  595. Mqtt5ClientOptions &Mqtt5ClientOptions::withClientConnectionSuccessCallback(
  596. OnConnectionSuccessHandler callback) noexcept
  597. {
  598. onConnectionSuccess = std::move(callback);
  599. return *this;
  600. }
  601. Mqtt5ClientOptions &Mqtt5ClientOptions::withClientConnectionFailureCallback(
  602. OnConnectionFailureHandler callback) noexcept
  603. {
  604. onConnectionFailure = std::move(callback);
  605. return *this;
  606. }
  607. Mqtt5ClientOptions &Mqtt5ClientOptions::withClientDisconnectionCallback(
  608. OnDisconnectionHandler callback) noexcept
  609. {
  610. onDisconnection = std::move(callback);
  611. return *this;
  612. }
  613. Mqtt5ClientOptions &Mqtt5ClientOptions::withClientStoppedCallback(OnStoppedHandler callback) noexcept
  614. {
  615. onStopped = std::move(callback);
  616. return *this;
  617. }
  618. Mqtt5ClientOptions &Mqtt5ClientOptions::withClientAttemptingConnectCallback(
  619. OnAttemptingConnectHandler callback) noexcept
  620. {
  621. onAttemptingConnect = std::move(callback);
  622. return *this;
  623. }
  624. Mqtt5ClientOptions &Mqtt5ClientOptions::withPublishReceivedCallback(
  625. OnPublishReceivedHandler callback) noexcept
  626. {
  627. onPublishReceived = std::move(callback);
  628. return *this;
  629. }
  630. } // namespace Mqtt5
  631. } // namespace Crt
  632. } // namespace Aws