123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041 |
- commit 8869ab354d10c071c1e5e33602cc6b7940b4427c
- Author: Timotej S <6674623+underhood@users.noreply.github.com>
- Date: Mon Dec 12 15:46:45 2022 +0700
- Initial support for topic aliases (#12)
-
- Add support for topic alias functionality for PUBLISH packets
- also adds support for parsing all MQTT properties as opposed to just skipping and ignoring them (what we did previously)
- diff --git a/.gitmodules b/.gitmodules
- index 8e778ee080..9e05f79422 100644
- --- a/.gitmodules
- +++ b/.gitmodules
- @@ -4,3 +4,6 @@
- [submodule "MQTT-C"]
- path = MQTT-C
- url = https://github.com/underhood/MQTT-C.git
- +[submodule "c_rhash"]
- + path = c_rhash
- + url = https://github.com/underhood/c_rhash.git
- diff --git a/Makefile b/Makefile
- index b9bba50d93..e8c40e1900 100644
- --- a/Makefile
- +++ b/Makefile
- @@ -16,10 +16,13 @@ CFLAGS = -Wextra -Wall `pkg-config --cflags openssl` `pkg-config --cflags libcry
- BUILD_DIR = build
-
- # dir having our version of mqtt_pal.h must preceede dir of MQTT-C to override this hdr
- -INCLUDES = -Isrc/include -Ic-rbuf/include -Imqtt/include -IMQTT-C/include
- +INCLUDES = -Isrc/include -Ic-rbuf/include -Ic_rhash/include -Imqtt/include -IMQTT-C/include
-
- all: test
-
- +$(BUILD_DIR)/c_rhash.o: c_rhash/src/c_rhash.c c_rhash/src/c_rhash_internal.h c_rhash/include/c_rhash.h
- + $(CC) -o $(BUILD_DIR)/c_rhash.o -c c_rhash/src/c_rhash.c $(CFLAGS) $(INCLUDES)
- +
- c-rbuf/build/ringbuffer.o:
- cd c-rbuf && $(MAKE) build/ringbuffer.o
-
- @@ -41,8 +44,8 @@ $(BUILD_DIR)/mqtt_ng.o: src/mqtt_ng.c src/include/mqtt_ng.h src/include/common_i
- $(BUILD_DIR)/common_public.o: src/common_public.c src/include/common_public.h
- $(CC) -o $(BUILD_DIR)/common_public.o -c src/common_public.c $(CFLAGS) $(INCLUDES)
-
- -libmqttwebsockets.a: $(BUILD_DIR)/mqtt_wss_client.o $(BUILD_DIR)/ws_client.o c-rbuf/build/ringbuffer.o $(BUILD_DIR)/mqtt.o $(BUILD_DIR)/mqtt_wss_log.o $(BUILD_DIR)/mqtt_ng.o $(BUILD_DIR)/common_public.o
- - ar rcs libmqttwebsockets.a $(BUILD_DIR)/mqtt_wss_client.o $(BUILD_DIR)/ws_client.o c-rbuf/build/ringbuffer.o $(BUILD_DIR)/mqtt.o $(BUILD_DIR)/mqtt_wss_log.o $(BUILD_DIR)/mqtt_ng.o $(BUILD_DIR)/common_public.o
- +libmqttwebsockets.a: $(BUILD_DIR)/mqtt_wss_client.o $(BUILD_DIR)/ws_client.o c-rbuf/build/ringbuffer.o $(BUILD_DIR)/c_rhash.o $(BUILD_DIR)/mqtt.o $(BUILD_DIR)/mqtt_wss_log.o $(BUILD_DIR)/mqtt_ng.o $(BUILD_DIR)/common_public.o
- + ar rcs libmqttwebsockets.a $(BUILD_DIR)/mqtt_wss_client.o $(BUILD_DIR)/ws_client.o c-rbuf/build/ringbuffer.o $(BUILD_DIR)/c_rhash.o $(BUILD_DIR)/mqtt.o $(BUILD_DIR)/mqtt_wss_log.o $(BUILD_DIR)/mqtt_ng.o $(BUILD_DIR)/common_public.o
-
- test: $(BUILD_DIR)/test.o libmqttwebsockets.a
- $(CC) -o test $(BUILD_DIR)/test.o libmqttwebsockets.a `pkg-config --libs openssl` -lpthread $(CFLAGS)
- diff --git a/c_rhash b/c_rhash
- new file mode 160000
- index 0000000000..98bc3c8ffb
- --- /dev/null
- +++ b/c_rhash
- @@ -0,0 +1 @@
- +Subproject commit 98bc3c8ffb872d83b40e2dfb624810d1f619e82d
- diff --git a/src/include/common_public.h b/src/include/common_public.h
- index ddc6379bb7..a855737f9e 100644
- --- a/src/include/common_public.h
- +++ b/src/include/common_public.h
- @@ -1,5 +1,8 @@
- #ifndef MQTT_WEBSOCKETS_COMMON_PUBLIC_H
- #define MQTT_WEBSOCKETS_COMMON_PUBLIC_H
- +
- +#include <stddef.h>
- +
- /* free_fnc_t in general (in whatever function or struct it is used)
- * decides how the related data will be handled.
- * - If NULL the data are copied internally (causing malloc and later free)
- @@ -15,4 +18,16 @@ typedef void (*free_fnc_t)(void *ptr);
- void _caller_responsibility(void *ptr);
- #define CALLER_RESPONSIBILITY ((free_fnc_t)&_caller_responsibility)
-
- +struct mqtt_ng_stats {
- + size_t tx_bytes_queued;
- + int tx_messages_queued;
- + int tx_messages_sent;
- + int rx_messages_rcvd;
- + size_t tx_buffer_used;
- + size_t tx_buffer_free;
- + size_t tx_buffer_size;
- + // part of transaction buffer that containes mesages we can free alredy during the garbage colleciton step
- + size_t tx_buffer_reclaimable;
- +};
- +
- #endif /* MQTT_WEBSOCKETS_COMMON_PUBLIC_H */
- diff --git a/src/include/mqtt_constants.h b/src/include/mqtt_constants.h
- index 55a8a0f7ef..1db4989762 100644
- --- a/src/include/mqtt_constants.h
- +++ b/src/include/mqtt_constants.h
- @@ -42,4 +42,60 @@
-
- #define MQTT_MAX_CLIENT_ID 23 /* [MQTT-3.1.3-5] */
-
- +// MQTT Property identifiers [MQTT-2.2.2.2]
- +#define MQTT_PROP_PAYLOAD_FMT_INDICATOR 0x01
- +#define MQTT_PROP_PAYLOAD_FMT_INDICATOR_NAME "Payload Format Indicator"
- +#define MQTT_PROP_MSG_EXPIRY_INTERVAL 0x02
- +#define MQTT_PROP_MSG_EXPIRY_INTERVAL_NAME "Message Expiry Interval"
- +#define MQTT_PROP_CONTENT_TYPE 0x03
- +#define MQTT_PROP_CONTENT_TYPE_NAME "Content Type"
- +#define MQTT_PROP_RESPONSE_TOPIC 0x08
- +#define MQTT_PROP_RESPONSE_TOPIC_NAME "Response Topic"
- +#define MQTT_PROP_CORRELATION_DATA 0x09
- +#define MQTT_PROP_CORRELATION_DATA_NAME "Correlation Data"
- +#define MQTT_PROP_SUB_IDENTIFIER 0x0B
- +#define MQTT_PROP_SUB_IDENTIFIER_NAME "Subscription Identifier"
- +#define MQTT_PROP_SESSION_EXPIRY_INTERVAL 0x11
- +#define MQTT_PROP_SESSION_EXPIRY_INTERVAL_NAME "Session Expiry Interval"
- +#define MQTT_PROP_ASSIGNED_CLIENT_ID 0x12
- +#define MQTT_PROP_ASSIGNED_CLIENT_ID_NAME "Assigned Client Identifier"
- +#define MQTT_PROP_SERVER_KEEP_ALIVE 0x13
- +#define MQTT_PROP_SERVER_KEEP_ALIVE_NAME "Server Keep Alive"
- +#define MQTT_PROP_AUTH_METHOD 0x15
- +#define MQTT_PROP_AUTH_METHOD_NAME "Authentication Method"
- +#define MQTT_PROP_AUTH_DATA 0x16
- +#define MQTT_PROP_AUTH_DATA_NAME "Authentication Data"
- +#define MQTT_PROP_REQ_PROBLEM_INFO 0x17
- +#define MQTT_PROP_REQ_PROBLEM_INFO_NAME "Request Problem Information"
- +#define MQTT_PROP_WILL_DELAY_INTERVAL 0x18
- +#define MQTT_PROP_WIIL_DELAY_INTERVAL_NAME "Will Delay Interval"
- +#define MQTT_PROP_REQ_RESP_INFORMATION 0x19
- +#define MQTT_PROP_REQ_RESP_INFORMATION_NAME "Request Response Information"
- +#define MQTT_PROP_RESP_INFORMATION 0x1A
- +#define MQTT_PROP_RESP_INFORMATION_NAME "Response Information"
- +#define MQTT_PROP_SERVER_REF 0x1C
- +#define MQTT_PROP_SERVER_REF_NAME "Server Reference"
- +#define MQTT_PROP_REASON_STR 0x1F
- +#define MQTT_PROP_REASON_STR_NAME "Reason String"
- +#define MQTT_PROP_RECEIVE_MAX 0x21
- +#define MQTT_PROP_RECEIVE_MAX_NAME "Receive Maximum"
- +#define MQTT_PROP_TOPIC_ALIAS_MAX 0x22
- +#define MQTT_PROP_TOPIC_ALIAS_MAX_NAME "Topic Alias Maximum"
- +#define MQTT_PROP_TOPIC_ALIAS 0x23
- +#define MQTT_PROP_TOPIC_ALIAS_NAME "Topic Alias"
- +#define MQTT_PROP_MAX_QOS 0x24
- +#define MQTT_PROP_MAX_QOS_NAME "Maximum QoS"
- +#define MQTT_PROP_RETAIN_AVAIL 0x25
- +#define MQTT_PROP_RETAIN_AVAIL_NAME "Retain Available"
- +#define MQTT_PROP_USR 0x26
- +#define MQTT_PROP_USR_NAME "User Property"
- +#define MQTT_PROP_MAX_PKT_SIZE 0x27
- +#define MQTT_PROP_MAX_PKT_SIZE_NAME "Maximum Packet Size"
- +#define MQTT_PROP_WILDCARD_SUB_AVAIL 0x28
- +#define MQTT_PROP_WILDCARD_SUB_AVAIL_NAME "Wildcard Subscription Available"
- +#define MQTT_PROP_SUB_ID_AVAIL 0x29
- +#define MQTT_PROP_SUB_ID_AVAIL_NAME "Subscription Identifier Available"
- +#define MQTT_PROP_SHARED_SUB_AVAIL 0x2A
- +#define MQTT_PROP_SHARED_SUB_AVAIL_NAME "Shared Subscription Available"
- +
- #endif /* MQTT_CONSTANTS_H */
- diff --git a/src/include/mqtt_ng.h b/src/include/mqtt_ng.h
- index bdfa590a3b..c42832068e 100644
- --- a/src/include/mqtt_ng.h
- +++ b/src/include/mqtt_ng.h
- @@ -90,3 +90,7 @@ int mqtt_ng_sync(struct mqtt_ng_client *client);
- time_t mqtt_ng_last_send_time(struct mqtt_ng_client *client);
-
- void mqtt_ng_set_max_mem(struct mqtt_ng_client *client, size_t bytes);
- +
- +void mqtt_ng_get_stats(struct mqtt_ng_client *client, struct mqtt_ng_stats *stats);
- +
- +int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic);
- diff --git a/src/include/mqtt_wss_client.h b/src/include/mqtt_wss_client.h
- index fa0f708a92..e9d47134e4 100644
- --- a/src/include/mqtt_wss_client.h
- +++ b/src/include/mqtt_wss_client.h
- @@ -157,6 +157,8 @@ int mqtt_wss_publish5(mqtt_wss_client client,
- uint8_t publish_flags,
- uint16_t *packet_id);
-
- +int mqtt_wss_set_topic_alias(mqtt_wss_client client, const char *topic);
- +
- /* Subscribes to MQTT topic
- * @param client mqtt_wss_client which should do the subscription
- * @param topic MQTT topic to subscribe to
- @@ -165,9 +167,18 @@ int mqtt_wss_publish5(mqtt_wss_client client,
- */
- int mqtt_wss_subscribe(mqtt_wss_client client, char *topic, int max_qos_level);
-
- +
- struct mqtt_wss_stats {
- uint64_t bytes_tx;
- uint64_t bytes_rx;
- +#ifdef MQTT_WSS_CPUSTATS
- + uint64_t time_keepalive;
- + uint64_t time_read_socket;
- + uint64_t time_write_socket;
- + uint64_t time_process_websocket;
- + uint64_t time_process_mqtt;
- +#endif
- + struct mqtt_ng_stats mqtt;
- };
-
- struct mqtt_wss_stats mqtt_wss_get_stats(mqtt_wss_client client);
- diff --git a/src/mqtt_ng.c b/src/mqtt_ng.c
- index 4c442ca77f..6003159102 100644
- --- a/src/mqtt_ng.c
- +++ b/src/mqtt_ng.c
- @@ -4,6 +4,8 @@
- #include <pthread.h>
- #include <inttypes.h>
-
- +#include "c_rhash.h"
- +
- #include "common_internal.h"
- #include "mqtt_constants.h"
- #include "mqtt_wss_log.h"
- @@ -92,6 +94,7 @@ enum varhdr_parser_state {
- MQTT_PARSE_VARHDR_OPTIONAL_REASON_CODE,
- MQTT_PARSE_VARHDR_PROPS,
- MQTT_PARSE_VARHDR_TOPICNAME,
- + MQTT_PARSE_VARHDR_POST_TOPICNAME,
- MQTT_PARSE_VARHDR_PACKET_ID,
- MQTT_PARSE_REASONCODES,
- MQTT_PARSE_PAYLOAD
- @@ -103,22 +106,54 @@ struct mqtt_vbi_parser_ctx {
- uint32_t result;
- };
-
- +enum mqtt_datatype {
- + MQTT_TYPE_UNKNOWN = 0,
- + MQTT_TYPE_UINT_8,
- + MQTT_TYPE_UINT_16,
- + MQTT_TYPE_UINT_32,
- + MQTT_TYPE_VBI,
- + MQTT_TYPE_STR,
- + MQTT_TYPE_STR_PAIR,
- + MQTT_TYPE_BIN
- +};
- +
- struct mqtt_property {
- - uint32_t id;
- + uint8_t id;
- + enum mqtt_datatype type;
- + union {
- + char *strings[2];
- + void *bindata;
- + uint8_t uint8;
- + uint16_t uint16;
- + uint32_t uint32;
- + } data;
- + size_t bindata_len;
- struct mqtt_property *next;
- };
-
- enum mqtt_properties_parser_state {
- PROPERTIES_LENGTH = 0,
- - PROPERTY_ID
- + PROPERTY_CREATE,
- + PROPERTY_ID,
- + PROPERTY_TYPE_UINT8,
- + PROPERTY_TYPE_UINT16,
- + PROPERTY_TYPE_UINT32,
- + PROPERTY_TYPE_STR_BIN_LEN,
- + PROPERTY_TYPE_STR,
- + PROPERTY_TYPE_BIN,
- + PROPERTY_TYPE_VBI,
- + PROPERTY_NEXT
- };
-
- struct mqtt_properties_parser_ctx {
- enum mqtt_properties_parser_state state;
- struct mqtt_property *head;
- + struct mqtt_property *tail;
- uint32_t properties_length;
- + uint32_t vbi_length;
- struct mqtt_vbi_parser_ctx vbi_parser_ctx;
- size_t bytes_consumed;
- + int str_idx;
- };
-
- struct mqtt_connack {
- @@ -174,6 +209,17 @@ struct mqtt_ng_parser {
- } mqtt_packet;
- };
-
- +struct topic_alias_data {
- + uint16_t idx;
- + uint32_t usage_count;
- +};
- +
- +struct topic_aliases_data {
- + c_rhash stoi_dict;
- + uint32_t idx_max;
- + uint32_t idx_assigned;
- + int used;
- +};
-
- struct mqtt_ng_client {
- struct transaction_buffer main_buffer;
- @@ -199,6 +245,12 @@ struct mqtt_ng_client {
- void (*msg_callback)(const char *topic, const void *msg, size_t msglen, int qos);
-
- unsigned int ping_pending:1;
- +
- + struct mqtt_ng_stats stats;
- + pthread_mutex_t stats_mutex;
- +
- + struct topic_aliases_data tx_topic_aliases;
- + c_rhash rx_aliases;
- };
-
- char pingreq[] = { MQTT_CPT_PINGREQ << 4, 0x00 };
- @@ -356,6 +408,7 @@ static inline enum memory_mode ptr2memory_mode(void * ptr) {
- }
-
- #define frag_is_marked_for_gc(frag) ((frag->flags & BUFFER_FRAG_GARBAGE_COLLECT) || ((frag->flags & BUFFER_FRAG_GARBAGE_COLLECT_ON_SEND) && frag->sent == frag->len))
- +#define FRAG_SIZE_IN_BUFFER(frag) (sizeof(struct buffer_fragment) + ((frag->flags & BUFFER_FRAG_DATA_EXTERNAL) ? 0 : frag->len))
-
- static void buffer_frag_free_data(struct buffer_fragment *frag)
- {
- @@ -581,6 +634,12 @@ struct mqtt_ng_client *mqtt_ng_init(struct mqtt_ng_init *settings)
- client->connack_callback = settings->connack_callback;
- client->msg_callback = settings->msg_callback;
-
- + pthread_mutex_init(&client->stats_mutex, NULL);
- + client->tx_topic_aliases.stoi_dict = c_rhash_new(0);
- + client->tx_topic_aliases.idx_max = UINT16_MAX;
- +
- + client->rx_aliases = c_rhash_new(UINT16_MAX >> 8);
- +
- return client;
- }
-
- @@ -589,9 +648,27 @@ static inline uint8_t get_control_packet_type(uint8_t first_hdr_byte)
- return first_hdr_byte >> 4;
- }
-
- +static inline void mqtt_ng_destroy_rx_alias_hash(struct mqtt_ng_client *client)
- +{
- + c_rhash_iter_t i = C_RHASH_ITER_T_INITIALIZER;
- + uint64_t stored_key;
- + void *to_free;
- + while(!c_rhash_iter_uint64_keys(client->rx_aliases, &i, &stored_key)) {
- + c_rhash_get_ptr_by_uint64(client->rx_aliases, stored_key, &to_free);
- + mw_free(to_free);
- + }
- + c_rhash_destroy(client->rx_aliases);
- +}
- +
- void mqtt_ng_destroy(struct mqtt_ng_client *client)
- {
- transaction_buffer_destroy(&client->main_buffer);
- + pthread_mutex_destroy(&client->stats_mutex);
- +
- + c_rhash_destroy(client->tx_topic_aliases.stoi_dict);
- +
- + mqtt_ng_destroy_rx_alias_hash(client);
- +
- mw_free(client);
- }
-
- @@ -645,7 +722,7 @@ static size_t mqtt_ng_connect_size(struct mqtt_auth_properties *auth,
- + sizeof(mqtt_protocol_name_frag) /* Proto Name and Version */
- + 1 /* Connect Flags */
- + 2 /* Keep Alive */
- - + 1 /* 3.1.2.11.1 Property Length - for now 0, TODO TODO*/;
- + + 4 /* 3.1.2.11.1 Property Length - for now fixed to only Topic Alias Maximum, TODO TODO*/;
-
- // CONNECT payload. 3.1.3
- if (auth->client_id)
- @@ -720,21 +797,26 @@ static int _optimized_add(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx,
- return 0;
- }
-
- -#define TRY_GENERATE_MESSAGE(generator_function, buf, log_ctx, max_mem, ...) \
- - int rc = generator_function(buf, log_ctx, ##__VA_ARGS__); \
- +#define TRY_GENERATE_MESSAGE(generator_function, client, ...) \
- + int rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \
- if (rc == MQTT_NG_MSGGEN_BUFFER_OOM) { \
- - LOCK_HDR_BUFFER(buf); \
- - transaction_buffer_garbage_collect((buf), log_ctx); \
- - UNLOCK_HDR_BUFFER(buf); \
- - rc = generator_function(buf, log_ctx, ##__VA_ARGS__); \
- - if (rc == MQTT_NG_MSGGEN_BUFFER_OOM && max_mem) { \
- - LOCK_HDR_BUFFER(buf); \
- - transaction_buffer_grow((buf), log_ctx, GROWTH_FACTOR, max_mem); \
- - UNLOCK_HDR_BUFFER(buf); \
- - rc = generator_function(buf, log_ctx, ##__VA_ARGS__); \
- + LOCK_HDR_BUFFER(&client->main_buffer); \
- + transaction_buffer_garbage_collect((&client->main_buffer), client->log); \
- + UNLOCK_HDR_BUFFER(&client->main_buffer); \
- + rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \
- + if (rc == MQTT_NG_MSGGEN_BUFFER_OOM && client->max_mem_bytes) { \
- + LOCK_HDR_BUFFER(&client->main_buffer); \
- + transaction_buffer_grow((&client->main_buffer), client->log, GROWTH_FACTOR, client->max_mem_bytes); \
- + UNLOCK_HDR_BUFFER(&client->main_buffer); \
- + rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \
- } \
- if (rc == MQTT_NG_MSGGEN_BUFFER_OOM) \
- - mws_error(log_ctx, "%s failed to generate message due to insufficient buffer space (line %d)", __FUNCTION__, __LINE__); \
- + mws_error(client->log, "%s failed to generate message due to insufficient buffer space (line %d)", __FUNCTION__, __LINE__); \
- + } \
- + if (rc == MQTT_NG_MSGGEN_OK) { \
- + pthread_mutex_lock(&client->stats_mutex); \
- + client->stats.tx_messages_queued++; \
- + pthread_mutex_unlock(&client->stats_mutex); \
- } \
- return rc;
-
- @@ -829,10 +911,13 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
-
- PACK_2B_INT(&trx_buf->hdr_buffer, keep_alive, frag);
-
- - // TODO Property Length [MQTT-3.1.3.2.1] temporary fixed 0
- - *WRITE_POS(frag) = 0;
- + // TODO Property Length [MQTT-3.1.3.2.1] temporary fixed to 3 (one property topic alias max)
- + DATA_ADVANCE(&trx_buf->hdr_buffer, uint32_to_mqtt_vbi(3, WRITE_POS(frag)), frag);
- + *WRITE_POS(frag) = MQTT_PROP_TOPIC_ALIAS_MAX;
- DATA_ADVANCE(&trx_buf->hdr_buffer, 1, frag);
-
- + PACK_2B_INT(&trx_buf->hdr_buffer, 65535, frag);
- +
- // [MQTT-3.1.3.1] Client identifier
- CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback);
- PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->client_id), frag);
- @@ -910,6 +995,15 @@ int mqtt_ng_connect(struct mqtt_ng_client *client,
- return 1;
- }
-
- + pthread_mutex_lock(&client->stats_mutex);
- + if (clean_start)
- + client->stats.tx_messages_queued++;
- + else
- + client->stats.tx_messages_queued = 1;
- + client->stats.tx_messages_sent = 0;
- + client->stats.rx_messages_rcvd = 0;
- + pthread_mutex_unlock(&client->stats_mutex);
- +
- client->client_state = CONNECT_PENDING;
- return 0;
- }
- @@ -921,13 +1015,19 @@ uint16_t get_unused_packet_id() {
- }
-
- static inline size_t mqtt_ng_publish_size(const char *topic,
- - size_t msg_len)
- + size_t msg_len,
- + uint16_t topic_id)
- {
- - return 2 /* Topic Name Length */
- - + strlen(topic)
- + size_t retval = 2 /* Topic Name Length */
- + + (topic == NULL ? 0 : strlen(topic))
- + 2 /* Packet identifier */
- - + 1 /* Properties Length TODO for now fixed 0 */
- + + 1 /* Properties Length TODO for now fixed to 1 property */
- + msg_len;
- +
- + if (topic_id)
- + retval += 3;
- +
- + return retval;
- }
-
- int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf,
- @@ -938,13 +1038,14 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf,
- free_fnc_t msg_free,
- size_t msg_len,
- uint8_t publish_flags,
- - uint16_t *packet_id)
- + uint16_t *packet_id,
- + uint16_t topic_alias)
- {
- // >> START THE RODEO <<
- transaction_buffer_transaction_start(trx_buf);
-
- // Calculate the resulting message size sans fixed MQTT header
- - size_t size = mqtt_ng_publish_size(topic, msg_len);
- + size_t size = mqtt_ng_publish_size(topic, msg_len, topic_alias);
-
- // Start generating the message
- struct buffer_fragment *frag = NULL;
- @@ -967,10 +1068,12 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf,
-
- // MQTT Variable Header
- // [MQTT-3.3.2.1]
- - PACK_2B_INT(&trx_buf->hdr_buffer, strlen(topic), frag);
- - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, topic, strlen(topic), topic_free, &frag))
- - goto fail_rollback;
- - BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
- + PACK_2B_INT(&trx_buf->hdr_buffer, topic == NULL ? 0 : strlen(topic), frag);
- + if (topic != NULL) {
- + if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, topic, strlen(topic), topic_free, &frag))
- + goto fail_rollback;
- + BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
- + }
-
- // [MQTT-3.3.2.2]
- mqtt_msg->packet_id = get_unused_packet_id();
- @@ -978,9 +1081,16 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf,
- PACK_2B_INT(&trx_buf->hdr_buffer, mqtt_msg->packet_id, frag);
-
- // [MQTT-3.3.2.3.1] TODO Property Length for now fixed 0
- - *WRITE_POS(frag) = 0;
- + *WRITE_POS(frag) = topic_alias ? 3 : 0;
- DATA_ADVANCE(&trx_buf->hdr_buffer, 1, frag);
-
- + if(topic_alias) {
- + *WRITE_POS(frag) = MQTT_PROP_TOPIC_ALIAS;
- + DATA_ADVANCE(&trx_buf->hdr_buffer, 1, frag);
- +
- + PACK_2B_INT(&trx_buf->hdr_buffer, topic_alias, frag);
- + }
- +
- if( (frag = buffer_new_frag(&trx_buf->hdr_buffer, BUFFER_FRAG_DATA_EXTERNAL)) == NULL )
- goto fail_rollback;
-
- @@ -1006,7 +1116,20 @@ int mqtt_ng_publish(struct mqtt_ng_client *client,
- uint8_t publish_flags,
- uint16_t *packet_id)
- {
- - TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, &client->main_buffer, client->log, client->max_mem_bytes, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id);
- + struct topic_alias_data *alias = NULL;
- + c_rhash_get_ptr_by_str(client->tx_topic_aliases.stoi_dict, topic, (void**)&alias);
- +
- + uint16_t topic_id = 0;
- +
- + if (alias != NULL) {
- + topic_id = alias->idx;
- + if (alias->usage_count++) {
- + topic = NULL;
- + topic_free = NULL;
- + }
- + }
- +
- + TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, client, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id, topic_id);
- }
-
- static inline size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_count)
- @@ -1072,7 +1195,7 @@ fail_rollback:
-
- int mqtt_ng_subscribe(struct mqtt_ng_client *client, struct mqtt_sub *subs, size_t sub_count)
- {
- - TRY_GENERATE_MESSAGE(mqtt_ng_generate_subscribe, &client->main_buffer, client->log, client->max_mem_bytes, subs, sub_count);
- + TRY_GENERATE_MESSAGE(mqtt_ng_generate_subscribe, client, subs, sub_count);
- }
-
- int mqtt_ng_generate_disconnect(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, uint8_t reason_code)
- @@ -1116,7 +1239,7 @@ fail_rollback:
-
- int mqtt_ng_disconnect(struct mqtt_ng_client *client, uint8_t reason_code)
- {
- - TRY_GENERATE_MESSAGE(mqtt_ng_generate_disconnect, &client->main_buffer, client->log, client->max_mem_bytes, reason_code);
- + TRY_GENERATE_MESSAGE(mqtt_ng_generate_disconnect, client, reason_code);
- }
-
- static int mqtt_generate_puback(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, uint16_t packet_id, uint8_t reason_code)
- @@ -1161,7 +1284,7 @@ fail_rollback:
-
- static int mqtt_ng_puback(struct mqtt_ng_client *client, uint16_t packet_id, uint8_t reason_code)
- {
- - TRY_GENERATE_MESSAGE(mqtt_generate_puback, &client->main_buffer, client->log, client->max_mem_bytes, packet_id, reason_code);
- + TRY_GENERATE_MESSAGE(mqtt_generate_puback, client, packet_id, reason_code);
- }
-
- int mqtt_ng_ping(struct mqtt_ng_client *client)
- @@ -1212,12 +1335,81 @@ static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data, mqtt_w
- static void mqtt_properties_parser_ctx_reset(struct mqtt_properties_parser_ctx *ctx)
- {
- ctx->state = PROPERTIES_LENGTH;
- - ctx->head = NULL;
- + while (ctx->head) {
- + struct mqtt_property *f = ctx->head;
- + ctx->head = ctx->head->next;
- + if (f->type == MQTT_TYPE_STR || f->type == MQTT_TYPE_STR_PAIR)
- + mw_free(f->data.strings[0]);
- + if (f->type == MQTT_TYPE_STR_PAIR)
- + mw_free(f->data.strings[1]);
- + if (f->type == MQTT_TYPE_BIN)
- + mw_free(f->data.bindata);
- + mw_free(f);
- + }
- + ctx->tail = NULL;
- ctx->properties_length = 0;
- ctx->bytes_consumed = 0;
- vbi_parser_reset_ctx(&ctx->vbi_parser_ctx);
- }
-
- +struct mqtt_property_type {
- + uint8_t id;
- + enum mqtt_datatype datatype;
- + const char* name;
- +};
- +
- +const struct mqtt_property_type mqtt_property_types[] = {
- + { .id = MQTT_PROP_TOPIC_ALIAS, .name = MQTT_PROP_TOPIC_ALIAS_NAME, .datatype = MQTT_TYPE_UINT_16 },
- +
- + { .id = MQTT_PROP_PAYLOAD_FMT_INDICATOR, .name = MQTT_PROP_PAYLOAD_FMT_INDICATOR_NAME, .datatype = MQTT_TYPE_UINT_8 },
- + { .id = MQTT_PROP_MSG_EXPIRY_INTERVAL, .name = MQTT_PROP_MSG_EXPIRY_INTERVAL_NAME, .datatype = MQTT_TYPE_UINT_32 },
- + { .id = MQTT_PROP_CONTENT_TYPE, .name = MQTT_PROP_CONTENT_TYPE_NAME, .datatype = MQTT_TYPE_STR },
- + { .id = MQTT_PROP_RESPONSE_TOPIC, .name = MQTT_PROP_RESPONSE_TOPIC_NAME, .datatype = MQTT_TYPE_STR },
- + { .id = MQTT_PROP_CORRELATION_DATA, .name = MQTT_PROP_CORRELATION_DATA_NAME, .datatype = MQTT_TYPE_BIN },
- + { .id = MQTT_PROP_SUB_IDENTIFIER, .name = MQTT_PROP_SUB_IDENTIFIER_NAME, .datatype = MQTT_TYPE_VBI },
- + { .id = MQTT_PROP_SESSION_EXPIRY_INTERVAL, .name = MQTT_PROP_SESSION_EXPIRY_INTERVAL_NAME, .datatype = MQTT_TYPE_UINT_32 },
- + { .id = MQTT_PROP_ASSIGNED_CLIENT_ID, .name = MQTT_PROP_ASSIGNED_CLIENT_ID_NAME, .datatype = MQTT_TYPE_STR },
- + { .id = MQTT_PROP_SERVER_KEEP_ALIVE, .name = MQTT_PROP_SERVER_KEEP_ALIVE_NAME, .datatype = MQTT_TYPE_UINT_16 },
- + { .id = MQTT_PROP_AUTH_METHOD, .name = MQTT_PROP_AUTH_METHOD_NAME, .datatype = MQTT_TYPE_STR },
- + { .id = MQTT_PROP_AUTH_DATA, .name = MQTT_PROP_AUTH_DATA_NAME, .datatype = MQTT_TYPE_BIN },
- + { .id = MQTT_PROP_REQ_PROBLEM_INFO, .name = MQTT_PROP_REQ_PROBLEM_INFO_NAME, .datatype = MQTT_TYPE_UINT_8 },
- + { .id = MQTT_PROP_WILL_DELAY_INTERVAL, .name = MQTT_PROP_WIIL_DELAY_INTERVAL_NAME, .datatype = MQTT_TYPE_UINT_32 },
- + { .id = MQTT_PROP_REQ_RESP_INFORMATION, .name = MQTT_PROP_REQ_RESP_INFORMATION_NAME, .datatype = MQTT_TYPE_UINT_8 },
- + { .id = MQTT_PROP_RESP_INFORMATION, .name = MQTT_PROP_RESP_INFORMATION_NAME, .datatype = MQTT_TYPE_STR },
- + { .id = MQTT_PROP_SERVER_REF, .name = MQTT_PROP_SERVER_REF_NAME, .datatype = MQTT_TYPE_STR },
- + { .id = MQTT_PROP_REASON_STR, .name = MQTT_PROP_REASON_STR_NAME, .datatype = MQTT_TYPE_STR },
- + { .id = MQTT_PROP_RECEIVE_MAX, .name = MQTT_PROP_RECEIVE_MAX_NAME, .datatype = MQTT_TYPE_UINT_16 },
- + { .id = MQTT_PROP_TOPIC_ALIAS_MAX, .name = MQTT_PROP_TOPIC_ALIAS_MAX_NAME, .datatype = MQTT_TYPE_UINT_16 },
- + // MQTT_PROP_TOPIC_ALIAS is first as it is most often used
- + { .id = MQTT_PROP_MAX_QOS, .name = MQTT_PROP_MAX_QOS_NAME, .datatype = MQTT_TYPE_UINT_8 },
- + { .id = MQTT_PROP_RETAIN_AVAIL, .name = MQTT_PROP_RETAIN_AVAIL_NAME, .datatype = MQTT_TYPE_UINT_8 },
- + { .id = MQTT_PROP_USR, .name = MQTT_PROP_USR_NAME, .datatype = MQTT_TYPE_STR_PAIR },
- + { .id = MQTT_PROP_MAX_PKT_SIZE, .name = MQTT_PROP_MAX_PKT_SIZE_NAME, .datatype = MQTT_TYPE_UINT_32 },
- + { .id = MQTT_PROP_WILDCARD_SUB_AVAIL, .name = MQTT_PROP_WILDCARD_SUB_AVAIL_NAME, .datatype = MQTT_TYPE_UINT_8 },
- + { .id = MQTT_PROP_SUB_ID_AVAIL, .name = MQTT_PROP_SUB_ID_AVAIL_NAME, .datatype = MQTT_TYPE_UINT_8 },
- + { .id = MQTT_PROP_SHARED_SUB_AVAIL, .name = MQTT_PROP_SHARED_SUB_AVAIL_NAME, .datatype = MQTT_TYPE_UINT_8 },
- + { .id = 0, .name = NULL, .datatype = MQTT_TYPE_UNKNOWN }
- +};
- +
- +static int get_property_type_by_id(uint8_t property_id) {
- + for (int i = 0; mqtt_property_types[i].datatype != MQTT_TYPE_UNKNOWN; i++) {
- + if (mqtt_property_types[i].id == property_id)
- + return mqtt_property_types[i].datatype;
- + }
- + return MQTT_TYPE_UNKNOWN;
- +}
- +
- +struct mqtt_property *get_property_by_id(struct mqtt_property *props, uint8_t property_id)
- +{
- + while (props) {
- + if (props->id == property_id) {
- + return props;
- + }
- + props = props->next;
- + }
- + return NULL;
- +}
- +
- // Parses [MQTT-2.2.2]
- static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t data, mqtt_wss_log_ctx_t log)
- {
- @@ -1228,19 +1420,128 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t
- if (rc == MQTT_NG_CLIENT_PARSE_DONE) {
- ctx->properties_length = ctx->vbi_parser_ctx.result;
- ctx->bytes_consumed += ctx->vbi_parser_ctx.bytes;
- + ctx->vbi_length = ctx->vbi_parser_ctx.bytes;
- if (!ctx->properties_length)
- return MQTT_NG_CLIENT_PARSE_DONE;
- - ctx->state = PROPERTY_ID;
- - vbi_parser_reset_ctx(&ctx->vbi_parser_ctx);
- + ctx->state = PROPERTY_CREATE;
- break;
- }
- return rc;
- + case PROPERTY_CREATE:
- + BUF_READ_CHECK_AT_LEAST(data, 1);
- + struct mqtt_property *prop = mw_calloc(1, sizeof(struct mqtt_property));
- + if (ctx->head == NULL) {
- + ctx->head = prop;
- + ctx->tail = prop;
- + } else {
- + ctx->tail->next = prop;
- + ctx->tail = ctx->tail->next;
- + }
- + ctx->state = PROPERTY_ID;
- + /* FALLTHROUGH */
- case PROPERTY_ID:
- - // TODO ignore for now... just skip
- - rbuf_bump_tail(data, ctx->properties_length);
- - ctx->bytes_consumed += ctx->properties_length;
- + rbuf_pop(data, (char*)&ctx->tail->id, 1);
- + ctx->bytes_consumed += 1;
- + ctx->tail->type = get_property_type_by_id(ctx->tail->id);
- + switch (ctx->tail->type) {
- + case MQTT_TYPE_UINT_16:
- + ctx->state = PROPERTY_TYPE_UINT16;
- + break;
- + case MQTT_TYPE_UINT_32:
- + ctx->state = PROPERTY_TYPE_UINT32;
- + break;
- + case MQTT_TYPE_UINT_8:
- + ctx->state = PROPERTY_TYPE_UINT8;
- + break;
- + case MQTT_TYPE_VBI:
- + ctx->state = PROPERTY_TYPE_VBI;
- + vbi_parser_reset_ctx(&ctx->vbi_parser_ctx);
- + break;
- + case MQTT_TYPE_STR:
- + case MQTT_TYPE_STR_PAIR:
- + ctx->str_idx = 0;
- + /* FALLTHROUGH */
- + case MQTT_TYPE_BIN:
- + ctx->state = PROPERTY_TYPE_STR_BIN_LEN;
- + break;
- + default:
- + mws_error(log, "Unsupported property type %d for property id %d.", (int)ctx->tail->type, (int)ctx->tail->id);
- + return MQTT_NG_CLIENT_PROTOCOL_ERROR;
- + }
- + break;
- + case PROPERTY_TYPE_STR_BIN_LEN:
- + BUF_READ_CHECK_AT_LEAST(data, sizeof(uint16_t));
- + rbuf_pop(data, (char*)&ctx->tail->bindata_len, sizeof(uint16_t));
- + ctx->tail->bindata_len = be16toh(ctx->tail->bindata_len);
- + ctx->bytes_consumed += 2;
- + switch (ctx->tail->type) {
- + case MQTT_TYPE_BIN:
- + ctx->state = PROPERTY_TYPE_BIN;
- + break;
- + case MQTT_TYPE_STR:
- + case MQTT_TYPE_STR_PAIR:
- + ctx->state = PROPERTY_TYPE_STR;
- + break;
- + default:
- + mws_error(log, "Unexpected datatype in PROPERTY_TYPE_STR_BIN_LEN %d", (int)ctx->tail->type);
- + return MQTT_NG_CLIENT_INTERNAL_ERROR;
- + }
- + break;
- + case PROPERTY_TYPE_STR:
- + BUF_READ_CHECK_AT_LEAST(data, ctx->tail->bindata_len);
- + ctx->tail->data.strings[ctx->str_idx] = mw_malloc(ctx->tail->bindata_len + 1);
- + rbuf_pop(data, ctx->tail->data.strings[ctx->str_idx], ctx->tail->bindata_len);
- + ctx->tail->data.strings[ctx->str_idx][ctx->tail->bindata_len] = 0;
- + ctx->str_idx++;
- + ctx->bytes_consumed += ctx->tail->bindata_len;
- + if (ctx->tail->type == MQTT_TYPE_STR_PAIR && ctx->str_idx < 2) {
- + ctx->state = PROPERTY_TYPE_STR_BIN_LEN;
- + break;
- + }
- + ctx->state = PROPERTY_NEXT;
- + break;
- + case PROPERTY_TYPE_BIN:
- + BUF_READ_CHECK_AT_LEAST(data, ctx->tail->bindata_len);
- + ctx->tail->data.bindata = mw_malloc(ctx->tail->bindata_len);
- + rbuf_pop(data, ctx->tail->data.bindata, ctx->tail->bindata_len);
- + ctx->bytes_consumed += ctx->tail->bindata_len;
- + ctx->state = PROPERTY_NEXT;
- + break;
- + case PROPERTY_TYPE_VBI:
- + rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data, log);
- + if (rc == MQTT_NG_CLIENT_PARSE_DONE) {
- + ctx->tail->data.uint32 = ctx->vbi_parser_ctx.result;
- + ctx->bytes_consumed += ctx->vbi_parser_ctx.bytes;
- + ctx->state = PROPERTY_NEXT;
- + break;
- + }
- + return rc;
- + case PROPERTY_TYPE_UINT8:
- + BUF_READ_CHECK_AT_LEAST(data, sizeof(uint8_t));
- + rbuf_pop(data, (char*)&ctx->tail->data.uint8, sizeof(uint8_t));
- + ctx->bytes_consumed += sizeof(uint8_t);
- + ctx->state = PROPERTY_NEXT;
- + break;
- + case PROPERTY_TYPE_UINT32:
- + BUF_READ_CHECK_AT_LEAST(data, sizeof(uint32_t));
- + rbuf_pop(data, (char*)&ctx->tail->data.uint32, sizeof(uint32_t));
- + ctx->tail->data.uint32 = be32toh(ctx->tail->data.uint32);
- + ctx->bytes_consumed += sizeof(uint32_t);
- + ctx->state = PROPERTY_NEXT;
- + break;
- + case PROPERTY_TYPE_UINT16:
- + BUF_READ_CHECK_AT_LEAST(data, sizeof(uint16_t));
- + rbuf_pop(data, (char*)&ctx->tail->data.uint16, sizeof(uint16_t));
- + ctx->tail->data.uint16 = be16toh(ctx->tail->data.uint16);
- + ctx->bytes_consumed += sizeof(uint16_t);
- + ctx->state = PROPERTY_NEXT;
- + /* FALLTHROUGH */
- + case PROPERTY_NEXT:
- + if (ctx->properties_length > ctx->bytes_consumed - ctx->vbi_length) {
- + ctx->state = PROPERTY_CREATE;
- + break;
- + } else
- return MQTT_NG_CLIENT_PARSE_DONE;
- -// rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data, log);
- }
- return MQTT_NG_CLIENT_OK_CALL_AGAIN;
- }
- @@ -1381,20 +1682,28 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
- switch (parser->varhdr_state) {
- case MQTT_PARSE_VARHDR_INITIAL:
- BUF_READ_CHECK_AT_LEAST(parser->received_data, 2);
- + publish->topic = NULL;
- publish->qos = ((parser->mqtt_control_packet_type >> 1) & 0x03);
- rbuf_pop(parser->received_data, (char*)&publish->topic_len, 2);
- publish->topic_len = be16toh(publish->topic_len);
- + parser->mqtt_parsed_len = 2;
- + if (!publish->topic_len) {
- + parser->varhdr_state = MQTT_PARSE_VARHDR_POST_TOPICNAME;
- + break;
- + }
- publish->topic = mw_calloc(1, publish->topic_len + 1 /* add 0x00 */);
- if (publish->topic == NULL)
- return MQTT_NG_CLIENT_OOM;
- parser->varhdr_state = MQTT_PARSE_VARHDR_TOPICNAME;
- - parser->mqtt_parsed_len = 2;
- /* FALLTHROUGH */
- case MQTT_PARSE_VARHDR_TOPICNAME:
- // TODO check empty topic can be valid? In which case we have to skip this step
- BUF_READ_CHECK_AT_LEAST(parser->received_data, publish->topic_len);
- rbuf_pop(parser->received_data, publish->topic, publish->topic_len);
- parser->mqtt_parsed_len += publish->topic_len;
- + parser->varhdr_state = MQTT_PARSE_VARHDR_POST_TOPICNAME;
- + /* FALLTHROUGH */
- + case MQTT_PARSE_VARHDR_POST_TOPICNAME:
- mqtt_properties_parser_ctx_reset(&parser->properties_parser);
- if (!publish->qos) { // PacketID present only for QOS > 0 [MQTT-3.3.2.2]
- parser->varhdr_state = MQTT_PARSE_VARHDR_PROPS;
- @@ -1589,6 +1898,11 @@ static int send_fragment(struct mqtt_ng_client *client) {
-
- if (frag->flags & BUFFER_FRAG_MQTT_PACKET_TAIL) {
- client->time_of_last_send = time(NULL);
- + pthread_mutex_lock(&client->stats_mutex);
- + if (client->main_buffer.sending_frag != &ping_frag)
- + client->stats.tx_messages_queued--;
- + client->stats.tx_messages_sent++;
- + pthread_mutex_unlock(&client->stats_mutex);
- client->main_buffer.sending_frag = NULL;
- return 1;
- }
- @@ -1654,6 +1968,10 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)
- #ifdef MQTT_DEBUG_VERBOSE
- DEBUG("MQTT Packet Parsed Successfully!");
- #endif
- + pthread_mutex_lock(&client->stats_mutex);
- + client->stats.rx_messages_rcvd++;
- + pthread_mutex_unlock(&client->stats_mutex);
- +
- switch (get_control_packet_type(client->parser.mqtt_control_packet_type)) {
- case MQTT_CPT_CONNACK:
- #ifdef MQTT_DEBUG_VERBOSE
- @@ -1713,9 +2031,30 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)
- ERROR("Error generating PUBACK reply for PUBLISH");
- return rc;
- }
- + struct mqtt_property *prop;
- + if ( (prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_TOPIC_ALIAS)) != NULL ) {
- + // Topic Alias property was sent from server
- + void *topic_ptr;
- + if (!c_rhash_get_ptr_by_uint64(client->rx_aliases, prop->data.uint8, &topic_ptr)) {
- + if (pub->topic != NULL) {
- + ERROR("We do not yet support topic alias reassignment");
- + return MQTT_NG_CLIENT_NOT_IMPL_YET;
- + }
- + pub->topic = topic_ptr;
- + } else {
- + if (pub->topic == NULL) {
- + ERROR("Topic alias with id %d unknown and topic not set by server!", prop->data.uint8);
- + return MQTT_NG_CLIENT_PROTOCOL_ERROR;
- + }
- + c_rhash_insert_uint64_ptr(client->rx_aliases, prop->data.uint8, pub->topic);
- + }
- + }
- if (client->msg_callback)
- client->msg_callback(pub->topic, pub->data, pub->data_len, pub->qos);
- - mw_free(pub->topic);
- + // in case we have property topic alias and we have topic we take over the string
- + // and add pointer to it into topic alias list
- + if (prop == NULL)
- + mw_free(pub->topic);
- mw_free(pub->data);
- return MQTT_NG_CLIENT_WANT_WRITE;
- case MQTT_CPT_DISCONNECT:
- @@ -1767,3 +2106,42 @@ void mqtt_ng_set_max_mem(struct mqtt_ng_client *client, size_t bytes)
- {
- client->max_mem_bytes = bytes;
- }
- +
- +void mqtt_ng_get_stats(struct mqtt_ng_client *client, struct mqtt_ng_stats *stats)
- +{
- + pthread_mutex_lock(&client->stats_mutex);
- + memcpy(stats, &client->stats, sizeof(struct mqtt_ng_stats));
- + pthread_mutex_unlock(&client->stats_mutex);
- +
- + stats->tx_bytes_queued = 0;
- + stats->tx_buffer_reclaimable = 0;
- +
- + LOCK_HDR_BUFFER(&client->main_buffer);
- + stats->tx_buffer_used = BUFFER_BYTES_USED(&client->main_buffer.hdr_buffer);
- + stats->tx_buffer_free = BUFFER_BYTES_AVAILABLE(&client->main_buffer.hdr_buffer);
- + stats->tx_buffer_size = client->main_buffer.hdr_buffer.size;
- + struct buffer_fragment *frag = BUFFER_FIRST_FRAG(&client->main_buffer.hdr_buffer);
- + while (frag) {
- + stats->tx_bytes_queued += frag->len - frag->sent;
- + if (frag_is_marked_for_gc(frag))
- + stats->tx_buffer_reclaimable += FRAG_SIZE_IN_BUFFER(frag);
- +
- + frag = frag->next;
- + }
- + UNLOCK_HDR_BUFFER(&client->main_buffer);
- +}
- +
- +int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic)
- +{
- + if (client->tx_topic_aliases.idx_assigned >= client->tx_topic_aliases.idx_max) {
- + mws_error(client->log, "Tx topic alias indexes were exhausted (current version of the library doesn't support reassigning yet. Feel free to contribute.");
- + return 0; //0 is not a valid topic alias
- + }
- + struct topic_alias_data *alias = mw_malloc(sizeof(struct topic_alias_data));
- + alias->idx = ++client->tx_topic_aliases.idx_assigned;
- + alias->usage_count = 0;
- +
- + c_rhash_insert_str_ptr(client->tx_topic_aliases.stoi_dict, topic, (void*)alias);
- +
- + return alias->idx;
- +}
- diff --git a/src/mqtt_wss_client.c b/src/mqtt_wss_client.c
- index 8fee62bdb7..a87be00d79 100644
- --- a/src/mqtt_wss_client.c
- +++ b/src/mqtt_wss_client.c
- @@ -1087,6 +1087,17 @@ static inline long long int t_till_next_keepalive_ms(mqtt_wss_client client)
- return(next_mqtt_keep_alive - (MQTT_PAL_TIME() * SEC_TO_MSEC));
- }
-
- +#ifdef MQTT_WSS_CPUSTATS
- +static inline uint64_t mqtt_wss_now_usec(mqtt_wss_client client) {
- + struct timespec ts;
- + if(clock_gettime(CLOCK_MONOTONIC, &ts) == -1) {
- + mws_error(client->log, "clock_gettime(CLOCK_MONOTONIC, ×pec) failed.");
- + return 0;
- + }
- + return (uint64_t)ts.tv_sec * USEC_PER_SEC + (ts.tv_nsec % NSEC_PER_SEC) / NSEC_PER_USEC;
- +}
- +#endif
- +
- int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
- {
- char *ptr;
- @@ -1094,6 +1105,11 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
- int ret;
- int send_keepalive = 0;
-
- +#ifdef MQTT_WSS_CPUSTATS
- + uint64_t t1,t2;
- + t1 = mqtt_wss_now_usec(client);
- +#endif
- +
- #ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(client->log, ">>>>> mqtt_wss_service <<<<<");
- mws_debug(client->log, "Waiting for events: %s%s%s",
- @@ -1112,6 +1128,11 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
- send_keepalive = 1;
- }
-
- +#ifdef MQTT_WSS_CPUSTATS
- + t2 = mqtt_wss_now_usec(client);
- + client->stats.time_keepalive += t2 - t1;
- +#endif
- +
- if ((ret = poll(client->poll_fds, 2, timeout_ms >= 0 ? timeout_ms : -1)) < 0) {
- if (errno == EINTR) {
- mws_warn(client->log, "poll interrupted by EINTR");
- @@ -1134,6 +1155,10 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
- return client->last_ec;
- }
-
- +#ifdef MQTT_WSS_CPUSTATS
- + t1 = mqtt_wss_now_usec(client);
- +#endif
- +
- if (ret == 0) {
- if (send_keepalive) {
- // otherwise we shortened the timeout ourselves to take care of
- @@ -1152,6 +1177,11 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
- }
- }
-
- +#ifdef MQTT_WSS_CPUSTATS
- + t2 = mqtt_wss_now_usec(client);
- + client->stats.time_keepalive += t2 - t1;
- +#endif
- +
- client->poll_fds[POLLFD_SOCKET].events = 0;
-
- if ((ptr = rbuf_get_linear_insert_range(client->ws_client->buf_read, &size))) {
- @@ -1180,6 +1210,11 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
- }
- }
-
- +#ifdef MQTT_WSS_CPUSTATS
- + t1 = mqtt_wss_now_usec(client);
- + client->stats.time_read_socket += t1 - t2;
- +#endif
- +
- ret = ws_client_process(client->ws_client);
- switch(ret) {
- case WS_CLIENT_PROTOCOL_ERROR:
- @@ -1194,6 +1229,11 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
- return MQTT_WSS_ERR_CONN_DROP;
- }
-
- +#ifdef MQTT_WSS_CPUSTATS
- + t2 = mqtt_wss_now_usec(client);
- + client->stats.time_process_websocket += t2 - t1;
- +#endif
- +
- if (handle_mqtt(client))
- return MQTT_WSS_ERR_PROTO_MQTT;
-
- @@ -1202,6 +1242,11 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
- client->poll_fds[POLLFD_SOCKET].events |= POLLOUT;
- }
-
- +#ifdef MQTT_WSS_CPUSTATS
- + t1 = mqtt_wss_now_usec(client);
- + client->stats.time_process_mqtt += t1 - t2;
- +#endif
- +
- if ((ptr = rbuf_get_linear_read_range(client->ws_client->buf_write, &size))) {
- #ifdef DEBUG_ULTRA_VERBOSE
- mws_debug(client->log, "Have data to write to SSL");
- @@ -1234,6 +1279,11 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
- if(client->poll_fds[POLLFD_PIPE].revents & POLLIN)
- util_clear_pipe(client->write_notif_pipe[PIPE_READ_END]);
-
- +#ifdef MQTT_WSS_CPUSTATS
- + t2 = mqtt_wss_now_usec(client);
- + client->stats.time_write_socket += t2 - t1;
- +#endif
- +
- return MQTT_WSS_OK;
- }
-
- @@ -1504,9 +1554,18 @@ struct mqtt_wss_stats mqtt_wss_get_stats(mqtt_wss_client client)
- current = client->stats;
- memset(&client->stats, 0, sizeof(client->stats));
- pthread_mutex_unlock(&client->stat_lock);
- + mqtt_ng_get_stats(client->mqtt.mqtt_ctx, ¤t.mqtt);
- return current;
- }
-
- +int mqtt_wss_set_topic_alias(mqtt_wss_client client, const char *topic)
- +{
- + if(!client->internal_mqtt)
- + return 0;
- +
- + return mqtt_ng_set_topic_alias(client->mqtt.mqtt_ctx, topic);
- +}
- +
- #ifdef MQTT_WSS_DEBUG
- void mqtt_wss_set_SSL_CTX_keylog_cb(mqtt_wss_client client, void (*ssl_ctx_keylog_cb)(const SSL *ssl, const char *line))
- {
|