mqtt_websockets 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041
  1. commit 8869ab354d10c071c1e5e33602cc6b7940b4427c
  2. Author: Timotej S <6674623+underhood@users.noreply.github.com>
  3. Date: Mon Dec 12 15:46:45 2022 +0700
  4. Initial support for topic aliases (#12)
  5. Add support for topic alias functionality for PUBLISH packets
  6. also adds support for parsing all MQTT properties as opposed to just skipping and ignoring them (what we did previously)
  7. diff --git a/.gitmodules b/.gitmodules
  8. index 8e778ee080..9e05f79422 100644
  9. --- a/.gitmodules
  10. +++ b/.gitmodules
  11. @@ -4,3 +4,6 @@
  12. [submodule "MQTT-C"]
  13. path = MQTT-C
  14. url = https://github.com/underhood/MQTT-C.git
  15. +[submodule "c_rhash"]
  16. + path = c_rhash
  17. + url = https://github.com/underhood/c_rhash.git
  18. diff --git a/Makefile b/Makefile
  19. index b9bba50d93..e8c40e1900 100644
  20. --- a/Makefile
  21. +++ b/Makefile
  22. @@ -16,10 +16,13 @@ CFLAGS = -Wextra -Wall `pkg-config --cflags openssl` `pkg-config --cflags libcry
  23. BUILD_DIR = build
  24. # dir having our version of mqtt_pal.h must preceede dir of MQTT-C to override this hdr
  25. -INCLUDES = -Isrc/include -Ic-rbuf/include -Imqtt/include -IMQTT-C/include
  26. +INCLUDES = -Isrc/include -Ic-rbuf/include -Ic_rhash/include -Imqtt/include -IMQTT-C/include
  27. all: test
  28. +$(BUILD_DIR)/c_rhash.o: c_rhash/src/c_rhash.c c_rhash/src/c_rhash_internal.h c_rhash/include/c_rhash.h
  29. + $(CC) -o $(BUILD_DIR)/c_rhash.o -c c_rhash/src/c_rhash.c $(CFLAGS) $(INCLUDES)
  30. +
  31. c-rbuf/build/ringbuffer.o:
  32. cd c-rbuf && $(MAKE) build/ringbuffer.o
  33. @@ -41,8 +44,8 @@ $(BUILD_DIR)/mqtt_ng.o: src/mqtt_ng.c src/include/mqtt_ng.h src/include/common_i
  34. $(BUILD_DIR)/common_public.o: src/common_public.c src/include/common_public.h
  35. $(CC) -o $(BUILD_DIR)/common_public.o -c src/common_public.c $(CFLAGS) $(INCLUDES)
  36. -libmqttwebsockets.a: $(BUILD_DIR)/mqtt_wss_client.o $(BUILD_DIR)/ws_client.o c-rbuf/build/ringbuffer.o $(BUILD_DIR)/mqtt.o $(BUILD_DIR)/mqtt_wss_log.o $(BUILD_DIR)/mqtt_ng.o $(BUILD_DIR)/common_public.o
  37. - ar rcs libmqttwebsockets.a $(BUILD_DIR)/mqtt_wss_client.o $(BUILD_DIR)/ws_client.o c-rbuf/build/ringbuffer.o $(BUILD_DIR)/mqtt.o $(BUILD_DIR)/mqtt_wss_log.o $(BUILD_DIR)/mqtt_ng.o $(BUILD_DIR)/common_public.o
  38. +libmqttwebsockets.a: $(BUILD_DIR)/mqtt_wss_client.o $(BUILD_DIR)/ws_client.o c-rbuf/build/ringbuffer.o $(BUILD_DIR)/c_rhash.o $(BUILD_DIR)/mqtt.o $(BUILD_DIR)/mqtt_wss_log.o $(BUILD_DIR)/mqtt_ng.o $(BUILD_DIR)/common_public.o
  39. + ar rcs libmqttwebsockets.a $(BUILD_DIR)/mqtt_wss_client.o $(BUILD_DIR)/ws_client.o c-rbuf/build/ringbuffer.o $(BUILD_DIR)/c_rhash.o $(BUILD_DIR)/mqtt.o $(BUILD_DIR)/mqtt_wss_log.o $(BUILD_DIR)/mqtt_ng.o $(BUILD_DIR)/common_public.o
  40. test: $(BUILD_DIR)/test.o libmqttwebsockets.a
  41. $(CC) -o test $(BUILD_DIR)/test.o libmqttwebsockets.a `pkg-config --libs openssl` -lpthread $(CFLAGS)
  42. diff --git a/c_rhash b/c_rhash
  43. new file mode 160000
  44. index 0000000000..98bc3c8ffb
  45. --- /dev/null
  46. +++ b/c_rhash
  47. @@ -0,0 +1 @@
  48. +Subproject commit 98bc3c8ffb872d83b40e2dfb624810d1f619e82d
  49. diff --git a/src/include/common_public.h b/src/include/common_public.h
  50. index ddc6379bb7..a855737f9e 100644
  51. --- a/src/include/common_public.h
  52. +++ b/src/include/common_public.h
  53. @@ -1,5 +1,8 @@
  54. #ifndef MQTT_WEBSOCKETS_COMMON_PUBLIC_H
  55. #define MQTT_WEBSOCKETS_COMMON_PUBLIC_H
  56. +
  57. +#include <stddef.h>
  58. +
  59. /* free_fnc_t in general (in whatever function or struct it is used)
  60. * decides how the related data will be handled.
  61. * - If NULL the data are copied internally (causing malloc and later free)
  62. @@ -15,4 +18,16 @@ typedef void (*free_fnc_t)(void *ptr);
  63. void _caller_responsibility(void *ptr);
  64. #define CALLER_RESPONSIBILITY ((free_fnc_t)&_caller_responsibility)
  65. +struct mqtt_ng_stats {
  66. + size_t tx_bytes_queued;
  67. + int tx_messages_queued;
  68. + int tx_messages_sent;
  69. + int rx_messages_rcvd;
  70. + size_t tx_buffer_used;
  71. + size_t tx_buffer_free;
  72. + size_t tx_buffer_size;
  73. + // part of transaction buffer that containes mesages we can free alredy during the garbage colleciton step
  74. + size_t tx_buffer_reclaimable;
  75. +};
  76. +
  77. #endif /* MQTT_WEBSOCKETS_COMMON_PUBLIC_H */
  78. diff --git a/src/include/mqtt_constants.h b/src/include/mqtt_constants.h
  79. index 55a8a0f7ef..1db4989762 100644
  80. --- a/src/include/mqtt_constants.h
  81. +++ b/src/include/mqtt_constants.h
  82. @@ -42,4 +42,60 @@
  83. #define MQTT_MAX_CLIENT_ID 23 /* [MQTT-3.1.3-5] */
  84. +// MQTT Property identifiers [MQTT-2.2.2.2]
  85. +#define MQTT_PROP_PAYLOAD_FMT_INDICATOR 0x01
  86. +#define MQTT_PROP_PAYLOAD_FMT_INDICATOR_NAME "Payload Format Indicator"
  87. +#define MQTT_PROP_MSG_EXPIRY_INTERVAL 0x02
  88. +#define MQTT_PROP_MSG_EXPIRY_INTERVAL_NAME "Message Expiry Interval"
  89. +#define MQTT_PROP_CONTENT_TYPE 0x03
  90. +#define MQTT_PROP_CONTENT_TYPE_NAME "Content Type"
  91. +#define MQTT_PROP_RESPONSE_TOPIC 0x08
  92. +#define MQTT_PROP_RESPONSE_TOPIC_NAME "Response Topic"
  93. +#define MQTT_PROP_CORRELATION_DATA 0x09
  94. +#define MQTT_PROP_CORRELATION_DATA_NAME "Correlation Data"
  95. +#define MQTT_PROP_SUB_IDENTIFIER 0x0B
  96. +#define MQTT_PROP_SUB_IDENTIFIER_NAME "Subscription Identifier"
  97. +#define MQTT_PROP_SESSION_EXPIRY_INTERVAL 0x11
  98. +#define MQTT_PROP_SESSION_EXPIRY_INTERVAL_NAME "Session Expiry Interval"
  99. +#define MQTT_PROP_ASSIGNED_CLIENT_ID 0x12
  100. +#define MQTT_PROP_ASSIGNED_CLIENT_ID_NAME "Assigned Client Identifier"
  101. +#define MQTT_PROP_SERVER_KEEP_ALIVE 0x13
  102. +#define MQTT_PROP_SERVER_KEEP_ALIVE_NAME "Server Keep Alive"
  103. +#define MQTT_PROP_AUTH_METHOD 0x15
  104. +#define MQTT_PROP_AUTH_METHOD_NAME "Authentication Method"
  105. +#define MQTT_PROP_AUTH_DATA 0x16
  106. +#define MQTT_PROP_AUTH_DATA_NAME "Authentication Data"
  107. +#define MQTT_PROP_REQ_PROBLEM_INFO 0x17
  108. +#define MQTT_PROP_REQ_PROBLEM_INFO_NAME "Request Problem Information"
  109. +#define MQTT_PROP_WILL_DELAY_INTERVAL 0x18
  110. +#define MQTT_PROP_WIIL_DELAY_INTERVAL_NAME "Will Delay Interval"
  111. +#define MQTT_PROP_REQ_RESP_INFORMATION 0x19
  112. +#define MQTT_PROP_REQ_RESP_INFORMATION_NAME "Request Response Information"
  113. +#define MQTT_PROP_RESP_INFORMATION 0x1A
  114. +#define MQTT_PROP_RESP_INFORMATION_NAME "Response Information"
  115. +#define MQTT_PROP_SERVER_REF 0x1C
  116. +#define MQTT_PROP_SERVER_REF_NAME "Server Reference"
  117. +#define MQTT_PROP_REASON_STR 0x1F
  118. +#define MQTT_PROP_REASON_STR_NAME "Reason String"
  119. +#define MQTT_PROP_RECEIVE_MAX 0x21
  120. +#define MQTT_PROP_RECEIVE_MAX_NAME "Receive Maximum"
  121. +#define MQTT_PROP_TOPIC_ALIAS_MAX 0x22
  122. +#define MQTT_PROP_TOPIC_ALIAS_MAX_NAME "Topic Alias Maximum"
  123. +#define MQTT_PROP_TOPIC_ALIAS 0x23
  124. +#define MQTT_PROP_TOPIC_ALIAS_NAME "Topic Alias"
  125. +#define MQTT_PROP_MAX_QOS 0x24
  126. +#define MQTT_PROP_MAX_QOS_NAME "Maximum QoS"
  127. +#define MQTT_PROP_RETAIN_AVAIL 0x25
  128. +#define MQTT_PROP_RETAIN_AVAIL_NAME "Retain Available"
  129. +#define MQTT_PROP_USR 0x26
  130. +#define MQTT_PROP_USR_NAME "User Property"
  131. +#define MQTT_PROP_MAX_PKT_SIZE 0x27
  132. +#define MQTT_PROP_MAX_PKT_SIZE_NAME "Maximum Packet Size"
  133. +#define MQTT_PROP_WILDCARD_SUB_AVAIL 0x28
  134. +#define MQTT_PROP_WILDCARD_SUB_AVAIL_NAME "Wildcard Subscription Available"
  135. +#define MQTT_PROP_SUB_ID_AVAIL 0x29
  136. +#define MQTT_PROP_SUB_ID_AVAIL_NAME "Subscription Identifier Available"
  137. +#define MQTT_PROP_SHARED_SUB_AVAIL 0x2A
  138. +#define MQTT_PROP_SHARED_SUB_AVAIL_NAME "Shared Subscription Available"
  139. +
  140. #endif /* MQTT_CONSTANTS_H */
  141. diff --git a/src/include/mqtt_ng.h b/src/include/mqtt_ng.h
  142. index bdfa590a3b..c42832068e 100644
  143. --- a/src/include/mqtt_ng.h
  144. +++ b/src/include/mqtt_ng.h
  145. @@ -90,3 +90,7 @@ int mqtt_ng_sync(struct mqtt_ng_client *client);
  146. time_t mqtt_ng_last_send_time(struct mqtt_ng_client *client);
  147. void mqtt_ng_set_max_mem(struct mqtt_ng_client *client, size_t bytes);
  148. +
  149. +void mqtt_ng_get_stats(struct mqtt_ng_client *client, struct mqtt_ng_stats *stats);
  150. +
  151. +int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic);
  152. diff --git a/src/include/mqtt_wss_client.h b/src/include/mqtt_wss_client.h
  153. index fa0f708a92..e9d47134e4 100644
  154. --- a/src/include/mqtt_wss_client.h
  155. +++ b/src/include/mqtt_wss_client.h
  156. @@ -157,6 +157,8 @@ int mqtt_wss_publish5(mqtt_wss_client client,
  157. uint8_t publish_flags,
  158. uint16_t *packet_id);
  159. +int mqtt_wss_set_topic_alias(mqtt_wss_client client, const char *topic);
  160. +
  161. /* Subscribes to MQTT topic
  162. * @param client mqtt_wss_client which should do the subscription
  163. * @param topic MQTT topic to subscribe to
  164. @@ -165,9 +167,18 @@ int mqtt_wss_publish5(mqtt_wss_client client,
  165. */
  166. int mqtt_wss_subscribe(mqtt_wss_client client, char *topic, int max_qos_level);
  167. +
  168. struct mqtt_wss_stats {
  169. uint64_t bytes_tx;
  170. uint64_t bytes_rx;
  171. +#ifdef MQTT_WSS_CPUSTATS
  172. + uint64_t time_keepalive;
  173. + uint64_t time_read_socket;
  174. + uint64_t time_write_socket;
  175. + uint64_t time_process_websocket;
  176. + uint64_t time_process_mqtt;
  177. +#endif
  178. + struct mqtt_ng_stats mqtt;
  179. };
  180. struct mqtt_wss_stats mqtt_wss_get_stats(mqtt_wss_client client);
  181. diff --git a/src/mqtt_ng.c b/src/mqtt_ng.c
  182. index 4c442ca77f..6003159102 100644
  183. --- a/src/mqtt_ng.c
  184. +++ b/src/mqtt_ng.c
  185. @@ -4,6 +4,8 @@
  186. #include <pthread.h>
  187. #include <inttypes.h>
  188. +#include "c_rhash.h"
  189. +
  190. #include "common_internal.h"
  191. #include "mqtt_constants.h"
  192. #include "mqtt_wss_log.h"
  193. @@ -92,6 +94,7 @@ enum varhdr_parser_state {
  194. MQTT_PARSE_VARHDR_OPTIONAL_REASON_CODE,
  195. MQTT_PARSE_VARHDR_PROPS,
  196. MQTT_PARSE_VARHDR_TOPICNAME,
  197. + MQTT_PARSE_VARHDR_POST_TOPICNAME,
  198. MQTT_PARSE_VARHDR_PACKET_ID,
  199. MQTT_PARSE_REASONCODES,
  200. MQTT_PARSE_PAYLOAD
  201. @@ -103,22 +106,54 @@ struct mqtt_vbi_parser_ctx {
  202. uint32_t result;
  203. };
  204. +enum mqtt_datatype {
  205. + MQTT_TYPE_UNKNOWN = 0,
  206. + MQTT_TYPE_UINT_8,
  207. + MQTT_TYPE_UINT_16,
  208. + MQTT_TYPE_UINT_32,
  209. + MQTT_TYPE_VBI,
  210. + MQTT_TYPE_STR,
  211. + MQTT_TYPE_STR_PAIR,
  212. + MQTT_TYPE_BIN
  213. +};
  214. +
  215. struct mqtt_property {
  216. - uint32_t id;
  217. + uint8_t id;
  218. + enum mqtt_datatype type;
  219. + union {
  220. + char *strings[2];
  221. + void *bindata;
  222. + uint8_t uint8;
  223. + uint16_t uint16;
  224. + uint32_t uint32;
  225. + } data;
  226. + size_t bindata_len;
  227. struct mqtt_property *next;
  228. };
  229. enum mqtt_properties_parser_state {
  230. PROPERTIES_LENGTH = 0,
  231. - PROPERTY_ID
  232. + PROPERTY_CREATE,
  233. + PROPERTY_ID,
  234. + PROPERTY_TYPE_UINT8,
  235. + PROPERTY_TYPE_UINT16,
  236. + PROPERTY_TYPE_UINT32,
  237. + PROPERTY_TYPE_STR_BIN_LEN,
  238. + PROPERTY_TYPE_STR,
  239. + PROPERTY_TYPE_BIN,
  240. + PROPERTY_TYPE_VBI,
  241. + PROPERTY_NEXT
  242. };
  243. struct mqtt_properties_parser_ctx {
  244. enum mqtt_properties_parser_state state;
  245. struct mqtt_property *head;
  246. + struct mqtt_property *tail;
  247. uint32_t properties_length;
  248. + uint32_t vbi_length;
  249. struct mqtt_vbi_parser_ctx vbi_parser_ctx;
  250. size_t bytes_consumed;
  251. + int str_idx;
  252. };
  253. struct mqtt_connack {
  254. @@ -174,6 +209,17 @@ struct mqtt_ng_parser {
  255. } mqtt_packet;
  256. };
  257. +struct topic_alias_data {
  258. + uint16_t idx;
  259. + uint32_t usage_count;
  260. +};
  261. +
  262. +struct topic_aliases_data {
  263. + c_rhash stoi_dict;
  264. + uint32_t idx_max;
  265. + uint32_t idx_assigned;
  266. + int used;
  267. +};
  268. struct mqtt_ng_client {
  269. struct transaction_buffer main_buffer;
  270. @@ -199,6 +245,12 @@ struct mqtt_ng_client {
  271. void (*msg_callback)(const char *topic, const void *msg, size_t msglen, int qos);
  272. unsigned int ping_pending:1;
  273. +
  274. + struct mqtt_ng_stats stats;
  275. + pthread_mutex_t stats_mutex;
  276. +
  277. + struct topic_aliases_data tx_topic_aliases;
  278. + c_rhash rx_aliases;
  279. };
  280. char pingreq[] = { MQTT_CPT_PINGREQ << 4, 0x00 };
  281. @@ -356,6 +408,7 @@ static inline enum memory_mode ptr2memory_mode(void * ptr) {
  282. }
  283. #define frag_is_marked_for_gc(frag) ((frag->flags & BUFFER_FRAG_GARBAGE_COLLECT) || ((frag->flags & BUFFER_FRAG_GARBAGE_COLLECT_ON_SEND) && frag->sent == frag->len))
  284. +#define FRAG_SIZE_IN_BUFFER(frag) (sizeof(struct buffer_fragment) + ((frag->flags & BUFFER_FRAG_DATA_EXTERNAL) ? 0 : frag->len))
  285. static void buffer_frag_free_data(struct buffer_fragment *frag)
  286. {
  287. @@ -581,6 +634,12 @@ struct mqtt_ng_client *mqtt_ng_init(struct mqtt_ng_init *settings)
  288. client->connack_callback = settings->connack_callback;
  289. client->msg_callback = settings->msg_callback;
  290. + pthread_mutex_init(&client->stats_mutex, NULL);
  291. + client->tx_topic_aliases.stoi_dict = c_rhash_new(0);
  292. + client->tx_topic_aliases.idx_max = UINT16_MAX;
  293. +
  294. + client->rx_aliases = c_rhash_new(UINT16_MAX >> 8);
  295. +
  296. return client;
  297. }
  298. @@ -589,9 +648,27 @@ static inline uint8_t get_control_packet_type(uint8_t first_hdr_byte)
  299. return first_hdr_byte >> 4;
  300. }
  301. +static inline void mqtt_ng_destroy_rx_alias_hash(struct mqtt_ng_client *client)
  302. +{
  303. + c_rhash_iter_t i = C_RHASH_ITER_T_INITIALIZER;
  304. + uint64_t stored_key;
  305. + void *to_free;
  306. + while(!c_rhash_iter_uint64_keys(client->rx_aliases, &i, &stored_key)) {
  307. + c_rhash_get_ptr_by_uint64(client->rx_aliases, stored_key, &to_free);
  308. + mw_free(to_free);
  309. + }
  310. + c_rhash_destroy(client->rx_aliases);
  311. +}
  312. +
  313. void mqtt_ng_destroy(struct mqtt_ng_client *client)
  314. {
  315. transaction_buffer_destroy(&client->main_buffer);
  316. + pthread_mutex_destroy(&client->stats_mutex);
  317. +
  318. + c_rhash_destroy(client->tx_topic_aliases.stoi_dict);
  319. +
  320. + mqtt_ng_destroy_rx_alias_hash(client);
  321. +
  322. mw_free(client);
  323. }
  324. @@ -645,7 +722,7 @@ static size_t mqtt_ng_connect_size(struct mqtt_auth_properties *auth,
  325. + sizeof(mqtt_protocol_name_frag) /* Proto Name and Version */
  326. + 1 /* Connect Flags */
  327. + 2 /* Keep Alive */
  328. - + 1 /* 3.1.2.11.1 Property Length - for now 0, TODO TODO*/;
  329. + + 4 /* 3.1.2.11.1 Property Length - for now fixed to only Topic Alias Maximum, TODO TODO*/;
  330. // CONNECT payload. 3.1.3
  331. if (auth->client_id)
  332. @@ -720,21 +797,26 @@ static int _optimized_add(struct header_buffer *buf, mqtt_wss_log_ctx_t log_ctx,
  333. return 0;
  334. }
  335. -#define TRY_GENERATE_MESSAGE(generator_function, buf, log_ctx, max_mem, ...) \
  336. - int rc = generator_function(buf, log_ctx, ##__VA_ARGS__); \
  337. +#define TRY_GENERATE_MESSAGE(generator_function, client, ...) \
  338. + int rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \
  339. if (rc == MQTT_NG_MSGGEN_BUFFER_OOM) { \
  340. - LOCK_HDR_BUFFER(buf); \
  341. - transaction_buffer_garbage_collect((buf), log_ctx); \
  342. - UNLOCK_HDR_BUFFER(buf); \
  343. - rc = generator_function(buf, log_ctx, ##__VA_ARGS__); \
  344. - if (rc == MQTT_NG_MSGGEN_BUFFER_OOM && max_mem) { \
  345. - LOCK_HDR_BUFFER(buf); \
  346. - transaction_buffer_grow((buf), log_ctx, GROWTH_FACTOR, max_mem); \
  347. - UNLOCK_HDR_BUFFER(buf); \
  348. - rc = generator_function(buf, log_ctx, ##__VA_ARGS__); \
  349. + LOCK_HDR_BUFFER(&client->main_buffer); \
  350. + transaction_buffer_garbage_collect((&client->main_buffer), client->log); \
  351. + UNLOCK_HDR_BUFFER(&client->main_buffer); \
  352. + rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \
  353. + if (rc == MQTT_NG_MSGGEN_BUFFER_OOM && client->max_mem_bytes) { \
  354. + LOCK_HDR_BUFFER(&client->main_buffer); \
  355. + transaction_buffer_grow((&client->main_buffer), client->log, GROWTH_FACTOR, client->max_mem_bytes); \
  356. + UNLOCK_HDR_BUFFER(&client->main_buffer); \
  357. + rc = generator_function(&client->main_buffer, client->log, ##__VA_ARGS__); \
  358. } \
  359. if (rc == MQTT_NG_MSGGEN_BUFFER_OOM) \
  360. - mws_error(log_ctx, "%s failed to generate message due to insufficient buffer space (line %d)", __FUNCTION__, __LINE__); \
  361. + mws_error(client->log, "%s failed to generate message due to insufficient buffer space (line %d)", __FUNCTION__, __LINE__); \
  362. + } \
  363. + if (rc == MQTT_NG_MSGGEN_OK) { \
  364. + pthread_mutex_lock(&client->stats_mutex); \
  365. + client->stats.tx_messages_queued++; \
  366. + pthread_mutex_unlock(&client->stats_mutex); \
  367. } \
  368. return rc;
  369. @@ -829,10 +911,13 @@ mqtt_msg_data mqtt_ng_generate_connect(struct transaction_buffer *trx_buf,
  370. PACK_2B_INT(&trx_buf->hdr_buffer, keep_alive, frag);
  371. - // TODO Property Length [MQTT-3.1.3.2.1] temporary fixed 0
  372. - *WRITE_POS(frag) = 0;
  373. + // TODO Property Length [MQTT-3.1.3.2.1] temporary fixed to 3 (one property topic alias max)
  374. + DATA_ADVANCE(&trx_buf->hdr_buffer, uint32_to_mqtt_vbi(3, WRITE_POS(frag)), frag);
  375. + *WRITE_POS(frag) = MQTT_PROP_TOPIC_ALIAS_MAX;
  376. DATA_ADVANCE(&trx_buf->hdr_buffer, 1, frag);
  377. + PACK_2B_INT(&trx_buf->hdr_buffer, 65535, frag);
  378. +
  379. // [MQTT-3.1.3.1] Client identifier
  380. CHECK_BYTES_AVAILABLE(&trx_buf->hdr_buffer, 2, goto fail_rollback);
  381. PACK_2B_INT(&trx_buf->hdr_buffer, strlen(auth->client_id), frag);
  382. @@ -910,6 +995,15 @@ int mqtt_ng_connect(struct mqtt_ng_client *client,
  383. return 1;
  384. }
  385. + pthread_mutex_lock(&client->stats_mutex);
  386. + if (clean_start)
  387. + client->stats.tx_messages_queued++;
  388. + else
  389. + client->stats.tx_messages_queued = 1;
  390. + client->stats.tx_messages_sent = 0;
  391. + client->stats.rx_messages_rcvd = 0;
  392. + pthread_mutex_unlock(&client->stats_mutex);
  393. +
  394. client->client_state = CONNECT_PENDING;
  395. return 0;
  396. }
  397. @@ -921,13 +1015,19 @@ uint16_t get_unused_packet_id() {
  398. }
  399. static inline size_t mqtt_ng_publish_size(const char *topic,
  400. - size_t msg_len)
  401. + size_t msg_len,
  402. + uint16_t topic_id)
  403. {
  404. - return 2 /* Topic Name Length */
  405. - + strlen(topic)
  406. + size_t retval = 2 /* Topic Name Length */
  407. + + (topic == NULL ? 0 : strlen(topic))
  408. + 2 /* Packet identifier */
  409. - + 1 /* Properties Length TODO for now fixed 0 */
  410. + + 1 /* Properties Length TODO for now fixed to 1 property */
  411. + msg_len;
  412. +
  413. + if (topic_id)
  414. + retval += 3;
  415. +
  416. + return retval;
  417. }
  418. int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf,
  419. @@ -938,13 +1038,14 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf,
  420. free_fnc_t msg_free,
  421. size_t msg_len,
  422. uint8_t publish_flags,
  423. - uint16_t *packet_id)
  424. + uint16_t *packet_id,
  425. + uint16_t topic_alias)
  426. {
  427. // >> START THE RODEO <<
  428. transaction_buffer_transaction_start(trx_buf);
  429. // Calculate the resulting message size sans fixed MQTT header
  430. - size_t size = mqtt_ng_publish_size(topic, msg_len);
  431. + size_t size = mqtt_ng_publish_size(topic, msg_len, topic_alias);
  432. // Start generating the message
  433. struct buffer_fragment *frag = NULL;
  434. @@ -967,10 +1068,12 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf,
  435. // MQTT Variable Header
  436. // [MQTT-3.3.2.1]
  437. - PACK_2B_INT(&trx_buf->hdr_buffer, strlen(topic), frag);
  438. - if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, topic, strlen(topic), topic_free, &frag))
  439. - goto fail_rollback;
  440. - BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
  441. + PACK_2B_INT(&trx_buf->hdr_buffer, topic == NULL ? 0 : strlen(topic), frag);
  442. + if (topic != NULL) {
  443. + if (_optimized_add(&trx_buf->hdr_buffer, log_ctx, topic, strlen(topic), topic_free, &frag))
  444. + goto fail_rollback;
  445. + BUFFER_TRANSACTION_NEW_FRAG(&trx_buf->hdr_buffer, 0, frag, goto fail_rollback);
  446. + }
  447. // [MQTT-3.3.2.2]
  448. mqtt_msg->packet_id = get_unused_packet_id();
  449. @@ -978,9 +1081,16 @@ int mqtt_ng_generate_publish(struct transaction_buffer *trx_buf,
  450. PACK_2B_INT(&trx_buf->hdr_buffer, mqtt_msg->packet_id, frag);
  451. // [MQTT-3.3.2.3.1] TODO Property Length for now fixed 0
  452. - *WRITE_POS(frag) = 0;
  453. + *WRITE_POS(frag) = topic_alias ? 3 : 0;
  454. DATA_ADVANCE(&trx_buf->hdr_buffer, 1, frag);
  455. + if(topic_alias) {
  456. + *WRITE_POS(frag) = MQTT_PROP_TOPIC_ALIAS;
  457. + DATA_ADVANCE(&trx_buf->hdr_buffer, 1, frag);
  458. +
  459. + PACK_2B_INT(&trx_buf->hdr_buffer, topic_alias, frag);
  460. + }
  461. +
  462. if( (frag = buffer_new_frag(&trx_buf->hdr_buffer, BUFFER_FRAG_DATA_EXTERNAL)) == NULL )
  463. goto fail_rollback;
  464. @@ -1006,7 +1116,20 @@ int mqtt_ng_publish(struct mqtt_ng_client *client,
  465. uint8_t publish_flags,
  466. uint16_t *packet_id)
  467. {
  468. - TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, &client->main_buffer, client->log, client->max_mem_bytes, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id);
  469. + struct topic_alias_data *alias = NULL;
  470. + c_rhash_get_ptr_by_str(client->tx_topic_aliases.stoi_dict, topic, (void**)&alias);
  471. +
  472. + uint16_t topic_id = 0;
  473. +
  474. + if (alias != NULL) {
  475. + topic_id = alias->idx;
  476. + if (alias->usage_count++) {
  477. + topic = NULL;
  478. + topic_free = NULL;
  479. + }
  480. + }
  481. +
  482. + TRY_GENERATE_MESSAGE(mqtt_ng_generate_publish, client, topic, topic_free, msg, msg_free, msg_len, publish_flags, packet_id, topic_id);
  483. }
  484. static inline size_t mqtt_ng_subscribe_size(struct mqtt_sub *subs, size_t sub_count)
  485. @@ -1072,7 +1195,7 @@ fail_rollback:
  486. int mqtt_ng_subscribe(struct mqtt_ng_client *client, struct mqtt_sub *subs, size_t sub_count)
  487. {
  488. - TRY_GENERATE_MESSAGE(mqtt_ng_generate_subscribe, &client->main_buffer, client->log, client->max_mem_bytes, subs, sub_count);
  489. + TRY_GENERATE_MESSAGE(mqtt_ng_generate_subscribe, client, subs, sub_count);
  490. }
  491. int mqtt_ng_generate_disconnect(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, uint8_t reason_code)
  492. @@ -1116,7 +1239,7 @@ fail_rollback:
  493. int mqtt_ng_disconnect(struct mqtt_ng_client *client, uint8_t reason_code)
  494. {
  495. - TRY_GENERATE_MESSAGE(mqtt_ng_generate_disconnect, &client->main_buffer, client->log, client->max_mem_bytes, reason_code);
  496. + TRY_GENERATE_MESSAGE(mqtt_ng_generate_disconnect, client, reason_code);
  497. }
  498. static int mqtt_generate_puback(struct transaction_buffer *trx_buf, mqtt_wss_log_ctx_t log_ctx, uint16_t packet_id, uint8_t reason_code)
  499. @@ -1161,7 +1284,7 @@ fail_rollback:
  500. static int mqtt_ng_puback(struct mqtt_ng_client *client, uint16_t packet_id, uint8_t reason_code)
  501. {
  502. - TRY_GENERATE_MESSAGE(mqtt_generate_puback, &client->main_buffer, client->log, client->max_mem_bytes, packet_id, reason_code);
  503. + TRY_GENERATE_MESSAGE(mqtt_generate_puback, client, packet_id, reason_code);
  504. }
  505. int mqtt_ng_ping(struct mqtt_ng_client *client)
  506. @@ -1212,12 +1335,81 @@ static int vbi_parser_parse(struct mqtt_vbi_parser_ctx *ctx, rbuf_t data, mqtt_w
  507. static void mqtt_properties_parser_ctx_reset(struct mqtt_properties_parser_ctx *ctx)
  508. {
  509. ctx->state = PROPERTIES_LENGTH;
  510. - ctx->head = NULL;
  511. + while (ctx->head) {
  512. + struct mqtt_property *f = ctx->head;
  513. + ctx->head = ctx->head->next;
  514. + if (f->type == MQTT_TYPE_STR || f->type == MQTT_TYPE_STR_PAIR)
  515. + mw_free(f->data.strings[0]);
  516. + if (f->type == MQTT_TYPE_STR_PAIR)
  517. + mw_free(f->data.strings[1]);
  518. + if (f->type == MQTT_TYPE_BIN)
  519. + mw_free(f->data.bindata);
  520. + mw_free(f);
  521. + }
  522. + ctx->tail = NULL;
  523. ctx->properties_length = 0;
  524. ctx->bytes_consumed = 0;
  525. vbi_parser_reset_ctx(&ctx->vbi_parser_ctx);
  526. }
  527. +struct mqtt_property_type {
  528. + uint8_t id;
  529. + enum mqtt_datatype datatype;
  530. + const char* name;
  531. +};
  532. +
  533. +const struct mqtt_property_type mqtt_property_types[] = {
  534. + { .id = MQTT_PROP_TOPIC_ALIAS, .name = MQTT_PROP_TOPIC_ALIAS_NAME, .datatype = MQTT_TYPE_UINT_16 },
  535. +
  536. + { .id = MQTT_PROP_PAYLOAD_FMT_INDICATOR, .name = MQTT_PROP_PAYLOAD_FMT_INDICATOR_NAME, .datatype = MQTT_TYPE_UINT_8 },
  537. + { .id = MQTT_PROP_MSG_EXPIRY_INTERVAL, .name = MQTT_PROP_MSG_EXPIRY_INTERVAL_NAME, .datatype = MQTT_TYPE_UINT_32 },
  538. + { .id = MQTT_PROP_CONTENT_TYPE, .name = MQTT_PROP_CONTENT_TYPE_NAME, .datatype = MQTT_TYPE_STR },
  539. + { .id = MQTT_PROP_RESPONSE_TOPIC, .name = MQTT_PROP_RESPONSE_TOPIC_NAME, .datatype = MQTT_TYPE_STR },
  540. + { .id = MQTT_PROP_CORRELATION_DATA, .name = MQTT_PROP_CORRELATION_DATA_NAME, .datatype = MQTT_TYPE_BIN },
  541. + { .id = MQTT_PROP_SUB_IDENTIFIER, .name = MQTT_PROP_SUB_IDENTIFIER_NAME, .datatype = MQTT_TYPE_VBI },
  542. + { .id = MQTT_PROP_SESSION_EXPIRY_INTERVAL, .name = MQTT_PROP_SESSION_EXPIRY_INTERVAL_NAME, .datatype = MQTT_TYPE_UINT_32 },
  543. + { .id = MQTT_PROP_ASSIGNED_CLIENT_ID, .name = MQTT_PROP_ASSIGNED_CLIENT_ID_NAME, .datatype = MQTT_TYPE_STR },
  544. + { .id = MQTT_PROP_SERVER_KEEP_ALIVE, .name = MQTT_PROP_SERVER_KEEP_ALIVE_NAME, .datatype = MQTT_TYPE_UINT_16 },
  545. + { .id = MQTT_PROP_AUTH_METHOD, .name = MQTT_PROP_AUTH_METHOD_NAME, .datatype = MQTT_TYPE_STR },
  546. + { .id = MQTT_PROP_AUTH_DATA, .name = MQTT_PROP_AUTH_DATA_NAME, .datatype = MQTT_TYPE_BIN },
  547. + { .id = MQTT_PROP_REQ_PROBLEM_INFO, .name = MQTT_PROP_REQ_PROBLEM_INFO_NAME, .datatype = MQTT_TYPE_UINT_8 },
  548. + { .id = MQTT_PROP_WILL_DELAY_INTERVAL, .name = MQTT_PROP_WIIL_DELAY_INTERVAL_NAME, .datatype = MQTT_TYPE_UINT_32 },
  549. + { .id = MQTT_PROP_REQ_RESP_INFORMATION, .name = MQTT_PROP_REQ_RESP_INFORMATION_NAME, .datatype = MQTT_TYPE_UINT_8 },
  550. + { .id = MQTT_PROP_RESP_INFORMATION, .name = MQTT_PROP_RESP_INFORMATION_NAME, .datatype = MQTT_TYPE_STR },
  551. + { .id = MQTT_PROP_SERVER_REF, .name = MQTT_PROP_SERVER_REF_NAME, .datatype = MQTT_TYPE_STR },
  552. + { .id = MQTT_PROP_REASON_STR, .name = MQTT_PROP_REASON_STR_NAME, .datatype = MQTT_TYPE_STR },
  553. + { .id = MQTT_PROP_RECEIVE_MAX, .name = MQTT_PROP_RECEIVE_MAX_NAME, .datatype = MQTT_TYPE_UINT_16 },
  554. + { .id = MQTT_PROP_TOPIC_ALIAS_MAX, .name = MQTT_PROP_TOPIC_ALIAS_MAX_NAME, .datatype = MQTT_TYPE_UINT_16 },
  555. + // MQTT_PROP_TOPIC_ALIAS is first as it is most often used
  556. + { .id = MQTT_PROP_MAX_QOS, .name = MQTT_PROP_MAX_QOS_NAME, .datatype = MQTT_TYPE_UINT_8 },
  557. + { .id = MQTT_PROP_RETAIN_AVAIL, .name = MQTT_PROP_RETAIN_AVAIL_NAME, .datatype = MQTT_TYPE_UINT_8 },
  558. + { .id = MQTT_PROP_USR, .name = MQTT_PROP_USR_NAME, .datatype = MQTT_TYPE_STR_PAIR },
  559. + { .id = MQTT_PROP_MAX_PKT_SIZE, .name = MQTT_PROP_MAX_PKT_SIZE_NAME, .datatype = MQTT_TYPE_UINT_32 },
  560. + { .id = MQTT_PROP_WILDCARD_SUB_AVAIL, .name = MQTT_PROP_WILDCARD_SUB_AVAIL_NAME, .datatype = MQTT_TYPE_UINT_8 },
  561. + { .id = MQTT_PROP_SUB_ID_AVAIL, .name = MQTT_PROP_SUB_ID_AVAIL_NAME, .datatype = MQTT_TYPE_UINT_8 },
  562. + { .id = MQTT_PROP_SHARED_SUB_AVAIL, .name = MQTT_PROP_SHARED_SUB_AVAIL_NAME, .datatype = MQTT_TYPE_UINT_8 },
  563. + { .id = 0, .name = NULL, .datatype = MQTT_TYPE_UNKNOWN }
  564. +};
  565. +
  566. +static int get_property_type_by_id(uint8_t property_id) {
  567. + for (int i = 0; mqtt_property_types[i].datatype != MQTT_TYPE_UNKNOWN; i++) {
  568. + if (mqtt_property_types[i].id == property_id)
  569. + return mqtt_property_types[i].datatype;
  570. + }
  571. + return MQTT_TYPE_UNKNOWN;
  572. +}
  573. +
  574. +struct mqtt_property *get_property_by_id(struct mqtt_property *props, uint8_t property_id)
  575. +{
  576. + while (props) {
  577. + if (props->id == property_id) {
  578. + return props;
  579. + }
  580. + props = props->next;
  581. + }
  582. + return NULL;
  583. +}
  584. +
  585. // Parses [MQTT-2.2.2]
  586. static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t data, mqtt_wss_log_ctx_t log)
  587. {
  588. @@ -1228,19 +1420,128 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t
  589. if (rc == MQTT_NG_CLIENT_PARSE_DONE) {
  590. ctx->properties_length = ctx->vbi_parser_ctx.result;
  591. ctx->bytes_consumed += ctx->vbi_parser_ctx.bytes;
  592. + ctx->vbi_length = ctx->vbi_parser_ctx.bytes;
  593. if (!ctx->properties_length)
  594. return MQTT_NG_CLIENT_PARSE_DONE;
  595. - ctx->state = PROPERTY_ID;
  596. - vbi_parser_reset_ctx(&ctx->vbi_parser_ctx);
  597. + ctx->state = PROPERTY_CREATE;
  598. break;
  599. }
  600. return rc;
  601. + case PROPERTY_CREATE:
  602. + BUF_READ_CHECK_AT_LEAST(data, 1);
  603. + struct mqtt_property *prop = mw_calloc(1, sizeof(struct mqtt_property));
  604. + if (ctx->head == NULL) {
  605. + ctx->head = prop;
  606. + ctx->tail = prop;
  607. + } else {
  608. + ctx->tail->next = prop;
  609. + ctx->tail = ctx->tail->next;
  610. + }
  611. + ctx->state = PROPERTY_ID;
  612. + /* FALLTHROUGH */
  613. case PROPERTY_ID:
  614. - // TODO ignore for now... just skip
  615. - rbuf_bump_tail(data, ctx->properties_length);
  616. - ctx->bytes_consumed += ctx->properties_length;
  617. + rbuf_pop(data, (char*)&ctx->tail->id, 1);
  618. + ctx->bytes_consumed += 1;
  619. + ctx->tail->type = get_property_type_by_id(ctx->tail->id);
  620. + switch (ctx->tail->type) {
  621. + case MQTT_TYPE_UINT_16:
  622. + ctx->state = PROPERTY_TYPE_UINT16;
  623. + break;
  624. + case MQTT_TYPE_UINT_32:
  625. + ctx->state = PROPERTY_TYPE_UINT32;
  626. + break;
  627. + case MQTT_TYPE_UINT_8:
  628. + ctx->state = PROPERTY_TYPE_UINT8;
  629. + break;
  630. + case MQTT_TYPE_VBI:
  631. + ctx->state = PROPERTY_TYPE_VBI;
  632. + vbi_parser_reset_ctx(&ctx->vbi_parser_ctx);
  633. + break;
  634. + case MQTT_TYPE_STR:
  635. + case MQTT_TYPE_STR_PAIR:
  636. + ctx->str_idx = 0;
  637. + /* FALLTHROUGH */
  638. + case MQTT_TYPE_BIN:
  639. + ctx->state = PROPERTY_TYPE_STR_BIN_LEN;
  640. + break;
  641. + default:
  642. + mws_error(log, "Unsupported property type %d for property id %d.", (int)ctx->tail->type, (int)ctx->tail->id);
  643. + return MQTT_NG_CLIENT_PROTOCOL_ERROR;
  644. + }
  645. + break;
  646. + case PROPERTY_TYPE_STR_BIN_LEN:
  647. + BUF_READ_CHECK_AT_LEAST(data, sizeof(uint16_t));
  648. + rbuf_pop(data, (char*)&ctx->tail->bindata_len, sizeof(uint16_t));
  649. + ctx->tail->bindata_len = be16toh(ctx->tail->bindata_len);
  650. + ctx->bytes_consumed += 2;
  651. + switch (ctx->tail->type) {
  652. + case MQTT_TYPE_BIN:
  653. + ctx->state = PROPERTY_TYPE_BIN;
  654. + break;
  655. + case MQTT_TYPE_STR:
  656. + case MQTT_TYPE_STR_PAIR:
  657. + ctx->state = PROPERTY_TYPE_STR;
  658. + break;
  659. + default:
  660. + mws_error(log, "Unexpected datatype in PROPERTY_TYPE_STR_BIN_LEN %d", (int)ctx->tail->type);
  661. + return MQTT_NG_CLIENT_INTERNAL_ERROR;
  662. + }
  663. + break;
  664. + case PROPERTY_TYPE_STR:
  665. + BUF_READ_CHECK_AT_LEAST(data, ctx->tail->bindata_len);
  666. + ctx->tail->data.strings[ctx->str_idx] = mw_malloc(ctx->tail->bindata_len + 1);
  667. + rbuf_pop(data, ctx->tail->data.strings[ctx->str_idx], ctx->tail->bindata_len);
  668. + ctx->tail->data.strings[ctx->str_idx][ctx->tail->bindata_len] = 0;
  669. + ctx->str_idx++;
  670. + ctx->bytes_consumed += ctx->tail->bindata_len;
  671. + if (ctx->tail->type == MQTT_TYPE_STR_PAIR && ctx->str_idx < 2) {
  672. + ctx->state = PROPERTY_TYPE_STR_BIN_LEN;
  673. + break;
  674. + }
  675. + ctx->state = PROPERTY_NEXT;
  676. + break;
  677. + case PROPERTY_TYPE_BIN:
  678. + BUF_READ_CHECK_AT_LEAST(data, ctx->tail->bindata_len);
  679. + ctx->tail->data.bindata = mw_malloc(ctx->tail->bindata_len);
  680. + rbuf_pop(data, ctx->tail->data.bindata, ctx->tail->bindata_len);
  681. + ctx->bytes_consumed += ctx->tail->bindata_len;
  682. + ctx->state = PROPERTY_NEXT;
  683. + break;
  684. + case PROPERTY_TYPE_VBI:
  685. + rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data, log);
  686. + if (rc == MQTT_NG_CLIENT_PARSE_DONE) {
  687. + ctx->tail->data.uint32 = ctx->vbi_parser_ctx.result;
  688. + ctx->bytes_consumed += ctx->vbi_parser_ctx.bytes;
  689. + ctx->state = PROPERTY_NEXT;
  690. + break;
  691. + }
  692. + return rc;
  693. + case PROPERTY_TYPE_UINT8:
  694. + BUF_READ_CHECK_AT_LEAST(data, sizeof(uint8_t));
  695. + rbuf_pop(data, (char*)&ctx->tail->data.uint8, sizeof(uint8_t));
  696. + ctx->bytes_consumed += sizeof(uint8_t);
  697. + ctx->state = PROPERTY_NEXT;
  698. + break;
  699. + case PROPERTY_TYPE_UINT32:
  700. + BUF_READ_CHECK_AT_LEAST(data, sizeof(uint32_t));
  701. + rbuf_pop(data, (char*)&ctx->tail->data.uint32, sizeof(uint32_t));
  702. + ctx->tail->data.uint32 = be32toh(ctx->tail->data.uint32);
  703. + ctx->bytes_consumed += sizeof(uint32_t);
  704. + ctx->state = PROPERTY_NEXT;
  705. + break;
  706. + case PROPERTY_TYPE_UINT16:
  707. + BUF_READ_CHECK_AT_LEAST(data, sizeof(uint16_t));
  708. + rbuf_pop(data, (char*)&ctx->tail->data.uint16, sizeof(uint16_t));
  709. + ctx->tail->data.uint16 = be16toh(ctx->tail->data.uint16);
  710. + ctx->bytes_consumed += sizeof(uint16_t);
  711. + ctx->state = PROPERTY_NEXT;
  712. + /* FALLTHROUGH */
  713. + case PROPERTY_NEXT:
  714. + if (ctx->properties_length > ctx->bytes_consumed - ctx->vbi_length) {
  715. + ctx->state = PROPERTY_CREATE;
  716. + break;
  717. + } else
  718. return MQTT_NG_CLIENT_PARSE_DONE;
  719. -// rc = vbi_parser_parse(&ctx->vbi_parser_ctx, data, log);
  720. }
  721. return MQTT_NG_CLIENT_OK_CALL_AGAIN;
  722. }
  723. @@ -1381,20 +1682,28 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
  724. switch (parser->varhdr_state) {
  725. case MQTT_PARSE_VARHDR_INITIAL:
  726. BUF_READ_CHECK_AT_LEAST(parser->received_data, 2);
  727. + publish->topic = NULL;
  728. publish->qos = ((parser->mqtt_control_packet_type >> 1) & 0x03);
  729. rbuf_pop(parser->received_data, (char*)&publish->topic_len, 2);
  730. publish->topic_len = be16toh(publish->topic_len);
  731. + parser->mqtt_parsed_len = 2;
  732. + if (!publish->topic_len) {
  733. + parser->varhdr_state = MQTT_PARSE_VARHDR_POST_TOPICNAME;
  734. + break;
  735. + }
  736. publish->topic = mw_calloc(1, publish->topic_len + 1 /* add 0x00 */);
  737. if (publish->topic == NULL)
  738. return MQTT_NG_CLIENT_OOM;
  739. parser->varhdr_state = MQTT_PARSE_VARHDR_TOPICNAME;
  740. - parser->mqtt_parsed_len = 2;
  741. /* FALLTHROUGH */
  742. case MQTT_PARSE_VARHDR_TOPICNAME:
  743. // TODO check empty topic can be valid? In which case we have to skip this step
  744. BUF_READ_CHECK_AT_LEAST(parser->received_data, publish->topic_len);
  745. rbuf_pop(parser->received_data, publish->topic, publish->topic_len);
  746. parser->mqtt_parsed_len += publish->topic_len;
  747. + parser->varhdr_state = MQTT_PARSE_VARHDR_POST_TOPICNAME;
  748. + /* FALLTHROUGH */
  749. + case MQTT_PARSE_VARHDR_POST_TOPICNAME:
  750. mqtt_properties_parser_ctx_reset(&parser->properties_parser);
  751. if (!publish->qos) { // PacketID present only for QOS > 0 [MQTT-3.3.2.2]
  752. parser->varhdr_state = MQTT_PARSE_VARHDR_PROPS;
  753. @@ -1589,6 +1898,11 @@ static int send_fragment(struct mqtt_ng_client *client) {
  754. if (frag->flags & BUFFER_FRAG_MQTT_PACKET_TAIL) {
  755. client->time_of_last_send = time(NULL);
  756. + pthread_mutex_lock(&client->stats_mutex);
  757. + if (client->main_buffer.sending_frag != &ping_frag)
  758. + client->stats.tx_messages_queued--;
  759. + client->stats.tx_messages_sent++;
  760. + pthread_mutex_unlock(&client->stats_mutex);
  761. client->main_buffer.sending_frag = NULL;
  762. return 1;
  763. }
  764. @@ -1654,6 +1968,10 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)
  765. #ifdef MQTT_DEBUG_VERBOSE
  766. DEBUG("MQTT Packet Parsed Successfully!");
  767. #endif
  768. + pthread_mutex_lock(&client->stats_mutex);
  769. + client->stats.rx_messages_rcvd++;
  770. + pthread_mutex_unlock(&client->stats_mutex);
  771. +
  772. switch (get_control_packet_type(client->parser.mqtt_control_packet_type)) {
  773. case MQTT_CPT_CONNACK:
  774. #ifdef MQTT_DEBUG_VERBOSE
  775. @@ -1713,9 +2031,30 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)
  776. ERROR("Error generating PUBACK reply for PUBLISH");
  777. return rc;
  778. }
  779. + struct mqtt_property *prop;
  780. + if ( (prop = get_property_by_id(client->parser.properties_parser.head, MQTT_PROP_TOPIC_ALIAS)) != NULL ) {
  781. + // Topic Alias property was sent from server
  782. + void *topic_ptr;
  783. + if (!c_rhash_get_ptr_by_uint64(client->rx_aliases, prop->data.uint8, &topic_ptr)) {
  784. + if (pub->topic != NULL) {
  785. + ERROR("We do not yet support topic alias reassignment");
  786. + return MQTT_NG_CLIENT_NOT_IMPL_YET;
  787. + }
  788. + pub->topic = topic_ptr;
  789. + } else {
  790. + if (pub->topic == NULL) {
  791. + ERROR("Topic alias with id %d unknown and topic not set by server!", prop->data.uint8);
  792. + return MQTT_NG_CLIENT_PROTOCOL_ERROR;
  793. + }
  794. + c_rhash_insert_uint64_ptr(client->rx_aliases, prop->data.uint8, pub->topic);
  795. + }
  796. + }
  797. if (client->msg_callback)
  798. client->msg_callback(pub->topic, pub->data, pub->data_len, pub->qos);
  799. - mw_free(pub->topic);
  800. + // in case we have property topic alias and we have topic we take over the string
  801. + // and add pointer to it into topic alias list
  802. + if (prop == NULL)
  803. + mw_free(pub->topic);
  804. mw_free(pub->data);
  805. return MQTT_NG_CLIENT_WANT_WRITE;
  806. case MQTT_CPT_DISCONNECT:
  807. @@ -1767,3 +2106,42 @@ void mqtt_ng_set_max_mem(struct mqtt_ng_client *client, size_t bytes)
  808. {
  809. client->max_mem_bytes = bytes;
  810. }
  811. +
  812. +void mqtt_ng_get_stats(struct mqtt_ng_client *client, struct mqtt_ng_stats *stats)
  813. +{
  814. + pthread_mutex_lock(&client->stats_mutex);
  815. + memcpy(stats, &client->stats, sizeof(struct mqtt_ng_stats));
  816. + pthread_mutex_unlock(&client->stats_mutex);
  817. +
  818. + stats->tx_bytes_queued = 0;
  819. + stats->tx_buffer_reclaimable = 0;
  820. +
  821. + LOCK_HDR_BUFFER(&client->main_buffer);
  822. + stats->tx_buffer_used = BUFFER_BYTES_USED(&client->main_buffer.hdr_buffer);
  823. + stats->tx_buffer_free = BUFFER_BYTES_AVAILABLE(&client->main_buffer.hdr_buffer);
  824. + stats->tx_buffer_size = client->main_buffer.hdr_buffer.size;
  825. + struct buffer_fragment *frag = BUFFER_FIRST_FRAG(&client->main_buffer.hdr_buffer);
  826. + while (frag) {
  827. + stats->tx_bytes_queued += frag->len - frag->sent;
  828. + if (frag_is_marked_for_gc(frag))
  829. + stats->tx_buffer_reclaimable += FRAG_SIZE_IN_BUFFER(frag);
  830. +
  831. + frag = frag->next;
  832. + }
  833. + UNLOCK_HDR_BUFFER(&client->main_buffer);
  834. +}
  835. +
  836. +int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic)
  837. +{
  838. + if (client->tx_topic_aliases.idx_assigned >= client->tx_topic_aliases.idx_max) {
  839. + mws_error(client->log, "Tx topic alias indexes were exhausted (current version of the library doesn't support reassigning yet. Feel free to contribute.");
  840. + return 0; //0 is not a valid topic alias
  841. + }
  842. + struct topic_alias_data *alias = mw_malloc(sizeof(struct topic_alias_data));
  843. + alias->idx = ++client->tx_topic_aliases.idx_assigned;
  844. + alias->usage_count = 0;
  845. +
  846. + c_rhash_insert_str_ptr(client->tx_topic_aliases.stoi_dict, topic, (void*)alias);
  847. +
  848. + return alias->idx;
  849. +}
  850. diff --git a/src/mqtt_wss_client.c b/src/mqtt_wss_client.c
  851. index 8fee62bdb7..a87be00d79 100644
  852. --- a/src/mqtt_wss_client.c
  853. +++ b/src/mqtt_wss_client.c
  854. @@ -1087,6 +1087,17 @@ static inline long long int t_till_next_keepalive_ms(mqtt_wss_client client)
  855. return(next_mqtt_keep_alive - (MQTT_PAL_TIME() * SEC_TO_MSEC));
  856. }
  857. +#ifdef MQTT_WSS_CPUSTATS
  858. +static inline uint64_t mqtt_wss_now_usec(mqtt_wss_client client) {
  859. + struct timespec ts;
  860. + if(clock_gettime(CLOCK_MONOTONIC, &ts) == -1) {
  861. + mws_error(client->log, "clock_gettime(CLOCK_MONOTONIC, &timespec) failed.");
  862. + return 0;
  863. + }
  864. + return (uint64_t)ts.tv_sec * USEC_PER_SEC + (ts.tv_nsec % NSEC_PER_SEC) / NSEC_PER_USEC;
  865. +}
  866. +#endif
  867. +
  868. int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
  869. {
  870. char *ptr;
  871. @@ -1094,6 +1105,11 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
  872. int ret;
  873. int send_keepalive = 0;
  874. +#ifdef MQTT_WSS_CPUSTATS
  875. + uint64_t t1,t2;
  876. + t1 = mqtt_wss_now_usec(client);
  877. +#endif
  878. +
  879. #ifdef DEBUG_ULTRA_VERBOSE
  880. mws_debug(client->log, ">>>>> mqtt_wss_service <<<<<");
  881. mws_debug(client->log, "Waiting for events: %s%s%s",
  882. @@ -1112,6 +1128,11 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
  883. send_keepalive = 1;
  884. }
  885. +#ifdef MQTT_WSS_CPUSTATS
  886. + t2 = mqtt_wss_now_usec(client);
  887. + client->stats.time_keepalive += t2 - t1;
  888. +#endif
  889. +
  890. if ((ret = poll(client->poll_fds, 2, timeout_ms >= 0 ? timeout_ms : -1)) < 0) {
  891. if (errno == EINTR) {
  892. mws_warn(client->log, "poll interrupted by EINTR");
  893. @@ -1134,6 +1155,10 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
  894. return client->last_ec;
  895. }
  896. +#ifdef MQTT_WSS_CPUSTATS
  897. + t1 = mqtt_wss_now_usec(client);
  898. +#endif
  899. +
  900. if (ret == 0) {
  901. if (send_keepalive) {
  902. // otherwise we shortened the timeout ourselves to take care of
  903. @@ -1152,6 +1177,11 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
  904. }
  905. }
  906. +#ifdef MQTT_WSS_CPUSTATS
  907. + t2 = mqtt_wss_now_usec(client);
  908. + client->stats.time_keepalive += t2 - t1;
  909. +#endif
  910. +
  911. client->poll_fds[POLLFD_SOCKET].events = 0;
  912. if ((ptr = rbuf_get_linear_insert_range(client->ws_client->buf_read, &size))) {
  913. @@ -1180,6 +1210,11 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
  914. }
  915. }
  916. +#ifdef MQTT_WSS_CPUSTATS
  917. + t1 = mqtt_wss_now_usec(client);
  918. + client->stats.time_read_socket += t1 - t2;
  919. +#endif
  920. +
  921. ret = ws_client_process(client->ws_client);
  922. switch(ret) {
  923. case WS_CLIENT_PROTOCOL_ERROR:
  924. @@ -1194,6 +1229,11 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
  925. return MQTT_WSS_ERR_CONN_DROP;
  926. }
  927. +#ifdef MQTT_WSS_CPUSTATS
  928. + t2 = mqtt_wss_now_usec(client);
  929. + client->stats.time_process_websocket += t2 - t1;
  930. +#endif
  931. +
  932. if (handle_mqtt(client))
  933. return MQTT_WSS_ERR_PROTO_MQTT;
  934. @@ -1202,6 +1242,11 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
  935. client->poll_fds[POLLFD_SOCKET].events |= POLLOUT;
  936. }
  937. +#ifdef MQTT_WSS_CPUSTATS
  938. + t1 = mqtt_wss_now_usec(client);
  939. + client->stats.time_process_mqtt += t1 - t2;
  940. +#endif
  941. +
  942. if ((ptr = rbuf_get_linear_read_range(client->ws_client->buf_write, &size))) {
  943. #ifdef DEBUG_ULTRA_VERBOSE
  944. mws_debug(client->log, "Have data to write to SSL");
  945. @@ -1234,6 +1279,11 @@ int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
  946. if(client->poll_fds[POLLFD_PIPE].revents & POLLIN)
  947. util_clear_pipe(client->write_notif_pipe[PIPE_READ_END]);
  948. +#ifdef MQTT_WSS_CPUSTATS
  949. + t2 = mqtt_wss_now_usec(client);
  950. + client->stats.time_write_socket += t2 - t1;
  951. +#endif
  952. +
  953. return MQTT_WSS_OK;
  954. }
  955. @@ -1504,9 +1554,18 @@ struct mqtt_wss_stats mqtt_wss_get_stats(mqtt_wss_client client)
  956. current = client->stats;
  957. memset(&client->stats, 0, sizeof(client->stats));
  958. pthread_mutex_unlock(&client->stat_lock);
  959. + mqtt_ng_get_stats(client->mqtt.mqtt_ctx, &current.mqtt);
  960. return current;
  961. }
  962. +int mqtt_wss_set_topic_alias(mqtt_wss_client client, const char *topic)
  963. +{
  964. + if(!client->internal_mqtt)
  965. + return 0;
  966. +
  967. + return mqtt_ng_set_topic_alias(client->mqtt.mqtt_ctx, topic);
  968. +}
  969. +
  970. #ifdef MQTT_WSS_DEBUG
  971. void mqtt_wss_set_SSL_CTX_keylog_cb(mqtt_wss_client client, void (*ssl_ctx_keylog_cb)(const SSL *ssl, const char *line))
  972. {