123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236 |
- /**
- * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
- * SPDX-License-Identifier: Apache-2.0.
- */
- #include <aws/crt/mqtt/Mqtt5Client.h>
- #include <aws/crt/mqtt/Mqtt5Packets.h>
- namespace Aws
- {
- namespace Crt
- {
- namespace Mqtt5
- {
- template <typename T> void setPacketVector(Vector<T> &vector, const T *values, size_t length)
- {
- vector.clear();
- for (size_t i = 0; i < length; ++i)
- {
- vector.push_back(values[i]);
- }
- }
- template <typename T> void setPacketOptional(Optional<T> &optional, const T *value)
- {
- if (value != nullptr)
- {
- optional = *value;
- }
- else
- {
- optional.reset();
- }
- }
- void setPacketStringOptional(
- Optional<aws_byte_cursor> &optional,
- Crt::String &optionalStorage,
- const aws_byte_cursor *value)
- {
- if (value != nullptr)
- {
- optionalStorage = Crt::String((const char *)value->ptr, value->len);
- struct aws_byte_cursor optional_cursor;
- optional_cursor.ptr = (uint8_t *)optionalStorage.c_str();
- optional_cursor.len = optionalStorage.size();
- optional = optional_cursor;
- }
- }
- void setPacketStringOptional(Optional<Crt::String> &optional, const aws_byte_cursor *value)
- {
- if (value != nullptr)
- {
- optional = Crt::String((const char *)value->ptr, value->len);
- }
- else
- {
- optional.reset();
- }
- }
- void setPacketStringOptional(Optional<Crt::String> &optional, Crt::String &&toMove)
- {
- if (!toMove.empty())
- {
- optional = std::move(toMove);
- }
- else
- {
- optional.reset();
- }
- }
- void setPacketByteBufOptional(
- Optional<aws_byte_cursor> &optional,
- ByteBuf &optionalStorage,
- Allocator *allocator,
- const aws_byte_cursor *value)
- {
- aws_byte_buf_clean_up(&optionalStorage);
- AWS_ZERO_STRUCT(optionalStorage);
- if (value != nullptr)
- {
- aws_byte_buf_init_copy_from_cursor(&optionalStorage, allocator, *value);
- optional = aws_byte_cursor_from_buf(&optionalStorage);
- }
- else
- {
- optional.reset();
- }
- }
- void setUserProperties(
- Vector<UserProperty> &userProperties,
- const struct aws_mqtt5_user_property *properties,
- size_t propertyCount)
- {
- for (size_t i = 0; i < propertyCount; ++i)
- {
- userProperties.push_back(UserProperty(
- Aws::Crt::String((const char *)properties[i].name.ptr, properties[i].name.len),
- Aws::Crt::String((const char *)properties[i].value.ptr, properties[i].value.len)));
- }
- }
- template <typename T> void setNullableFromOptional(const T *&nullable, const Optional<T> &optional)
- {
- if (optional.has_value())
- {
- nullable = &optional.value();
- }
- }
- void s_AllocateUnderlyingUserProperties(
- aws_mqtt5_user_property *&dst,
- const Crt::Vector<UserProperty> &userProperties,
- Allocator *allocator)
- {
- if (dst != nullptr)
- {
- aws_mem_release(allocator, (void *)dst);
- dst = nullptr;
- }
- if (userProperties.size() > 0)
- {
- dst = reinterpret_cast<struct aws_mqtt5_user_property *>(
- aws_mem_calloc(allocator, userProperties.size(), sizeof(aws_mqtt5_user_property)));
- AWS_ZERO_STRUCT(*dst);
- for (size_t index = 0; index < userProperties.size(); ++index)
- {
- (dst + index)->name = aws_byte_cursor_from_array(
- userProperties[index].getName().c_str(), userProperties[index].getName().length());
- (dst + index)->value = aws_byte_cursor_from_array(
- userProperties[index].getValue().c_str(), userProperties[index].getValue().length());
- }
- }
- }
- void s_AllocateStringVector(
- aws_array_list &dst,
- const Crt::Vector<String> &stringVector,
- Allocator *allocator)
- {
- AWS_ZERO_STRUCT(dst);
- if (aws_array_list_init_dynamic(&dst, allocator, stringVector.size(), sizeof(aws_byte_cursor)) !=
- AWS_OP_SUCCESS)
- {
- return;
- }
- for (auto &topic : stringVector)
- {
- ByteCursor topicCursor = ByteCursorFromString(topic);
- aws_array_list_push_back(&dst, reinterpret_cast<const void *>(&topicCursor));
- }
- }
- void s_AllocateUnderlyingSubscription(
- aws_mqtt5_subscription_view *&dst,
- const Crt::Vector<Subscription> &subscriptions,
- Allocator *allocator)
- {
- if (dst != nullptr)
- {
- aws_mem_release(allocator, dst);
- dst = nullptr;
- }
- aws_array_list subscription_list;
- AWS_ZERO_STRUCT(subscription_list);
- if (aws_array_list_init_dynamic(
- &subscription_list, allocator, subscriptions.size(), sizeof(aws_mqtt5_subscription_view)) !=
- AWS_OP_SUCCESS)
- {
- return;
- }
- for (auto &subscription : subscriptions)
- {
- aws_mqtt5_subscription_view underlying_subscription;
- if (subscription.initializeRawOptions(underlying_subscription) != true)
- {
- goto clean_up;
- }
- aws_array_list_push_back(
- &subscription_list, reinterpret_cast<const void *>(&underlying_subscription));
- }
- dst = static_cast<aws_mqtt5_subscription_view *>(subscription_list.data);
- return;
- clean_up:
- aws_array_list_clean_up(&subscription_list);
- }
- ConnectPacket::ConnectPacket(Allocator *allocator) noexcept
- : m_allocator(allocator), m_keepAliveIntervalSec(1200), m_userPropertiesStorage(nullptr)
- {
- // m_clientId.clear();
- AWS_ZERO_STRUCT(m_usernameCursor);
- AWS_ZERO_STRUCT(m_passowrdStorage);
- AWS_ZERO_STRUCT(m_willStorage);
- }
- ConnectPacket &ConnectPacket::withKeepAliveIntervalSec(uint16_t second) noexcept
- {
- m_keepAliveIntervalSec = second;
- return *this;
- }
- ConnectPacket &ConnectPacket::withClientId(Crt::String client_id) noexcept
- {
- m_clientId = std::move(client_id);
- return *this;
- }
- ConnectPacket &ConnectPacket::withUserName(Crt::String username) noexcept
- {
- m_username = std::move(username);
- m_usernameCursor = ByteCursorFromString(m_username.value());
- return *this;
- }
- ConnectPacket &ConnectPacket::withPassword(Crt::ByteCursor password) noexcept
- {
- setPacketByteBufOptional(m_password, m_passowrdStorage, m_allocator, &password);
- return *this;
- }
- ConnectPacket &ConnectPacket::withSessionExpiryIntervalSec(uint32_t sessionExpiryIntervalSec) noexcept
- {
- m_sessionExpiryIntervalSec = sessionExpiryIntervalSec;
- return *this;
- }
- ConnectPacket &ConnectPacket::withRequestResponseInformation(bool requestResponseInformation) noexcept
- {
- m_requestResponseInformation = requestResponseInformation;
- return *this;
- }
- ConnectPacket &ConnectPacket::withRequestProblemInformation(bool requestProblemInformation) noexcept
- {
- m_requestProblemInformation = requestProblemInformation;
- return *this;
- }
- ConnectPacket &ConnectPacket::withReceiveMaximum(uint16_t receiveMaximum) noexcept
- {
- m_receiveMaximum = receiveMaximum;
- return *this;
- }
- ConnectPacket &ConnectPacket::withMaximumPacketSizeBytes(uint32_t maximumPacketSizeBytes) noexcept
- {
- m_maximumPacketSizeBytes = maximumPacketSizeBytes;
- return *this;
- }
- ConnectPacket &ConnectPacket::withWillDelayIntervalSec(uint32_t willDelayIntervalSec) noexcept
- {
- m_willDelayIntervalSeconds = willDelayIntervalSec;
- return *this;
- }
- ConnectPacket &ConnectPacket::withWill(std::shared_ptr<PublishPacket> will) noexcept
- {
- m_will = will;
- m_will.value()->initializeRawOptions(m_willStorage);
- return *this;
- }
- ConnectPacket &ConnectPacket::withUserProperties(const Vector<UserProperty> &userProperties) noexcept
- {
- m_userProperties = userProperties;
- return *this;
- }
- ConnectPacket &ConnectPacket::withUserProperties(Vector<UserProperty> &&userProperties) noexcept
- {
- m_userProperties = userProperties;
- return *this;
- }
- ConnectPacket &ConnectPacket::withUserProperty(UserProperty &&property) noexcept
- {
- m_userProperties.push_back(std::move(property));
- return *this;
- }
- bool ConnectPacket::initializeRawOptions(
- aws_mqtt5_packet_connect_view &raw_options,
- Allocator * /*allocator*/) noexcept
- {
- AWS_ZERO_STRUCT(raw_options);
- raw_options.keep_alive_interval_seconds = m_keepAliveIntervalSec;
- raw_options.client_id = ByteCursorFromString(m_clientId);
- if (m_username.has_value())
- {
- raw_options.username = &m_usernameCursor;
- }
- if (m_password.has_value())
- {
- raw_options.password = &m_password.value();
- }
- if (m_sessionExpiryIntervalSec.has_value())
- {
- raw_options.session_expiry_interval_seconds = &m_sessionExpiryIntervalSec.value();
- }
- if (m_requestProblemInformation.has_value())
- {
- m_requestResponseInformationStorage = m_requestResponseInformation.value() ? 1 : 0;
- raw_options.request_response_information = &m_requestResponseInformationStorage;
- }
- if (m_requestProblemInformation.has_value())
- {
- m_requestProblemInformationStorage = m_requestProblemInformation.value() ? 1 : 0;
- raw_options.request_problem_information = &m_requestProblemInformationStorage;
- }
- if (m_receiveMaximum.has_value())
- {
- raw_options.receive_maximum = &m_receiveMaximum.value();
- }
- if (m_maximumPacketSizeBytes.has_value())
- {
- raw_options.maximum_packet_size_bytes = &m_maximumPacketSizeBytes.value();
- }
- if (m_willDelayIntervalSeconds.has_value())
- {
- raw_options.will_delay_interval_seconds = &m_willDelayIntervalSeconds.value();
- }
- if (m_will.has_value())
- {
- raw_options.will = &m_willStorage;
- }
- s_AllocateUnderlyingUserProperties(m_userPropertiesStorage, m_userProperties, m_allocator);
- raw_options.user_properties = m_userPropertiesStorage;
- raw_options.user_property_count = m_userProperties.size();
- return true;
- }
- ConnectPacket::~ConnectPacket()
- {
- if (m_userPropertiesStorage != nullptr)
- {
- aws_mem_release(m_allocator, m_userPropertiesStorage);
- m_userProperties.clear();
- }
- aws_byte_buf_clean_up(&m_passowrdStorage);
- }
- uint16_t ConnectPacket::getKeepAliveIntervalSec() const noexcept { return m_keepAliveIntervalSec; }
- const Crt::String &ConnectPacket::getClientId() const noexcept { return m_clientId; }
- const Crt::Optional<Crt::String> &ConnectPacket::getUsername() const noexcept { return m_username; }
- const Crt::Optional<Crt::ByteCursor> &ConnectPacket::getPassword() const noexcept { return m_password; }
- const Crt::Optional<uint32_t> &ConnectPacket::getSessionExpiryIntervalSec() const noexcept
- {
- return m_sessionExpiryIntervalSec;
- }
- const Crt::Optional<bool> &ConnectPacket::getRequestResponseInformation() const noexcept
- {
- return m_requestResponseInformation;
- }
- const Crt::Optional<bool> &ConnectPacket::getRequestProblemInformation() const noexcept
- {
- return m_requestProblemInformation;
- }
- const Crt::Optional<uint16_t> &ConnectPacket::getReceiveMaximum() const noexcept
- {
- return m_receiveMaximum;
- }
- const Crt::Optional<uint32_t> &ConnectPacket::getMaximumPacketSizeBytes() const noexcept
- {
- return m_maximumPacketSizeBytes;
- }
- const Crt::Optional<uint32_t> &ConnectPacket::getWillDelayIntervalSec() const noexcept
- {
- return m_willDelayIntervalSeconds;
- }
- const Crt::Optional<std::shared_ptr<PublishPacket>> &ConnectPacket::getWill() const noexcept
- {
- return m_will;
- }
- const Crt::Vector<UserProperty> &ConnectPacket::getUserProperties() const noexcept
- {
- return m_userProperties;
- }
- UserProperty::UserProperty(Crt::String name, Crt::String value) noexcept
- : m_name(std::move(name)), m_value(std::move(value))
- {
- }
- UserProperty::~UserProperty() noexcept {}
- UserProperty::UserProperty(const UserProperty &toCopy) noexcept
- : m_name(toCopy.getName()), m_value(toCopy.getValue())
- {
- }
- UserProperty::UserProperty(UserProperty &&toMove) noexcept
- : m_name(std::move(toMove.m_name)), m_value(std::move(toMove.m_value))
- {
- }
- UserProperty &UserProperty::operator=(const UserProperty &toCopy) noexcept
- {
- if (&toCopy != this)
- {
- m_name = toCopy.getName();
- m_value = toCopy.getValue();
- }
- return *this;
- }
- UserProperty &UserProperty::operator=(UserProperty &&toMove) noexcept
- {
- if (&toMove != this)
- {
- m_name = std::move(toMove.m_name);
- m_value = std::move(toMove.m_value);
- }
- return *this;
- }
- PublishPacket::PublishPacket(const aws_mqtt5_packet_publish_view &packet, Allocator *allocator) noexcept
- : m_allocator(allocator), m_qos(packet.qos), m_retain(packet.retain),
- m_topicName((const char *)packet.topic.ptr, packet.topic.len), m_userPropertiesStorage(nullptr)
- {
- AWS_ZERO_STRUCT(m_payloadStorage);
- AWS_ZERO_STRUCT(m_contentTypeStorage);
- AWS_ZERO_STRUCT(m_correlationDataStorage);
- AWS_ZERO_STRUCT(m_payload);
- withPayload(packet.payload);
- setPacketOptional(m_payloadFormatIndicator, packet.payload_format);
- setPacketOptional(m_messageExpiryIntervalSec, packet.message_expiry_interval_seconds);
- setPacketStringOptional(m_responseTopic, m_responseTopicString, packet.response_topic);
- setPacketByteBufOptional(
- m_correlationData, m_correlationDataStorage, allocator, packet.correlation_data);
- setPacketByteBufOptional(m_contentType, m_contentTypeStorage, allocator, packet.content_type);
- setPacketVector(
- m_subscriptionIdentifiers, packet.subscription_identifiers, packet.subscription_identifier_count);
- setUserProperties(m_userProperties, packet.user_properties, packet.user_property_count);
- }
- /* Default constructor */
- PublishPacket::PublishPacket(Allocator *allocator) noexcept
- : m_allocator(allocator), m_qos(QOS::AWS_MQTT5_QOS_AT_MOST_ONCE), m_retain(false), m_topicName(""),
- m_userPropertiesStorage(nullptr)
- {
- AWS_ZERO_STRUCT(m_payloadStorage);
- AWS_ZERO_STRUCT(m_contentTypeStorage);
- AWS_ZERO_STRUCT(m_correlationDataStorage);
- AWS_ZERO_STRUCT(m_payload);
- }
- PublishPacket::PublishPacket(
- Crt::String topic,
- ByteCursor payload,
- Mqtt5::QOS qos,
- Allocator *allocator) noexcept
- : m_allocator(allocator), m_qos(qos), m_retain(false), m_topicName(std::move(topic)),
- m_userPropertiesStorage(nullptr)
- {
- AWS_ZERO_STRUCT(m_payloadStorage);
- AWS_ZERO_STRUCT(m_contentTypeStorage);
- AWS_ZERO_STRUCT(m_correlationDataStorage);
- AWS_ZERO_STRUCT(m_payload);
- // Setup message payload, sync with PublishPacket::withPayload
- aws_byte_buf_clean_up(&m_payloadStorage);
- aws_byte_buf_init_copy_from_cursor(&m_payloadStorage, m_allocator, payload);
- m_payload = aws_byte_cursor_from_buf(&m_payloadStorage);
- }
- PublishPacket &PublishPacket::withPayload(ByteCursor payload) noexcept
- {
- aws_byte_buf_clean_up(&m_payloadStorage);
- aws_byte_buf_init_copy_from_cursor(&m_payloadStorage, m_allocator, payload);
- m_payload = aws_byte_cursor_from_buf(&m_payloadStorage);
- return *this;
- }
- PublishPacket &PublishPacket::withQOS(Mqtt5::QOS qos) noexcept
- {
- m_qos = qos;
- return *this;
- }
- PublishPacket &PublishPacket::withRetain(bool retain) noexcept
- {
- m_retain = retain;
- return *this;
- }
- PublishPacket &PublishPacket::withTopic(Crt::String topic) noexcept
- {
- m_topicName = std::move(topic);
- return *this;
- }
- PublishPacket &PublishPacket::withPayloadFormatIndicator(PayloadFormatIndicator format) noexcept
- {
- m_payloadFormatIndicator = format;
- return *this;
- }
- PublishPacket &PublishPacket::withMessageExpiryIntervalSec(uint32_t second) noexcept
- {
- m_messageExpiryIntervalSec = second;
- return *this;
- }
- PublishPacket &PublishPacket::withResponseTopic(ByteCursor responseTopic) noexcept
- {
- setPacketStringOptional(m_responseTopic, m_responseTopicString, &responseTopic);
- return *this;
- }
- PublishPacket &PublishPacket::withCorrelationData(ByteCursor correlationData) noexcept
- {
- setPacketByteBufOptional(m_correlationData, m_correlationDataStorage, m_allocator, &correlationData);
- return *this;
- }
- PublishPacket &PublishPacket::withUserProperties(const Vector<UserProperty> &userProperties) noexcept
- {
- m_userProperties = userProperties;
- return *this;
- }
- PublishPacket &PublishPacket::withUserProperties(Vector<UserProperty> &&userProperties) noexcept
- {
- m_userProperties = userProperties;
- return *this;
- }
- PublishPacket &PublishPacket::withUserProperty(UserProperty &&property) noexcept
- {
- m_userProperties.push_back(std::move(property));
- return *this;
- }
- bool PublishPacket::initializeRawOptions(aws_mqtt5_packet_publish_view &raw_options) noexcept
- {
- AWS_ZERO_STRUCT(raw_options);
- raw_options.payload = m_payload;
- raw_options.qos = m_qos;
- raw_options.retain = m_retain;
- raw_options.topic = ByteCursorFromString(m_topicName);
- if (m_payloadFormatIndicator.has_value())
- {
- raw_options.payload_format =
- (aws_mqtt5_payload_format_indicator *)&m_payloadFormatIndicator.value();
- }
- if (m_messageExpiryIntervalSec.has_value())
- {
- raw_options.message_expiry_interval_seconds = &m_messageExpiryIntervalSec.value();
- }
- if (m_responseTopic.has_value())
- {
- raw_options.response_topic = &m_responseTopic.value();
- }
- if (m_correlationData.has_value())
- {
- raw_options.correlation_data = &m_correlationData.value();
- }
- s_AllocateUnderlyingUserProperties(m_userPropertiesStorage, m_userProperties, m_allocator);
- raw_options.user_properties = m_userPropertiesStorage;
- raw_options.user_property_count = m_userProperties.size();
- return true;
- }
- const ByteCursor &PublishPacket::getPayload() const noexcept { return m_payload; }
- Mqtt5::QOS PublishPacket::getQOS() const noexcept { return m_qos; }
- bool PublishPacket::getRetain() const noexcept { return m_retain; }
- const Crt::String &PublishPacket::getTopic() const noexcept { return m_topicName; }
- const Crt::Optional<PayloadFormatIndicator> &PublishPacket::getPayloadFormatIndicator() const noexcept
- {
- return m_payloadFormatIndicator;
- }
- const Crt::Optional<uint32_t> &PublishPacket::getMessageExpiryIntervalSec() const noexcept
- {
- return m_messageExpiryIntervalSec;
- }
- const Crt::Optional<ByteCursor> &PublishPacket::getResponseTopic() const noexcept
- {
- return m_responseTopic;
- }
- const Crt::Optional<ByteCursor> &PublishPacket::getCorrelationData() const noexcept
- {
- return m_correlationData;
- }
- const Crt::Vector<uint32_t> &PublishPacket::getSubscriptionIdentifiers() const noexcept
- {
- return m_subscriptionIdentifiers;
- }
- const Crt::Optional<ByteCursor> &PublishPacket::getContentType() const noexcept { return m_contentType; }
- const Crt::Vector<UserProperty> &PublishPacket::getUserProperties() const noexcept
- {
- return m_userProperties;
- }
- PublishPacket::~PublishPacket()
- {
- aws_byte_buf_clean_up(&m_payloadStorage);
- aws_byte_buf_clean_up(&m_correlationDataStorage);
- aws_byte_buf_clean_up(&m_contentTypeStorage);
- if (m_userProperties.size() > 0)
- {
- aws_mem_release(m_allocator, m_userPropertiesStorage);
- m_userProperties.clear();
- }
- }
- DisconnectPacket::DisconnectPacket(Allocator *allocator) noexcept
- : m_allocator(allocator), m_reasonCode(AWS_MQTT5_DRC_NORMAL_DISCONNECTION),
- m_userPropertiesStorage(nullptr)
- {
- }
- bool DisconnectPacket::initializeRawOptions(aws_mqtt5_packet_disconnect_view &raw_options) noexcept
- {
- AWS_ZERO_STRUCT(raw_options);
- raw_options.reason_code = m_reasonCode;
- if (m_sessionExpiryIntervalSec.has_value())
- {
- raw_options.session_expiry_interval_seconds = &m_sessionExpiryIntervalSec.value();
- }
- if (m_reasonString.has_value())
- {
- m_reasonStringCursor = ByteCursorFromString(m_reasonString.value());
- raw_options.reason_string = &m_reasonStringCursor;
- }
- if (m_serverReference.has_value())
- {
- m_serverReferenceCursor = ByteCursorFromString(m_serverReference.value());
- raw_options.server_reference = &m_serverReferenceCursor;
- }
- s_AllocateUnderlyingUserProperties(m_userPropertiesStorage, m_userProperties, m_allocator);
- raw_options.user_properties = m_userPropertiesStorage;
- raw_options.user_property_count = m_userProperties.size();
- return true;
- }
- DisconnectPacket &DisconnectPacket::withReasonCode(const DisconnectReasonCode code) noexcept
- {
- m_reasonCode = code;
- return *this;
- }
- DisconnectPacket &DisconnectPacket::withSessionExpiryIntervalSec(const uint32_t second) noexcept
- {
- m_sessionExpiryIntervalSec = second;
- return *this;
- }
- DisconnectPacket &DisconnectPacket::withReasonString(Crt::String reason) noexcept
- {
- m_reasonString = std::move(reason);
- return *this;
- }
- DisconnectPacket &DisconnectPacket::withServerReference(Crt::String server_reference) noexcept
- {
- m_serverReference = std::move(server_reference);
- return *this;
- }
- DisconnectPacket &DisconnectPacket::withUserProperties(const Vector<UserProperty> &userProperties) noexcept
- {
- m_userProperties = userProperties;
- return *this;
- }
- DisconnectPacket &DisconnectPacket::withUserProperties(Vector<UserProperty> &&userProperties) noexcept
- {
- m_userProperties = userProperties;
- return *this;
- }
- DisconnectPacket &DisconnectPacket::withUserProperty(UserProperty &&property) noexcept
- {
- m_userProperties.push_back(std::move(property));
- return *this;
- }
- DisconnectReasonCode DisconnectPacket::getReasonCode() const noexcept { return m_reasonCode; }
- const Crt::Optional<uint32_t> &DisconnectPacket::getSessionExpiryIntervalSec() const noexcept
- {
- return m_sessionExpiryIntervalSec;
- }
- const Crt::Optional<Crt::String> &DisconnectPacket::getReasonString() const noexcept
- {
- return m_reasonString;
- }
- const Crt::Optional<Crt::String> &DisconnectPacket::getServerReference() const noexcept
- {
- return m_serverReference;
- }
- const Crt::Vector<UserProperty> &DisconnectPacket::getUserProperties() const noexcept
- {
- return m_userProperties;
- }
- DisconnectPacket::DisconnectPacket(
- const aws_mqtt5_packet_disconnect_view &packet,
- Allocator *allocator) noexcept
- : m_allocator(allocator), m_userPropertiesStorage(nullptr)
- {
- m_reasonCode = packet.reason_code;
- setPacketOptional(m_sessionExpiryIntervalSec, packet.session_expiry_interval_seconds);
- setPacketStringOptional(m_reasonString, packet.reason_string);
- setPacketStringOptional(m_serverReference, packet.server_reference);
- setUserProperties(m_userProperties, packet.user_properties, packet.user_property_count);
- }
- DisconnectPacket::~DisconnectPacket()
- {
- if (m_userPropertiesStorage != nullptr)
- {
- aws_mem_release(m_allocator, m_userPropertiesStorage);
- }
- }
- PubAckPacket::PubAckPacket(const aws_mqtt5_packet_puback_view &packet, Allocator * /*allocator*/) noexcept
- {
- m_reasonCode = packet.reason_code;
- setPacketStringOptional(m_reasonString, packet.reason_string);
- setUserProperties(m_userProperties, packet.user_properties, packet.user_property_count);
- }
- PubAckReasonCode PubAckPacket::getReasonCode() const noexcept { return m_reasonCode; }
- const Crt::Optional<Crt::String> &PubAckPacket::getReasonString() const noexcept { return m_reasonString; }
- const Crt::Vector<UserProperty> &PubAckPacket::getUserProperties() const noexcept
- {
- return m_userProperties;
- }
- ConnAckPacket::ConnAckPacket(
- const aws_mqtt5_packet_connack_view &packet,
- Allocator * /*allocator*/) noexcept
- {
- m_sessionPresent = packet.session_present;
- m_reasonCode = packet.reason_code;
- setPacketOptional(m_sessionExpiryInterval, packet.session_expiry_interval);
- setPacketOptional(m_receiveMaximum, packet.receive_maximum);
- setPacketOptional(m_maximumQOS, packet.maximum_qos);
- setPacketOptional(m_retainAvailable, packet.retain_available);
- setPacketOptional(m_maximumPacketSize, packet.maximum_packet_size);
- setPacketStringOptional(m_assignedClientIdentifier, packet.assigned_client_identifier);
- setPacketOptional(m_topicAliasMaximum, packet.topic_alias_maximum);
- setPacketStringOptional(m_reasonString, packet.reason_string);
- setUserProperties(m_userProperties, packet.user_properties, packet.user_property_count);
- setPacketOptional(m_wildcardSubscriptionsAvaliable, packet.wildcard_subscriptions_available);
- setPacketOptional(m_subscriptionIdentifiersAvaliable, packet.subscription_identifiers_available);
- setPacketOptional(m_sharedSubscriptionsAvaliable, packet.shared_subscriptions_available);
- setPacketOptional(m_serverKeepAlive, packet.server_keep_alive);
- setPacketStringOptional(m_responseInformation, packet.response_information);
- setPacketStringOptional(m_serverReference, packet.server_reference);
- }
- bool ConnAckPacket::getSessionPresent() const noexcept { return m_sessionPresent; }
- ConnectReasonCode ConnAckPacket::getReasonCode() const noexcept { return m_reasonCode; }
- const Crt::Optional<uint32_t> &ConnAckPacket::getSessionExpiryInterval() const noexcept
- {
- return m_sessionExpiryInterval;
- }
- const Crt::Optional<uint16_t> &ConnAckPacket::getReceiveMaximum() const noexcept
- {
- return m_receiveMaximum;
- }
- const Crt::Optional<QOS> &ConnAckPacket::getMaximumQOS() const noexcept { return m_maximumQOS; }
- const Crt::Optional<bool> &ConnAckPacket::getRetainAvailable() const noexcept { return m_retainAvailable; }
- const Crt::Optional<uint32_t> &ConnAckPacket::getMaximumPacketSize() const noexcept
- {
- return m_maximumPacketSize;
- }
- const Crt::Optional<String> &ConnAckPacket::getAssignedClientIdentifier() const noexcept
- {
- return m_assignedClientIdentifier;
- }
- const Crt::Optional<uint16_t> ConnAckPacket::getTopicAliasMaximum() const noexcept
- {
- return m_topicAliasMaximum;
- }
- const Crt::Optional<String> &ConnAckPacket::getReasonString() const noexcept { return m_reasonString; }
- const Vector<UserProperty> &ConnAckPacket::getUserProperty() const noexcept { return m_userProperties; }
- const Crt::Optional<bool> &ConnAckPacket::getWildcardSubscriptionsAvaliable() const noexcept
- {
- return m_wildcardSubscriptionsAvaliable;
- }
- const Crt::Optional<bool> &ConnAckPacket::getSubscriptionIdentifiersAvaliable() const noexcept
- {
- return m_subscriptionIdentifiersAvaliable;
- }
- const Crt::Optional<bool> &ConnAckPacket::getSharedSubscriptionsAvaliable() const noexcept
- {
- return m_sharedSubscriptionsAvaliable;
- }
- const Crt::Optional<uint16_t> &ConnAckPacket::getServerKeepAlive() const noexcept
- {
- return m_serverKeepAlive;
- }
- const Crt::Optional<String> &ConnAckPacket::getResponseInformation() const noexcept
- {
- return m_responseInformation;
- }
- const Crt::Optional<String> &ConnAckPacket::getServerReference() const noexcept
- {
- return m_serverReference;
- }
- Subscription::Subscription(Allocator *allocator)
- : m_allocator(allocator), m_topicFilter(""), m_qos(QOS::AWS_MQTT5_QOS_AT_MOST_ONCE), m_noLocal(false),
- m_retain(false), m_retainHnadlingType(AWS_MQTT5_RHT_SEND_ON_SUBSCRIBE)
- {
- }
- Subscription::Subscription(Crt::String topicFilter, Mqtt5::QOS qos, Allocator *allocator)
- : m_allocator(allocator), m_topicFilter(std::move(topicFilter)), m_qos(qos), m_noLocal(false),
- m_retain(false), m_retainHnadlingType(AWS_MQTT5_RHT_SEND_ON_SUBSCRIBE)
- {
- }
- Subscription &Subscription::withTopicFilter(Crt::String topicFilter) noexcept
- {
- m_topicFilter = std::move(topicFilter);
- return *this;
- }
- Subscription &Subscription::withQOS(Mqtt5::QOS qos) noexcept
- {
- m_qos = qos;
- return *this;
- }
- Subscription &Subscription::withNoLocal(bool noLocal) noexcept
- {
- m_noLocal = noLocal;
- return *this;
- }
- Subscription &Subscription::withRetain(bool retain) noexcept
- {
- m_retain = retain;
- return *this;
- }
- Subscription &Subscription::withRetainHandlingType(RetainHandlingType retainHandlingType) noexcept
- {
- m_retainHnadlingType = retainHandlingType;
- return *this;
- }
- bool Subscription::initializeRawOptions(aws_mqtt5_subscription_view &raw_options) const noexcept
- {
- AWS_ZERO_STRUCT(raw_options);
- raw_options.topic_filter = ByteCursorFromString(m_topicFilter);
- raw_options.no_local = m_noLocal;
- raw_options.qos = m_qos;
- raw_options.retain_as_published = m_retain;
- raw_options.retain_handling_type = m_retainHnadlingType;
- return true;
- }
- Subscription::Subscription(const Subscription &toCopy) noexcept
- : m_allocator(toCopy.m_allocator), m_topicFilter(toCopy.m_topicFilter), m_qos(toCopy.m_qos),
- m_noLocal(toCopy.m_noLocal), m_retain(toCopy.m_retain),
- m_retainHnadlingType(toCopy.m_retainHnadlingType)
- {
- }
- Subscription::Subscription(Subscription &&toMove) noexcept
- : m_allocator(toMove.m_allocator), m_topicFilter(std::move(toMove.m_topicFilter)), m_qos(toMove.m_qos),
- m_noLocal(toMove.m_noLocal), m_retain(toMove.m_retain),
- m_retainHnadlingType(toMove.m_retainHnadlingType)
- {
- }
- Subscription &Subscription::operator=(const Subscription &toCopy) noexcept
- {
- if (&toCopy != this)
- {
- m_allocator = toCopy.m_allocator;
- m_qos = toCopy.m_qos;
- m_topicFilter = toCopy.m_topicFilter;
- m_noLocal = toCopy.m_noLocal;
- m_retain = toCopy.m_retain;
- m_retainHnadlingType = toCopy.m_retainHnadlingType;
- }
- return *this;
- }
- Subscription &Subscription::operator=(Subscription &&toMove) noexcept
- {
- if (&toMove != this)
- {
- m_allocator = toMove.m_allocator;
- m_qos = toMove.m_qos;
- m_topicFilter = std::move(toMove.m_topicFilter);
- m_noLocal = toMove.m_noLocal;
- m_retain = toMove.m_retain;
- m_retainHnadlingType = toMove.m_retainHnadlingType;
- }
- return *this;
- }
- SubscribePacket::SubscribePacket(Allocator *allocator) noexcept
- : m_allocator(allocator), m_subscriptionViewStorage(nullptr), m_userPropertiesStorage(nullptr)
- {
- }
- SubscribePacket &SubscribePacket::withUserProperties(const Vector<UserProperty> &userProperties) noexcept
- {
- m_userProperties = userProperties;
- return *this;
- }
- SubscribePacket &SubscribePacket::withUserProperties(Vector<UserProperty> &&userProperties) noexcept
- {
- m_userProperties = userProperties;
- return *this;
- }
- SubscribePacket &SubscribePacket::withUserProperty(UserProperty &&property) noexcept
- {
- m_userProperties.push_back(std::move(property));
- return *this;
- }
- SubscribePacket &SubscribePacket::withSubscriptionIdentifier(uint32_t identifier) noexcept
- {
- m_subscriptionIdentifier = identifier;
- return *this;
- }
- SubscribePacket &SubscribePacket::withSubscriptions(const Crt::Vector<Subscription> &subscriptions) noexcept
- {
- m_subscriptions = subscriptions;
- return *this;
- }
- SubscribePacket &SubscribePacket::withSubscriptions(Vector<Subscription> &&subscriptions) noexcept
- {
- m_subscriptions = subscriptions;
- return *this;
- }
- SubscribePacket &SubscribePacket::withSubscription(Subscription &&subscription) noexcept
- {
- m_subscriptions.push_back(subscription);
- return *this;
- }
- bool SubscribePacket::initializeRawOptions(aws_mqtt5_packet_subscribe_view &raw_options) noexcept
- {
- AWS_ZERO_STRUCT(raw_options);
- s_AllocateUnderlyingSubscription(m_subscriptionViewStorage, m_subscriptions, m_allocator);
- raw_options.subscription_count = m_subscriptions.size();
- raw_options.subscriptions = m_subscriptionViewStorage;
- s_AllocateUnderlyingUserProperties(m_userPropertiesStorage, m_userProperties, m_allocator);
- raw_options.user_properties = m_userPropertiesStorage;
- raw_options.user_property_count = m_userProperties.size();
- return true;
- }
- SubscribePacket::~SubscribePacket()
- {
- if (m_userPropertiesStorage != nullptr)
- {
- aws_mem_release(m_allocator, m_userPropertiesStorage);
- m_userPropertiesStorage = nullptr;
- }
- if (m_subscriptionViewStorage != nullptr)
- {
- aws_mem_release(m_allocator, m_subscriptionViewStorage);
- m_subscriptionViewStorage = nullptr;
- }
- }
- SubAckPacket::SubAckPacket(const aws_mqtt5_packet_suback_view &packet, Allocator * /*allocator*/) noexcept
- {
- setPacketStringOptional(m_reasonString, packet.reason_string);
- setUserProperties(m_userProperties, packet.user_properties, packet.user_property_count);
- for (size_t i = 0; i < packet.reason_code_count; i++)
- {
- m_reasonCodes.push_back(*(packet.reason_codes + i));
- }
- }
- const Crt::Optional<Crt::String> &SubAckPacket::getReasonString() const noexcept { return m_reasonString; }
- const Crt::Vector<UserProperty> &SubAckPacket::getUserProperties() const noexcept
- {
- return m_userProperties;
- }
- const Crt::Vector<SubAckReasonCode> &SubAckPacket::getReasonCodes() const noexcept { return m_reasonCodes; }
- UnsubscribePacket::UnsubscribePacket(Allocator *allocator) noexcept
- : m_allocator(allocator), m_userPropertiesStorage(nullptr)
- {
- AWS_ZERO_STRUCT(m_topicFiltersList);
- }
- UnsubscribePacket &UnsubscribePacket::withTopicFilter(Crt::String topicFilter) noexcept
- {
- m_topicFilters.push_back(std::move(topicFilter));
- return *this;
- }
- UnsubscribePacket &UnsubscribePacket::withTopicFilters(Crt::Vector<String> topicFilters) noexcept
- {
- m_topicFilters = std::move(topicFilters);
- return *this;
- }
- UnsubscribePacket &UnsubscribePacket::withUserProperties(
- const Vector<UserProperty> &userProperties) noexcept
- {
- m_userProperties = userProperties;
- return *this;
- }
- UnsubscribePacket &UnsubscribePacket::withUserProperties(Vector<UserProperty> &&userProperties) noexcept
- {
- m_userProperties = userProperties;
- return *this;
- }
- UnsubscribePacket &UnsubscribePacket::withUserProperty(UserProperty &&property) noexcept
- {
- m_userProperties.push_back(std::move(property));
- return *this;
- }
- bool UnsubscribePacket::initializeRawOptions(aws_mqtt5_packet_unsubscribe_view &raw_options) noexcept
- {
- AWS_ZERO_STRUCT(raw_options);
- s_AllocateStringVector(m_topicFiltersList, m_topicFilters, m_allocator);
- raw_options.topic_filters = static_cast<aws_byte_cursor *>(m_topicFiltersList.data);
- raw_options.topic_filter_count = m_topicFilters.size();
- s_AllocateUnderlyingUserProperties(m_userPropertiesStorage, m_userProperties, m_allocator);
- raw_options.user_properties = m_userPropertiesStorage;
- raw_options.user_property_count = m_userProperties.size();
- return true;
- }
- UnsubscribePacket::~UnsubscribePacket()
- {
- aws_array_list_clean_up(&m_topicFiltersList);
- AWS_ZERO_STRUCT(m_topicFiltersList);
- if (m_userPropertiesStorage != nullptr)
- {
- aws_mem_release(m_allocator, m_userPropertiesStorage);
- m_userPropertiesStorage = nullptr;
- }
- }
- UnSubAckPacket::UnSubAckPacket(const aws_mqtt5_packet_unsuback_view &packet, Allocator *allocator) noexcept
- {
- (void)allocator;
- setPacketStringOptional(m_reasonString, packet.reason_string);
- for (size_t i = 0; i < packet.reason_code_count; i++)
- {
- m_reasonCodes.push_back(*(packet.reason_codes + i));
- }
- setUserProperties(m_userProperties, packet.user_properties, packet.user_property_count);
- }
- const Crt::Optional<Crt::String> &UnSubAckPacket::getReasonString() const noexcept
- {
- return m_reasonString;
- }
- const Crt::Vector<UserProperty> &UnSubAckPacket::getUserProperties() const noexcept
- {
- return m_userProperties;
- }
- const Crt::Vector<UnSubAckReasonCode> &UnSubAckPacket::getReasonCodes() const noexcept
- {
- return m_reasonCodes;
- }
- NegotiatedSettings::NegotiatedSettings(
- const aws_mqtt5_negotiated_settings &negotiated_settings,
- Allocator *allocator) noexcept
- {
- (void)allocator;
- m_maximumQOS = negotiated_settings.maximum_qos;
- m_sessionExpiryIntervalSec = negotiated_settings.session_expiry_interval;
- m_receiveMaximumFromServer = negotiated_settings.receive_maximum_from_server;
- m_maximumPacketSizeBytes = negotiated_settings.maximum_packet_size_to_server;
- m_serverKeepAliveSec = negotiated_settings.server_keep_alive;
- m_retainAvailable = negotiated_settings.retain_available;
- m_wildcardSubscriptionsAvaliable = negotiated_settings.wildcard_subscriptions_available;
- m_subscriptionIdentifiersAvaliable = negotiated_settings.subscription_identifiers_available;
- m_sharedSubscriptionsAvaliable = negotiated_settings.shared_subscriptions_available;
- m_rejoinedSession = negotiated_settings.rejoined_session;
- m_clientId = Crt::String(
- (const char *)negotiated_settings.client_id_storage.buffer,
- negotiated_settings.client_id_storage.len);
- }
- Mqtt5::QOS NegotiatedSettings::getMaximumQOS() const noexcept { return m_maximumQOS; }
- uint32_t NegotiatedSettings::getSessionExpiryIntervalSec() const noexcept
- {
- return m_sessionExpiryIntervalSec;
- }
- uint16_t NegotiatedSettings::getReceiveMaximumFromServer() const noexcept
- {
- return m_receiveMaximumFromServer;
- }
- uint32_t NegotiatedSettings::getMaximumPacketSizeBytes() const noexcept { return m_maximumPacketSizeBytes; }
- uint16_t NegotiatedSettings::getServerKeepAlive() const noexcept { return m_serverKeepAliveSec; }
- bool NegotiatedSettings::getRetainAvailable() const noexcept { return m_retainAvailable; }
- bool NegotiatedSettings::getWildcardSubscriptionsAvaliable() const noexcept
- {
- return m_wildcardSubscriptionsAvaliable;
- }
- bool NegotiatedSettings::getSubscriptionIdentifiersAvaliable() const noexcept
- {
- return m_subscriptionIdentifiersAvaliable;
- }
- bool NegotiatedSettings::getSharedSubscriptionsAvaliable() const noexcept
- {
- return m_sharedSubscriptionsAvaliable;
- }
- bool NegotiatedSettings::getRejoinedSession() const noexcept { return m_rejoinedSession; }
- const Crt::String &NegotiatedSettings::getClientId() const noexcept { return m_clientId; }
- PublishResult::PublishResult() : m_ack(nullptr), m_errorCode(0) {}
- PublishResult::PublishResult(std::shared_ptr<PubAckPacket> puback) : m_errorCode(0) { m_ack = puback; }
- PublishResult::PublishResult(int error) : m_ack(nullptr), m_errorCode(error) {}
- PublishResult::~PublishResult() noexcept { m_ack.reset(); }
- } // namespace Mqtt5
- } // namespace Crt
- } // namespace Aws
|