MqttClient.cpp 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/crt/mqtt/MqttClient.h>
  6. #include <aws/crt/Api.h>
  7. #include <aws/crt/StlAllocator.h>
  8. #include <aws/crt/http/HttpProxyStrategy.h>
  9. #include <aws/crt/http/HttpRequestResponse.h>
  10. #include <aws/crt/io/Bootstrap.h>
  11. #include <utility>
  12. #define AWS_MQTT_MAX_TOPIC_LENGTH 65535
  13. namespace Aws
  14. {
  15. namespace Crt
  16. {
  17. namespace Mqtt
  18. {
  19. void MqttConnection::s_onConnectionInterrupted(aws_mqtt_client_connection *, int errorCode, void *userData)
  20. {
  21. auto connWrapper = reinterpret_cast<MqttConnection *>(userData);
  22. if (connWrapper->OnConnectionInterrupted)
  23. {
  24. connWrapper->OnConnectionInterrupted(*connWrapper, errorCode);
  25. }
  26. }
  27. void MqttConnection::s_onConnectionResumed(
  28. aws_mqtt_client_connection *,
  29. ReturnCode returnCode,
  30. bool sessionPresent,
  31. void *userData)
  32. {
  33. auto connWrapper = reinterpret_cast<MqttConnection *>(userData);
  34. if (connWrapper->OnConnectionResumed)
  35. {
  36. connWrapper->OnConnectionResumed(*connWrapper, returnCode, sessionPresent);
  37. }
  38. }
  39. void MqttConnection::s_onConnectionCompleted(
  40. aws_mqtt_client_connection *,
  41. int errorCode,
  42. enum aws_mqtt_connect_return_code returnCode,
  43. bool sessionPresent,
  44. void *userData)
  45. {
  46. auto connWrapper = reinterpret_cast<MqttConnection *>(userData);
  47. if (connWrapper->OnConnectionCompleted)
  48. {
  49. connWrapper->OnConnectionCompleted(*connWrapper, errorCode, returnCode, sessionPresent);
  50. }
  51. }
  52. void MqttConnection::s_onDisconnect(aws_mqtt_client_connection *, void *userData)
  53. {
  54. auto connWrapper = reinterpret_cast<MqttConnection *>(userData);
  55. if (connWrapper->OnDisconnect)
  56. {
  57. connWrapper->OnDisconnect(*connWrapper);
  58. }
  59. }
  60. struct PubCallbackData
  61. {
  62. PubCallbackData() : connection(nullptr), allocator(nullptr) {}
  63. MqttConnection *connection;
  64. OnMessageReceivedHandler onMessageReceived;
  65. Allocator *allocator;
  66. };
  67. static void s_cleanUpOnPublishData(void *userData)
  68. {
  69. auto callbackData = reinterpret_cast<PubCallbackData *>(userData);
  70. Crt::Delete(callbackData, callbackData->allocator);
  71. }
  72. void MqttConnection::s_onPublish(
  73. aws_mqtt_client_connection *,
  74. const aws_byte_cursor *topic,
  75. const aws_byte_cursor *payload,
  76. bool dup,
  77. enum aws_mqtt_qos qos,
  78. bool retain,
  79. void *userData)
  80. {
  81. auto callbackData = reinterpret_cast<PubCallbackData *>(userData);
  82. if (callbackData->onMessageReceived)
  83. {
  84. String topicStr(reinterpret_cast<char *>(topic->ptr), topic->len);
  85. ByteBuf payloadBuf = aws_byte_buf_from_array(payload->ptr, payload->len);
  86. callbackData->onMessageReceived(
  87. *(callbackData->connection), topicStr, payloadBuf, dup, qos, retain);
  88. }
  89. }
  90. struct OpCompleteCallbackData
  91. {
  92. OpCompleteCallbackData() : connection(nullptr), topic(nullptr), allocator(nullptr) {}
  93. MqttConnection *connection;
  94. OnOperationCompleteHandler onOperationComplete;
  95. const char *topic;
  96. Allocator *allocator;
  97. };
  98. void MqttConnection::s_onOpComplete(
  99. aws_mqtt_client_connection *,
  100. uint16_t packetId,
  101. int errorCode,
  102. void *userData)
  103. {
  104. auto callbackData = reinterpret_cast<OpCompleteCallbackData *>(userData);
  105. if (callbackData->onOperationComplete)
  106. {
  107. callbackData->onOperationComplete(*callbackData->connection, packetId, errorCode);
  108. }
  109. if (callbackData->topic)
  110. {
  111. aws_mem_release(
  112. callbackData->allocator, reinterpret_cast<void *>(const_cast<char *>(callbackData->topic)));
  113. }
  114. Crt::Delete(callbackData, callbackData->allocator);
  115. }
  116. struct SubAckCallbackData
  117. {
  118. SubAckCallbackData() : connection(nullptr), topic(nullptr), allocator(nullptr) {}
  119. MqttConnection *connection;
  120. OnSubAckHandler onSubAck;
  121. const char *topic;
  122. Allocator *allocator;
  123. };
  124. void MqttConnection::s_onSubAck(
  125. aws_mqtt_client_connection *,
  126. uint16_t packetId,
  127. const struct aws_byte_cursor *topic,
  128. enum aws_mqtt_qos qos,
  129. int errorCode,
  130. void *userData)
  131. {
  132. auto callbackData = reinterpret_cast<SubAckCallbackData *>(userData);
  133. if (callbackData->onSubAck)
  134. {
  135. String topicStr(reinterpret_cast<char *>(topic->ptr), topic->len);
  136. callbackData->onSubAck(*callbackData->connection, packetId, topicStr, qos, errorCode);
  137. }
  138. if (callbackData->topic)
  139. {
  140. aws_mem_release(
  141. callbackData->allocator, reinterpret_cast<void *>(const_cast<char *>(callbackData->topic)));
  142. }
  143. Crt::Delete(callbackData, callbackData->allocator);
  144. }
  145. struct MultiSubAckCallbackData
  146. {
  147. MultiSubAckCallbackData() : connection(nullptr), topic(nullptr), allocator(nullptr) {}
  148. MqttConnection *connection;
  149. OnMultiSubAckHandler onSubAck;
  150. const char *topic;
  151. Allocator *allocator;
  152. };
  153. void MqttConnection::s_onMultiSubAck(
  154. aws_mqtt_client_connection *,
  155. uint16_t packetId,
  156. const struct aws_array_list *topicSubacks,
  157. int errorCode,
  158. void *userData)
  159. {
  160. auto callbackData = reinterpret_cast<MultiSubAckCallbackData *>(userData);
  161. if (callbackData->onSubAck)
  162. {
  163. size_t length = aws_array_list_length(topicSubacks);
  164. Vector<String> topics;
  165. topics.reserve(length);
  166. QOS qos = AWS_MQTT_QOS_AT_MOST_ONCE;
  167. for (size_t i = 0; i < length; ++i)
  168. {
  169. aws_mqtt_topic_subscription *subscription = NULL;
  170. aws_array_list_get_at(topicSubacks, &subscription, i);
  171. topics.push_back(
  172. String(reinterpret_cast<char *>(subscription->topic.ptr), subscription->topic.len));
  173. qos = subscription->qos;
  174. }
  175. callbackData->onSubAck(*callbackData->connection, packetId, topics, qos, errorCode);
  176. }
  177. if (callbackData->topic)
  178. {
  179. aws_mem_release(
  180. callbackData->allocator, reinterpret_cast<void *>(const_cast<char *>(callbackData->topic)));
  181. }
  182. Crt::Delete(callbackData, callbackData->allocator);
  183. }
  184. void MqttConnection::s_connectionInit(
  185. MqttConnection *self,
  186. const char *hostName,
  187. uint16_t port,
  188. const Io::SocketOptions &socketOptions)
  189. {
  190. self->m_hostName = String(hostName);
  191. self->m_port = port;
  192. self->m_socketOptions = socketOptions;
  193. self->m_underlyingConnection = aws_mqtt_client_connection_new(self->m_owningClient);
  194. if (self->m_underlyingConnection)
  195. {
  196. aws_mqtt_client_connection_set_connection_interruption_handlers(
  197. self->m_underlyingConnection,
  198. MqttConnection::s_onConnectionInterrupted,
  199. self,
  200. MqttConnection::s_onConnectionResumed,
  201. self);
  202. }
  203. }
  204. void MqttConnection::s_onWebsocketHandshake(
  205. struct aws_http_message *rawRequest,
  206. void *user_data,
  207. aws_mqtt_transform_websocket_handshake_complete_fn *complete_fn,
  208. void *complete_ctx)
  209. {
  210. auto connection = reinterpret_cast<MqttConnection *>(user_data);
  211. Allocator *allocator = connection->m_owningClient->allocator;
  212. // we have to do this because of private constructors.
  213. auto toSeat =
  214. reinterpret_cast<Http::HttpRequest *>(aws_mem_acquire(allocator, sizeof(Http::HttpRequest)));
  215. toSeat = new (toSeat) Http::HttpRequest(allocator, rawRequest);
  216. std::shared_ptr<Http::HttpRequest> request = std::shared_ptr<Http::HttpRequest>(
  217. toSeat, [allocator](Http::HttpRequest *ptr) { Crt::Delete(ptr, allocator); });
  218. auto onInterceptComplete =
  219. [complete_fn,
  220. complete_ctx](const std::shared_ptr<Http::HttpRequest> &transformedRequest, int errorCode) {
  221. complete_fn(transformedRequest->GetUnderlyingMessage(), errorCode, complete_ctx);
  222. };
  223. connection->WebsocketInterceptor(request, onInterceptComplete);
  224. }
  225. MqttConnection::MqttConnection(
  226. aws_mqtt_client *client,
  227. const char *hostName,
  228. uint16_t port,
  229. const Io::SocketOptions &socketOptions,
  230. const Crt::Io::TlsContext &tlsContext,
  231. bool useWebsocket) noexcept
  232. : m_owningClient(client), m_tlsContext(tlsContext), m_tlsOptions(tlsContext.NewConnectionOptions()),
  233. m_onAnyCbData(nullptr), m_useTls(true), m_useWebsocket(useWebsocket)
  234. {
  235. s_connectionInit(this, hostName, port, socketOptions);
  236. }
  237. MqttConnection::MqttConnection(
  238. aws_mqtt_client *client,
  239. const char *hostName,
  240. uint16_t port,
  241. const Io::SocketOptions &socketOptions,
  242. bool useWebsocket) noexcept
  243. : m_owningClient(client), m_onAnyCbData(nullptr), m_useTls(false), m_useWebsocket(useWebsocket)
  244. {
  245. s_connectionInit(this, hostName, port, socketOptions);
  246. }
  247. MqttConnection::~MqttConnection()
  248. {
  249. if (*this)
  250. {
  251. aws_mqtt_client_connection_release(m_underlyingConnection);
  252. if (m_onAnyCbData)
  253. {
  254. auto pubCallbackData = reinterpret_cast<PubCallbackData *>(m_onAnyCbData);
  255. Crt::Delete(pubCallbackData, pubCallbackData->allocator);
  256. }
  257. }
  258. }
  259. MqttConnection::operator bool() const noexcept { return m_underlyingConnection != nullptr; }
  260. int MqttConnection::LastError() const noexcept { return aws_last_error(); }
  261. bool MqttConnection::SetWill(const char *topic, QOS qos, bool retain, const ByteBuf &payload) noexcept
  262. {
  263. ByteBuf topicBuf = aws_byte_buf_from_c_str(topic);
  264. ByteCursor topicCur = aws_byte_cursor_from_buf(&topicBuf);
  265. ByteCursor payloadCur = aws_byte_cursor_from_buf(&payload);
  266. return aws_mqtt_client_connection_set_will(
  267. m_underlyingConnection, &topicCur, qos, retain, &payloadCur) == 0;
  268. }
  269. bool MqttConnection::SetLogin(const char *userName, const char *password) noexcept
  270. {
  271. ByteBuf userNameBuf = aws_byte_buf_from_c_str(userName);
  272. ByteCursor userNameCur = aws_byte_cursor_from_buf(&userNameBuf);
  273. ByteCursor *pwdCurPtr = nullptr;
  274. ByteCursor pwdCur;
  275. if (password)
  276. {
  277. pwdCur = ByteCursorFromCString(password);
  278. pwdCurPtr = &pwdCur;
  279. }
  280. return aws_mqtt_client_connection_set_login(m_underlyingConnection, &userNameCur, pwdCurPtr) == 0;
  281. }
  282. bool MqttConnection::SetWebsocketProxyOptions(
  283. const Http::HttpClientConnectionProxyOptions &proxyOptions) noexcept
  284. {
  285. m_proxyOptions = proxyOptions;
  286. return true;
  287. }
  288. bool MqttConnection::SetHttpProxyOptions(
  289. const Http::HttpClientConnectionProxyOptions &proxyOptions) noexcept
  290. {
  291. m_proxyOptions = proxyOptions;
  292. return true;
  293. }
  294. bool MqttConnection::SetReconnectTimeout(uint64_t min_seconds, uint64_t max_seconds) noexcept
  295. {
  296. return aws_mqtt_client_connection_set_reconnect_timeout(
  297. m_underlyingConnection, min_seconds, max_seconds) == 0;
  298. }
  299. bool MqttConnection::Connect(
  300. const char *clientId,
  301. bool cleanSession,
  302. uint16_t keepAliveTime,
  303. uint32_t pingTimeoutMs,
  304. uint32_t protocolOperationTimeoutMs) noexcept
  305. {
  306. aws_mqtt_connection_options options;
  307. AWS_ZERO_STRUCT(options);
  308. options.client_id = aws_byte_cursor_from_c_str(clientId);
  309. options.host_name = aws_byte_cursor_from_array(
  310. reinterpret_cast<const uint8_t *>(m_hostName.data()), m_hostName.length());
  311. options.tls_options =
  312. m_useTls ? const_cast<aws_tls_connection_options *>(m_tlsOptions.GetUnderlyingHandle()) : nullptr;
  313. options.port = m_port;
  314. options.socket_options = &m_socketOptions.GetImpl();
  315. options.clean_session = cleanSession;
  316. options.keep_alive_time_secs = keepAliveTime;
  317. options.ping_timeout_ms = pingTimeoutMs;
  318. options.protocol_operation_timeout_ms = protocolOperationTimeoutMs;
  319. options.on_connection_complete = MqttConnection::s_onConnectionCompleted;
  320. options.user_data = this;
  321. if (m_useWebsocket)
  322. {
  323. if (WebsocketInterceptor)
  324. {
  325. if (aws_mqtt_client_connection_use_websockets(
  326. m_underlyingConnection, MqttConnection::s_onWebsocketHandshake, this, nullptr, nullptr))
  327. {
  328. return false;
  329. }
  330. }
  331. else
  332. {
  333. if (aws_mqtt_client_connection_use_websockets(
  334. m_underlyingConnection, nullptr, nullptr, nullptr, nullptr))
  335. {
  336. return false;
  337. }
  338. }
  339. }
  340. if (m_proxyOptions)
  341. {
  342. struct aws_http_proxy_options proxyOptions;
  343. m_proxyOptions->InitializeRawProxyOptions(proxyOptions);
  344. if (aws_mqtt_client_connection_set_http_proxy_options(m_underlyingConnection, &proxyOptions))
  345. {
  346. return false;
  347. }
  348. }
  349. return aws_mqtt_client_connection_connect(m_underlyingConnection, &options) == AWS_OP_SUCCESS;
  350. }
  351. bool MqttConnection::Disconnect() noexcept
  352. {
  353. return aws_mqtt_client_connection_disconnect(
  354. m_underlyingConnection, MqttConnection::s_onDisconnect, this) == AWS_OP_SUCCESS;
  355. }
  356. aws_mqtt_client_connection *MqttConnection::GetUnderlyingConnection() noexcept
  357. {
  358. return m_underlyingConnection;
  359. }
  360. bool MqttConnection::SetOnMessageHandler(OnPublishReceivedHandler &&onPublish) noexcept
  361. {
  362. return SetOnMessageHandler(
  363. [onPublish](
  364. MqttConnection &connection, const String &topic, const ByteBuf &payload, bool, QOS, bool) {
  365. onPublish(connection, topic, payload);
  366. });
  367. }
  368. bool MqttConnection::SetOnMessageHandler(OnMessageReceivedHandler &&onMessage) noexcept
  369. {
  370. auto pubCallbackData = Aws::Crt::New<PubCallbackData>(m_owningClient->allocator);
  371. if (!pubCallbackData)
  372. {
  373. return false;
  374. }
  375. pubCallbackData->connection = this;
  376. pubCallbackData->onMessageReceived = std::move(onMessage);
  377. pubCallbackData->allocator = m_owningClient->allocator;
  378. if (!aws_mqtt_client_connection_set_on_any_publish_handler(
  379. m_underlyingConnection, s_onPublish, pubCallbackData))
  380. {
  381. m_onAnyCbData = reinterpret_cast<void *>(pubCallbackData);
  382. return true;
  383. }
  384. Aws::Crt::Delete(pubCallbackData, pubCallbackData->allocator);
  385. return false;
  386. }
  387. uint16_t MqttConnection::Subscribe(
  388. const char *topicFilter,
  389. QOS qos,
  390. OnPublishReceivedHandler &&onPublish,
  391. OnSubAckHandler &&onSubAck) noexcept
  392. {
  393. return Subscribe(
  394. topicFilter,
  395. qos,
  396. [onPublish](
  397. MqttConnection &connection, const String &topic, const ByteBuf &payload, bool, QOS, bool) {
  398. onPublish(connection, topic, payload);
  399. },
  400. std::move(onSubAck));
  401. }
  402. uint16_t MqttConnection::Subscribe(
  403. const char *topicFilter,
  404. QOS qos,
  405. OnMessageReceivedHandler &&onMessage,
  406. OnSubAckHandler &&onSubAck) noexcept
  407. {
  408. auto pubCallbackData = Crt::New<PubCallbackData>(m_owningClient->allocator);
  409. if (!pubCallbackData)
  410. {
  411. return 0;
  412. }
  413. pubCallbackData->connection = this;
  414. pubCallbackData->onMessageReceived = std::move(onMessage);
  415. pubCallbackData->allocator = m_owningClient->allocator;
  416. auto subAckCallbackData = Crt::New<SubAckCallbackData>(m_owningClient->allocator);
  417. if (!subAckCallbackData)
  418. {
  419. Crt::Delete(pubCallbackData, m_owningClient->allocator);
  420. return 0;
  421. }
  422. subAckCallbackData->connection = this;
  423. subAckCallbackData->allocator = m_owningClient->allocator;
  424. subAckCallbackData->onSubAck = std::move(onSubAck);
  425. subAckCallbackData->topic = nullptr;
  426. subAckCallbackData->allocator = m_owningClient->allocator;
  427. ByteBuf topicFilterBuf = aws_byte_buf_from_c_str(topicFilter);
  428. ByteCursor topicFilterCur = aws_byte_cursor_from_buf(&topicFilterBuf);
  429. uint16_t packetId = aws_mqtt_client_connection_subscribe(
  430. m_underlyingConnection,
  431. &topicFilterCur,
  432. qos,
  433. s_onPublish,
  434. pubCallbackData,
  435. s_cleanUpOnPublishData,
  436. s_onSubAck,
  437. subAckCallbackData);
  438. if (!packetId)
  439. {
  440. Crt::Delete(pubCallbackData, pubCallbackData->allocator);
  441. Crt::Delete(subAckCallbackData, subAckCallbackData->allocator);
  442. }
  443. return packetId;
  444. }
  445. uint16_t MqttConnection::Subscribe(
  446. const Vector<std::pair<const char *, OnPublishReceivedHandler>> &topicFilters,
  447. QOS qos,
  448. OnMultiSubAckHandler &&onSubAck) noexcept
  449. {
  450. Vector<std::pair<const char *, OnMessageReceivedHandler>> newTopicFilters;
  451. newTopicFilters.reserve(topicFilters.size());
  452. for (const auto &pair : topicFilters)
  453. {
  454. const OnPublishReceivedHandler &pubHandler = pair.second;
  455. newTopicFilters.emplace_back(
  456. pair.first,
  457. [pubHandler](
  458. MqttConnection &connection, const String &topic, const ByteBuf &payload, bool, QOS, bool) {
  459. pubHandler(connection, topic, payload);
  460. });
  461. }
  462. return Subscribe(newTopicFilters, qos, std::move(onSubAck));
  463. }
  464. uint16_t MqttConnection::Subscribe(
  465. const Vector<std::pair<const char *, OnMessageReceivedHandler>> &topicFilters,
  466. QOS qos,
  467. OnMultiSubAckHandler &&onSubAck) noexcept
  468. {
  469. uint16_t packetId = 0;
  470. auto subAckCallbackData = Crt::New<MultiSubAckCallbackData>(m_owningClient->allocator);
  471. if (!subAckCallbackData)
  472. {
  473. return 0;
  474. }
  475. aws_array_list multiPub;
  476. AWS_ZERO_STRUCT(multiPub);
  477. if (aws_array_list_init_dynamic(
  478. &multiPub, m_owningClient->allocator, topicFilters.size(), sizeof(aws_mqtt_topic_subscription)))
  479. {
  480. Crt::Delete(subAckCallbackData, m_owningClient->allocator);
  481. return 0;
  482. }
  483. for (auto &topicFilter : topicFilters)
  484. {
  485. auto pubCallbackData = Crt::New<PubCallbackData>(m_owningClient->allocator);
  486. if (!pubCallbackData)
  487. {
  488. goto clean_up;
  489. }
  490. pubCallbackData->connection = this;
  491. pubCallbackData->onMessageReceived = topicFilter.second;
  492. pubCallbackData->allocator = m_owningClient->allocator;
  493. ByteBuf topicFilterBuf = aws_byte_buf_from_c_str(topicFilter.first);
  494. ByteCursor topicFilterCur = aws_byte_cursor_from_buf(&topicFilterBuf);
  495. aws_mqtt_topic_subscription subscription;
  496. subscription.on_cleanup = s_cleanUpOnPublishData;
  497. subscription.on_publish = s_onPublish;
  498. subscription.on_publish_ud = pubCallbackData;
  499. subscription.qos = qos;
  500. subscription.topic = topicFilterCur;
  501. aws_array_list_push_back(&multiPub, reinterpret_cast<const void *>(&subscription));
  502. }
  503. subAckCallbackData->connection = this;
  504. subAckCallbackData->allocator = m_owningClient->allocator;
  505. subAckCallbackData->onSubAck = std::move(onSubAck);
  506. subAckCallbackData->topic = nullptr;
  507. subAckCallbackData->allocator = m_owningClient->allocator;
  508. packetId = aws_mqtt_client_connection_subscribe_multiple(
  509. m_underlyingConnection, &multiPub, s_onMultiSubAck, subAckCallbackData);
  510. clean_up:
  511. if (!packetId)
  512. {
  513. size_t length = aws_array_list_length(&multiPub);
  514. for (size_t i = 0; i < length; ++i)
  515. {
  516. aws_mqtt_topic_subscription *subscription = NULL;
  517. aws_array_list_get_at_ptr(&multiPub, reinterpret_cast<void **>(&subscription), i);
  518. auto pubCallbackData = reinterpret_cast<PubCallbackData *>(subscription->on_publish_ud);
  519. Crt::Delete(pubCallbackData, m_owningClient->allocator);
  520. }
  521. Crt::Delete(subAckCallbackData, m_owningClient->allocator);
  522. }
  523. aws_array_list_clean_up(&multiPub);
  524. return packetId;
  525. }
  526. uint16_t MqttConnection::Unsubscribe(
  527. const char *topicFilter,
  528. OnOperationCompleteHandler &&onOpComplete) noexcept
  529. {
  530. auto opCompleteCallbackData = Crt::New<OpCompleteCallbackData>(m_owningClient->allocator);
  531. if (!opCompleteCallbackData)
  532. {
  533. return 0;
  534. }
  535. opCompleteCallbackData->connection = this;
  536. opCompleteCallbackData->allocator = m_owningClient->allocator;
  537. opCompleteCallbackData->onOperationComplete = std::move(onOpComplete);
  538. opCompleteCallbackData->topic = nullptr;
  539. ByteBuf topicFilterBuf = aws_byte_buf_from_c_str(topicFilter);
  540. ByteCursor topicFilterCur = aws_byte_cursor_from_buf(&topicFilterBuf);
  541. uint16_t packetId = aws_mqtt_client_connection_unsubscribe(
  542. m_underlyingConnection, &topicFilterCur, s_onOpComplete, opCompleteCallbackData);
  543. if (!packetId)
  544. {
  545. Crt::Delete(opCompleteCallbackData, m_owningClient->allocator);
  546. }
  547. return packetId;
  548. }
  549. uint16_t MqttConnection::Publish(
  550. const char *topic,
  551. QOS qos,
  552. bool retain,
  553. const ByteBuf &payload,
  554. OnOperationCompleteHandler &&onOpComplete) noexcept
  555. {
  556. auto opCompleteCallbackData = Crt::New<OpCompleteCallbackData>(m_owningClient->allocator);
  557. if (!opCompleteCallbackData)
  558. {
  559. return 0;
  560. }
  561. size_t topicLen = strnlen(topic, AWS_MQTT_MAX_TOPIC_LENGTH) + 1;
  562. char *topicCpy =
  563. reinterpret_cast<char *>(aws_mem_calloc(m_owningClient->allocator, topicLen, sizeof(char)));
  564. if (!topicCpy)
  565. {
  566. Crt::Delete(opCompleteCallbackData, m_owningClient->allocator);
  567. }
  568. memcpy(topicCpy, topic, topicLen);
  569. opCompleteCallbackData->connection = this;
  570. opCompleteCallbackData->allocator = m_owningClient->allocator;
  571. opCompleteCallbackData->onOperationComplete = std::move(onOpComplete);
  572. opCompleteCallbackData->topic = topicCpy;
  573. ByteCursor topicCur = aws_byte_cursor_from_array(topicCpy, topicLen - 1);
  574. ByteCursor payloadCur = aws_byte_cursor_from_buf(&payload);
  575. uint16_t packetId = aws_mqtt_client_connection_publish(
  576. m_underlyingConnection,
  577. &topicCur,
  578. qos,
  579. retain,
  580. &payloadCur,
  581. s_onOpComplete,
  582. opCompleteCallbackData);
  583. if (!packetId)
  584. {
  585. aws_mem_release(m_owningClient->allocator, reinterpret_cast<void *>(topicCpy));
  586. Crt::Delete(opCompleteCallbackData, m_owningClient->allocator);
  587. }
  588. return packetId;
  589. }
  590. const MqttConnectionOperationStatistics &MqttConnection::GetOperationStatistics() noexcept
  591. {
  592. aws_mqtt_connection_operation_statistics m_operationStatisticsNative = {0, 0, 0, 0};
  593. if (m_underlyingConnection != nullptr)
  594. {
  595. aws_mqtt_client_connection_get_stats(m_underlyingConnection, &m_operationStatisticsNative);
  596. m_operationStatistics.incompleteOperationCount =
  597. m_operationStatisticsNative.incomplete_operation_count;
  598. m_operationStatistics.incompleteOperationSize =
  599. m_operationStatisticsNative.incomplete_operation_size;
  600. m_operationStatistics.unackedOperationCount = m_operationStatisticsNative.unacked_operation_count;
  601. m_operationStatistics.unackedOperationSize = m_operationStatisticsNative.unacked_operation_size;
  602. }
  603. return m_operationStatistics;
  604. }
  605. MqttClient::MqttClient(Io::ClientBootstrap &bootstrap, Allocator *allocator) noexcept
  606. : m_client(aws_mqtt_client_new(allocator, bootstrap.GetUnderlyingHandle()))
  607. {
  608. }
  609. MqttClient::MqttClient(Allocator *allocator) noexcept
  610. : m_client(aws_mqtt_client_new(
  611. allocator,
  612. Crt::ApiHandle::GetOrCreateStaticDefaultClientBootstrap()->GetUnderlyingHandle()))
  613. {
  614. }
  615. MqttClient::~MqttClient()
  616. {
  617. aws_mqtt_client_release(m_client);
  618. m_client = nullptr;
  619. }
  620. MqttClient::MqttClient(MqttClient &&toMove) noexcept : m_client(toMove.m_client)
  621. {
  622. toMove.m_client = nullptr;
  623. }
  624. MqttClient &MqttClient::operator=(MqttClient &&toMove) noexcept
  625. {
  626. if (&toMove != this)
  627. {
  628. m_client = toMove.m_client;
  629. toMove.m_client = nullptr;
  630. }
  631. return *this;
  632. }
  633. MqttClient::operator bool() const noexcept { return m_client != nullptr; }
  634. int MqttClient::LastError() const noexcept { return aws_last_error(); }
  635. std::shared_ptr<MqttConnection> MqttClient::NewConnection(
  636. const char *hostName,
  637. uint16_t port,
  638. const Io::SocketOptions &socketOptions,
  639. const Crt::Io::TlsContext &tlsContext,
  640. bool useWebsocket) noexcept
  641. {
  642. if (!tlsContext)
  643. {
  644. AWS_LOGF_ERROR(
  645. AWS_LS_MQTT_CLIENT,
  646. "id=%p Trying to call MqttClient::NewConnection using an invalid TlsContext.",
  647. (void *)m_client);
  648. aws_raise_error(AWS_ERROR_INVALID_ARGUMENT);
  649. return nullptr;
  650. }
  651. // If you're reading this and asking.... why is this so complicated? Why not use make_shared
  652. // or allocate_shared? Well, MqttConnection constructors are private and stl is dumb like that.
  653. // so, we do it manually.
  654. Allocator *allocator = m_client->allocator;
  655. MqttConnection *toSeat =
  656. reinterpret_cast<MqttConnection *>(aws_mem_acquire(allocator, sizeof(MqttConnection)));
  657. if (!toSeat)
  658. {
  659. return nullptr;
  660. }
  661. toSeat = new (toSeat) MqttConnection(m_client, hostName, port, socketOptions, tlsContext, useWebsocket);
  662. return std::shared_ptr<MqttConnection>(toSeat, [allocator](MqttConnection *connection) {
  663. connection->~MqttConnection();
  664. aws_mem_release(allocator, reinterpret_cast<void *>(connection));
  665. });
  666. }
  667. std::shared_ptr<MqttConnection> MqttClient::NewConnection(
  668. const char *hostName,
  669. uint16_t port,
  670. const Io::SocketOptions &socketOptions,
  671. bool useWebsocket) noexcept
  672. {
  673. // If you're reading this and asking.... why is this so complicated? Why not use make_shared
  674. // or allocate_shared? Well, MqttConnection constructors are private and stl is dumb like that.
  675. // so, we do it manually.
  676. Allocator *allocator = m_client->allocator;
  677. MqttConnection *toSeat =
  678. reinterpret_cast<MqttConnection *>(aws_mem_acquire(m_client->allocator, sizeof(MqttConnection)));
  679. if (!toSeat)
  680. {
  681. return nullptr;
  682. }
  683. toSeat = new (toSeat) MqttConnection(m_client, hostName, port, socketOptions, useWebsocket);
  684. return std::shared_ptr<MqttConnection>(toSeat, [allocator](MqttConnection *connection) {
  685. connection->~MqttConnection();
  686. aws_mem_release(allocator, reinterpret_cast<void *>(connection));
  687. });
  688. }
  689. } // namespace Mqtt
  690. } // namespace Crt
  691. } // namespace Aws