mqtt5_decoder.c 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/mqtt/private/v5/mqtt5_decoder.h>
  6. #include <aws/mqtt/private/v5/mqtt5_topic_alias.h>
  7. #include <aws/mqtt/private/v5/mqtt5_utils.h>
  /* Presumably the starting capacity of the decoder's scratch buffer; the use site is outside this section. */
  #define AWS_MQTT5_DECODER_BUFFER_START_SIZE 2048

  /*
   * Masks for the flag bits carried in the first byte of a PUBLISH packet's fixed header.
   * The QoS mask is two bits wide and is applied after that byte has been shifted right by one.
   */
  #define PUBLISH_PACKET_FIXED_HEADER_DUPLICATE_FLAG 0x08
  #define PUBLISH_PACKET_FIXED_HEADER_RETAIN_FLAG 0x01
  #define PUBLISH_PACKET_FIXED_HEADER_QOS_FLAG 0x03
  12. static void s_reset_decoder_for_new_packet(struct aws_mqtt5_decoder *decoder) {
  13. aws_byte_buf_reset(&decoder->scratch_space, false);
  14. decoder->packet_first_byte = 0;
  15. decoder->remaining_length = 0;
  16. AWS_ZERO_STRUCT(decoder->packet_cursor);
  17. }
  18. static void s_enter_state(struct aws_mqtt5_decoder *decoder, enum aws_mqtt5_decoder_state state) {
  19. decoder->state = state;
  20. if (state == AWS_MQTT5_DS_READ_PACKET_TYPE) {
  21. s_reset_decoder_for_new_packet(decoder);
  22. } else {
  23. aws_byte_buf_reset(&decoder->scratch_space, false);
  24. }
  25. }
  26. static bool s_is_decodable_packet_type(struct aws_mqtt5_decoder *decoder, enum aws_mqtt5_packet_type type) {
  27. return (uint32_t)type < AWS_ARRAY_SIZE(decoder->options.decoder_table->decoders_by_packet_type) &&
  28. decoder->options.decoder_table->decoders_by_packet_type[type] != NULL;
  29. }
  30. /*
  31. * Every mqtt packet has a first byte that, amongst other things, determines the packet type
  32. */
  33. static int s_aws_mqtt5_decoder_read_packet_type_on_data(
  34. struct aws_mqtt5_decoder *decoder,
  35. struct aws_byte_cursor *data) {
  36. if (data->len == 0) {
  37. return AWS_MQTT5_DRT_MORE_DATA;
  38. }
  39. uint8_t byte = *data->ptr;
  40. aws_byte_cursor_advance(data, 1);
  41. aws_byte_buf_append_byte_dynamic(&decoder->scratch_space, byte);
  42. enum aws_mqtt5_packet_type packet_type = (byte >> 4);
  43. if (!s_is_decodable_packet_type(decoder, packet_type)) {
  44. AWS_LOGF_ERROR(
  45. AWS_LS_MQTT5_CLIENT,
  46. "id=%p: unsupported or illegal packet type value: %d",
  47. decoder->options.callback_user_data,
  48. (int)packet_type);
  49. return AWS_MQTT5_DRT_ERROR;
  50. }
  51. decoder->packet_first_byte = byte;
  52. s_enter_state(decoder, AWS_MQTT5_DS_READ_REMAINING_LENGTH);
  53. return AWS_MQTT5_DRT_SUCCESS;
  54. }
  55. /*
  56. * non-streaming variable length integer decode. cursor is updated only if the value was successfully read
  57. */
  58. enum aws_mqtt5_decode_result_type aws_mqtt5_decode_vli(struct aws_byte_cursor *cursor, uint32_t *dest) {
  59. uint32_t value = 0;
  60. bool more_data = false;
  61. size_t bytes_used = 0;
  62. uint32_t shift = 0;
  63. struct aws_byte_cursor cursor_copy = *cursor;
  64. for (; bytes_used < 4; ++bytes_used) {
  65. uint8_t byte = 0;
  66. if (!aws_byte_cursor_read_u8(&cursor_copy, &byte)) {
  67. return AWS_MQTT5_DRT_MORE_DATA;
  68. }
  69. value |= ((byte & 0x7F) << shift);
  70. shift += 7;
  71. more_data = (byte & 0x80) != 0;
  72. if (!more_data) {
  73. break;
  74. }
  75. }
  76. if (more_data) {
  77. /* A variable length integer with the 4th byte high bit set is not valid */
  78. AWS_LOGF_ERROR(AWS_LS_MQTT5_GENERAL, "(static) aws_mqtt5_decoder - illegal variable length integer encoding");
  79. return AWS_MQTT5_DRT_ERROR;
  80. }
  81. aws_byte_cursor_advance(cursor, bytes_used + 1);
  82. *dest = value;
  83. return AWS_MQTT5_DRT_SUCCESS;
  84. }
  85. /* "streaming" variable length integer decode */
  86. static enum aws_mqtt5_decode_result_type s_aws_mqtt5_decoder_read_vli_on_data(
  87. struct aws_mqtt5_decoder *decoder,
  88. uint32_t *vli_dest,
  89. struct aws_byte_cursor *data) {
  90. enum aws_mqtt5_decode_result_type decode_vli_result = AWS_MQTT5_DRT_MORE_DATA;
  91. /* try to decode the vli integer one byte at a time */
  92. while (data->len > 0 && decode_vli_result == AWS_MQTT5_DRT_MORE_DATA) {
  93. /* append a single byte to the scratch buffer */
  94. struct aws_byte_cursor byte_cursor = aws_byte_cursor_advance(data, 1);
  95. aws_byte_buf_append_dynamic(&decoder->scratch_space, &byte_cursor);
  96. /* now try and decode a vli integer based on the range implied by the offset into the buffer */
  97. struct aws_byte_cursor vli_cursor = {
  98. .ptr = decoder->scratch_space.buffer,
  99. .len = decoder->scratch_space.len,
  100. };
  101. decode_vli_result = aws_mqtt5_decode_vli(&vli_cursor, vli_dest);
  102. }
  103. return decode_vli_result;
  104. }
  105. /* attempts to read the variable length integer that is always the second piece of data in an mqtt packet */
  106. static enum aws_mqtt5_decode_result_type s_aws_mqtt5_decoder_read_remaining_length_on_data(
  107. struct aws_mqtt5_decoder *decoder,
  108. struct aws_byte_cursor *data) {
  109. enum aws_mqtt5_decode_result_type result =
  110. s_aws_mqtt5_decoder_read_vli_on_data(decoder, &decoder->remaining_length, data);
  111. if (result != AWS_MQTT5_DRT_SUCCESS) {
  112. return result;
  113. }
  114. s_enter_state(decoder, AWS_MQTT5_DS_READ_PACKET);
  115. return AWS_MQTT5_DRT_SUCCESS;
  116. }
  117. /* non-streaming decode of a user property; failure implies connection termination */
  118. int aws_mqtt5_decode_user_property(
  119. struct aws_byte_cursor *packet_cursor,
  120. struct aws_mqtt5_user_property_set *properties) {
  121. struct aws_mqtt5_user_property property;
  122. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR(packet_cursor, &property.name, error);
  123. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR(packet_cursor, &property.value, error);
  124. if (aws_array_list_push_back(&properties->properties, &property)) {
  125. return AWS_OP_ERR;
  126. }
  127. return AWS_OP_SUCCESS;
  128. error:
  129. return AWS_OP_ERR;
  130. }
  131. /* decode function for all CONNACK properties */
  132. static int s_read_connack_property(
  133. struct aws_mqtt5_packet_connack_storage *storage,
  134. struct aws_byte_cursor *packet_cursor) {
  135. int result = AWS_OP_ERR;
  136. uint8_t property_type = 0;
  137. AWS_MQTT5_DECODE_U8(packet_cursor, &property_type, done);
  138. struct aws_mqtt5_packet_connack_view *storage_view = &storage->storage_view;
  139. switch (property_type) {
  140. case AWS_MQTT5_PROPERTY_TYPE_SESSION_EXPIRY_INTERVAL:
  141. AWS_MQTT5_DECODE_U32_OPTIONAL(
  142. packet_cursor, &storage->session_expiry_interval, &storage_view->session_expiry_interval, done);
  143. break;
  144. case AWS_MQTT5_PROPERTY_TYPE_RECEIVE_MAXIMUM:
  145. AWS_MQTT5_DECODE_U16_OPTIONAL(
  146. packet_cursor, &storage->receive_maximum, &storage_view->receive_maximum, done);
  147. break;
  148. case AWS_MQTT5_PROPERTY_TYPE_MAXIMUM_QOS:
  149. AWS_MQTT5_DECODE_U8_OPTIONAL(packet_cursor, &storage->maximum_qos, &storage_view->maximum_qos, done);
  150. break;
  151. case AWS_MQTT5_PROPERTY_TYPE_RETAIN_AVAILABLE:
  152. AWS_MQTT5_DECODE_U8_OPTIONAL(
  153. packet_cursor, &storage->retain_available, &storage_view->retain_available, done);
  154. break;
  155. case AWS_MQTT5_PROPERTY_TYPE_MAXIMUM_PACKET_SIZE:
  156. AWS_MQTT5_DECODE_U32_OPTIONAL(
  157. packet_cursor, &storage->maximum_packet_size, &storage_view->maximum_packet_size, done);
  158. break;
  159. case AWS_MQTT5_PROPERTY_TYPE_ASSIGNED_CLIENT_IDENTIFIER:
  160. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR_OPTIONAL(
  161. packet_cursor, &storage->assigned_client_identifier, &storage_view->assigned_client_identifier, done);
  162. break;
  163. case AWS_MQTT5_PROPERTY_TYPE_TOPIC_ALIAS_MAXIMUM:
  164. AWS_MQTT5_DECODE_U16_OPTIONAL(
  165. packet_cursor, &storage->topic_alias_maximum, &storage_view->topic_alias_maximum, done);
  166. break;
  167. case AWS_MQTT5_PROPERTY_TYPE_REASON_STRING:
  168. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR_OPTIONAL(
  169. packet_cursor, &storage->reason_string, &storage_view->reason_string, done);
  170. break;
  171. case AWS_MQTT5_PROPERTY_TYPE_WILDCARD_SUBSCRIPTIONS_AVAILABLE:
  172. AWS_MQTT5_DECODE_U8_OPTIONAL(
  173. packet_cursor,
  174. &storage->wildcard_subscriptions_available,
  175. &storage_view->wildcard_subscriptions_available,
  176. done);
  177. break;
  178. case AWS_MQTT5_PROPERTY_TYPE_SUBSCRIPTION_IDENTIFIERS_AVAILABLE:
  179. AWS_MQTT5_DECODE_U8_OPTIONAL(
  180. packet_cursor,
  181. &storage->subscription_identifiers_available,
  182. &storage_view->subscription_identifiers_available,
  183. done);
  184. break;
  185. case AWS_MQTT5_PROPERTY_TYPE_SHARED_SUBSCRIPTIONS_AVAILABLE:
  186. AWS_MQTT5_DECODE_U8_OPTIONAL(
  187. packet_cursor,
  188. &storage->shared_subscriptions_available,
  189. &storage_view->shared_subscriptions_available,
  190. done);
  191. break;
  192. case AWS_MQTT5_PROPERTY_TYPE_SERVER_KEEP_ALIVE:
  193. AWS_MQTT5_DECODE_U16_OPTIONAL(
  194. packet_cursor, &storage->server_keep_alive, &storage_view->server_keep_alive, done);
  195. break;
  196. case AWS_MQTT5_PROPERTY_TYPE_RESPONSE_INFORMATION:
  197. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR_OPTIONAL(
  198. packet_cursor, &storage->response_information, &storage_view->response_information, done);
  199. break;
  200. case AWS_MQTT5_PROPERTY_TYPE_SERVER_REFERENCE:
  201. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR_OPTIONAL(
  202. packet_cursor, &storage->server_reference, &storage_view->server_reference, done);
  203. break;
  204. case AWS_MQTT5_PROPERTY_TYPE_AUTHENTICATION_METHOD:
  205. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR_OPTIONAL(
  206. packet_cursor, &storage->authentication_method, &storage_view->authentication_method, done);
  207. break;
  208. case AWS_MQTT5_PROPERTY_TYPE_AUTHENTICATION_DATA:
  209. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR_OPTIONAL(
  210. packet_cursor, &storage->authentication_data, &storage_view->authentication_data, done);
  211. break;
  212. case AWS_MQTT5_PROPERTY_TYPE_USER_PROPERTY:
  213. if (aws_mqtt5_decode_user_property(packet_cursor, &storage->user_properties)) {
  214. goto done;
  215. }
  216. break;
  217. default:
  218. goto done;
  219. }
  220. result = AWS_OP_SUCCESS;
  221. done:
  222. if (result != AWS_OP_SUCCESS) {
  223. AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Read CONNACK property decode failure");
  224. aws_raise_error(AWS_ERROR_MQTT5_DECODE_PROTOCOL_ERROR);
  225. }
  226. return result;
  227. }
  228. /* decodes a CONNACK packet whose data must be in the scratch buffer */
  229. static int s_aws_mqtt5_decoder_decode_connack(struct aws_mqtt5_decoder *decoder) {
  230. struct aws_mqtt5_packet_connack_storage storage;
  231. if (aws_mqtt5_packet_connack_storage_init_from_external_storage(&storage, decoder->allocator)) {
  232. return AWS_OP_ERR;
  233. }
  234. int result = AWS_OP_ERR;
  235. uint8_t first_byte = decoder->packet_first_byte;
  236. /* CONNACK flags must be zero by protocol */
  237. if ((first_byte & 0x0F) != 0) {
  238. goto done;
  239. }
  240. struct aws_byte_cursor packet_cursor = decoder->packet_cursor;
  241. uint32_t remaining_length = decoder->remaining_length;
  242. if (remaining_length != (uint32_t)packet_cursor.len) {
  243. goto done;
  244. }
  245. uint8_t connect_flags = 0;
  246. AWS_MQTT5_DECODE_U8(&packet_cursor, &connect_flags, done);
  247. /* everything but the 0-bit must be 0 */
  248. if ((connect_flags & 0xFE) != 0) {
  249. goto done;
  250. }
  251. struct aws_mqtt5_packet_connack_view *storage_view = &storage.storage_view;
  252. storage_view->session_present = (connect_flags & 0x01) != 0;
  253. uint8_t reason_code = 0;
  254. AWS_MQTT5_DECODE_U8(&packet_cursor, &reason_code, done);
  255. storage_view->reason_code = reason_code;
  256. uint32_t property_length = 0;
  257. AWS_MQTT5_DECODE_VLI(&packet_cursor, &property_length, done);
  258. if (property_length != (uint32_t)packet_cursor.len) {
  259. goto done;
  260. }
  261. while (packet_cursor.len > 0) {
  262. if (s_read_connack_property(&storage, &packet_cursor)) {
  263. goto done;
  264. }
  265. }
  266. storage_view->user_property_count = aws_mqtt5_user_property_set_size(&storage.user_properties);
  267. storage_view->user_properties = storage.user_properties.properties.data;
  268. result = AWS_OP_SUCCESS;
  269. done:
  270. if (result == AWS_OP_SUCCESS) {
  271. if (decoder->options.on_packet_received != NULL) {
  272. result = (*decoder->options.on_packet_received)(
  273. AWS_MQTT5_PT_CONNACK, &storage.storage_view, decoder->options.callback_user_data);
  274. }
  275. } else {
  276. AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "id=%p: CONNACK decode failure", decoder->options.callback_user_data);
  277. aws_raise_error(AWS_ERROR_MQTT5_DECODE_PROTOCOL_ERROR);
  278. }
  279. aws_mqtt5_packet_connack_storage_clean_up(&storage);
  280. return result;
  281. }
  282. /* decode function for all PUBLISH properties */
  283. static int s_read_publish_property(
  284. struct aws_mqtt5_packet_publish_storage *storage,
  285. struct aws_byte_cursor *packet_cursor) {
  286. int result = AWS_OP_ERR;
  287. uint8_t property_type = 0;
  288. AWS_MQTT5_DECODE_U8(packet_cursor, &property_type, done);
  289. struct aws_mqtt5_packet_publish_view *storage_view = &storage->storage_view;
  290. switch (property_type) {
  291. case AWS_MQTT5_PROPERTY_TYPE_PAYLOAD_FORMAT_INDICATOR:
  292. AWS_MQTT5_DECODE_U8_OPTIONAL(packet_cursor, &storage->payload_format, &storage_view->payload_format, done);
  293. break;
  294. case AWS_MQTT5_PROPERTY_TYPE_MESSAGE_EXPIRY_INTERVAL:
  295. AWS_MQTT5_DECODE_U32_OPTIONAL(
  296. packet_cursor,
  297. &storage->message_expiry_interval_seconds,
  298. &storage_view->message_expiry_interval_seconds,
  299. done);
  300. break;
  301. case AWS_MQTT5_PROPERTY_TYPE_TOPIC_ALIAS:
  302. AWS_MQTT5_DECODE_U16_OPTIONAL(packet_cursor, &storage->topic_alias, &storage_view->topic_alias, done);
  303. break;
  304. case AWS_MQTT5_PROPERTY_TYPE_RESPONSE_TOPIC:
  305. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR_OPTIONAL(
  306. packet_cursor, &storage->response_topic, &storage_view->response_topic, done);
  307. break;
  308. case AWS_MQTT5_PROPERTY_TYPE_CORRELATION_DATA:
  309. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR_OPTIONAL(
  310. packet_cursor, &storage->correlation_data, &storage_view->correlation_data, done);
  311. break;
  312. case AWS_MQTT5_PROPERTY_TYPE_USER_PROPERTY:
  313. if (aws_mqtt5_decode_user_property(packet_cursor, &storage->user_properties)) {
  314. goto done;
  315. }
  316. break;
  317. case AWS_MQTT5_PROPERTY_TYPE_SUBSCRIPTION_IDENTIFIER: {
  318. uint32_t subscription_identifier = 0;
  319. AWS_MQTT5_DECODE_VLI(packet_cursor, &subscription_identifier, done);
  320. aws_array_list_push_back(&storage->subscription_identifiers, &subscription_identifier);
  321. break;
  322. }
  323. case AWS_MQTT5_PROPERTY_TYPE_CONTENT_TYPE:
  324. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR_OPTIONAL(
  325. packet_cursor, &storage->content_type, &storage_view->content_type, done);
  326. break;
  327. default:
  328. goto done;
  329. }
  330. result = AWS_OP_SUCCESS;
  331. done:
  332. if (result != AWS_OP_SUCCESS) {
  333. AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Read PUBLISH property decode failure");
  334. aws_raise_error(AWS_ERROR_MQTT5_DECODE_PROTOCOL_ERROR);
  335. }
  336. return result;
  337. }
  338. /* decodes a PUBLISH packet whose data must be in the scratch buffer */
  339. static int s_aws_mqtt5_decoder_decode_publish(struct aws_mqtt5_decoder *decoder) {
  340. struct aws_mqtt5_packet_publish_storage storage;
  341. if (aws_mqtt5_packet_publish_storage_init_from_external_storage(&storage, decoder->allocator)) {
  342. return AWS_OP_ERR;
  343. }
  344. int result = AWS_OP_ERR;
  345. struct aws_mqtt5_packet_publish_view *storage_view = &storage.storage_view;
  346. /*
  347. * Fixed Header
  348. * byte 1:
  349. * bits 4-7: MQTT Control Packet Type
  350. * bit 3: DUP flag
  351. * bit 1-2: QoS level
  352. * bit 0: RETAIN
  353. * byte 2-x: Remaining Length as Variable Byte Integer (1-4 bytes)
  354. */
  355. uint8_t first_byte = decoder->packet_first_byte;
  356. if ((first_byte & PUBLISH_PACKET_FIXED_HEADER_DUPLICATE_FLAG) != 0) {
  357. storage_view->duplicate = true;
  358. }
  359. if ((first_byte & PUBLISH_PACKET_FIXED_HEADER_RETAIN_FLAG) != 0) {
  360. storage_view->retain = true;
  361. }
  362. storage_view->qos = (enum aws_mqtt5_qos)((first_byte >> 1) & PUBLISH_PACKET_FIXED_HEADER_QOS_FLAG);
  363. struct aws_byte_cursor packet_cursor = decoder->packet_cursor;
  364. uint32_t remaining_length = decoder->remaining_length;
  365. if (remaining_length != (uint32_t)packet_cursor.len) {
  366. goto done;
  367. }
  368. /*
  369. * Topic Name
  370. * Packet Identifier (only present for > QoS 0)
  371. * Properties
  372. * - Property Length
  373. * - Properties
  374. * Payload
  375. */
  376. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR(&packet_cursor, &storage_view->topic, done);
  377. if (storage_view->qos > 0) {
  378. AWS_MQTT5_DECODE_U16(&packet_cursor, &storage_view->packet_id, done);
  379. }
  380. uint32_t property_length = 0;
  381. AWS_MQTT5_DECODE_VLI(&packet_cursor, &property_length, done);
  382. if (property_length > (uint32_t)packet_cursor.len) {
  383. goto done;
  384. }
  385. struct aws_byte_cursor properties_cursor = aws_byte_cursor_advance(&packet_cursor, property_length);
  386. while (properties_cursor.len > 0) {
  387. if (s_read_publish_property(&storage, &properties_cursor)) {
  388. goto done;
  389. }
  390. }
  391. storage_view->subscription_identifier_count = aws_array_list_length(&storage.subscription_identifiers);
  392. storage_view->subscription_identifiers = storage.subscription_identifiers.data;
  393. storage_view->user_property_count = aws_mqtt5_user_property_set_size(&storage.user_properties);
  394. storage_view->user_properties = storage.user_properties.properties.data;
  395. storage_view->payload = packet_cursor;
  396. if (storage_view->topic_alias != NULL) {
  397. if (decoder->topic_alias_resolver == NULL) {
  398. AWS_LOGF_ERROR(
  399. AWS_LS_MQTT5_CLIENT,
  400. "id=%p: PUBLISH packet contained topic alias when not allowed",
  401. decoder->options.callback_user_data);
  402. goto done;
  403. }
  404. uint16_t topic_alias_id = *storage_view->topic_alias;
  405. if (topic_alias_id == 0) {
  406. AWS_LOGF_ERROR(
  407. AWS_LS_MQTT5_CLIENT,
  408. "id=%p: PUBLISH packet contained illegal topic alias",
  409. decoder->options.callback_user_data);
  410. goto done;
  411. }
  412. if (storage_view->topic.len > 0) {
  413. if (aws_mqtt5_inbound_topic_alias_resolver_register_alias(
  414. decoder->topic_alias_resolver, topic_alias_id, storage_view->topic)) {
  415. AWS_LOGF_ERROR(
  416. AWS_LS_MQTT5_CLIENT, "id=%p: unable to register topic alias", decoder->options.callback_user_data);
  417. goto done;
  418. }
  419. } else {
  420. if (aws_mqtt5_inbound_topic_alias_resolver_resolve_alias(
  421. decoder->topic_alias_resolver, topic_alias_id, &storage_view->topic)) {
  422. AWS_LOGF_ERROR(
  423. AWS_LS_MQTT5_CLIENT,
  424. "id=%p: PUBLISH packet contained unknown topic alias",
  425. decoder->options.callback_user_data);
  426. goto done;
  427. }
  428. }
  429. }
  430. result = AWS_OP_SUCCESS;
  431. done:
  432. if (result == AWS_OP_SUCCESS) {
  433. if (decoder->options.on_packet_received != NULL) {
  434. result = (*decoder->options.on_packet_received)(
  435. AWS_MQTT5_PT_PUBLISH, &storage.storage_view, decoder->options.callback_user_data);
  436. }
  437. } else {
  438. AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "id=%p: PUBLISH decode failure", decoder->options.callback_user_data);
  439. aws_raise_error(AWS_ERROR_MQTT5_DECODE_PROTOCOL_ERROR);
  440. }
  441. aws_mqtt5_packet_publish_storage_clean_up(&storage);
  442. return result;
  443. }
  444. /* decode function for all PUBACK properties */
  445. static int s_read_puback_property(
  446. struct aws_mqtt5_packet_puback_storage *storage,
  447. struct aws_byte_cursor *packet_cursor) {
  448. int result = AWS_OP_ERR;
  449. uint8_t property_type = 0;
  450. AWS_MQTT5_DECODE_U8(packet_cursor, &property_type, done);
  451. struct aws_mqtt5_packet_puback_view *storage_view = &storage->storage_view;
  452. switch (property_type) {
  453. case AWS_MQTT5_PROPERTY_TYPE_REASON_STRING:
  454. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR_OPTIONAL(
  455. packet_cursor, &storage->reason_string, &storage_view->reason_string, done);
  456. break;
  457. case AWS_MQTT5_PROPERTY_TYPE_USER_PROPERTY:
  458. if (aws_mqtt5_decode_user_property(packet_cursor, &storage->user_properties)) {
  459. goto done;
  460. }
  461. break;
  462. default:
  463. goto done;
  464. }
  465. result = AWS_OP_SUCCESS;
  466. done:
  467. if (result != AWS_OP_SUCCESS) {
  468. AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Read PUBACK property decode failure");
  469. aws_raise_error(AWS_ERROR_MQTT5_DECODE_PROTOCOL_ERROR);
  470. }
  471. return result;
  472. }
  473. /* decodes a PUBACK packet whose data must be in the scratch buffer */
  474. static int s_aws_mqtt5_decoder_decode_puback(struct aws_mqtt5_decoder *decoder) {
  475. struct aws_mqtt5_packet_puback_storage storage;
  476. if (aws_mqtt5_packet_puback_storage_init_from_external_storage(&storage, decoder->allocator)) {
  477. return AWS_OP_ERR;
  478. }
  479. int result = AWS_OP_ERR;
  480. uint8_t first_byte = decoder->packet_first_byte;
  481. /* PUBACK flags must be zero by protocol */
  482. if ((first_byte & 0x0F) != 0) {
  483. goto done;
  484. }
  485. struct aws_byte_cursor packet_cursor = decoder->packet_cursor;
  486. uint32_t remaining_length = decoder->remaining_length;
  487. if (remaining_length != (uint32_t)packet_cursor.len) {
  488. goto done;
  489. }
  490. struct aws_mqtt5_packet_puback_view *storage_view = &storage.storage_view;
  491. AWS_MQTT5_DECODE_U16(&packet_cursor, &storage_view->packet_id, done);
  492. /* Packet can end immediately following packet id with default success reason code */
  493. uint8_t reason_code = 0;
  494. if (packet_cursor.len > 0) {
  495. AWS_MQTT5_DECODE_U8(&packet_cursor, &reason_code, done);
  496. /* Packet can end immediately following reason code */
  497. if (packet_cursor.len > 0) {
  498. uint32_t property_length = 0;
  499. AWS_MQTT5_DECODE_VLI(&packet_cursor, &property_length, done);
  500. if (property_length != (uint32_t)packet_cursor.len) {
  501. goto done;
  502. }
  503. while (packet_cursor.len > 0) {
  504. if (s_read_puback_property(&storage, &packet_cursor)) {
  505. goto done;
  506. }
  507. }
  508. }
  509. }
  510. storage_view->user_property_count = aws_mqtt5_user_property_set_size(&storage.user_properties);
  511. storage_view->user_properties = storage.user_properties.properties.data;
  512. storage_view->reason_code = reason_code;
  513. result = AWS_OP_SUCCESS;
  514. done:
  515. if (result == AWS_OP_SUCCESS) {
  516. if (decoder->options.on_packet_received != NULL) {
  517. result = (*decoder->options.on_packet_received)(
  518. AWS_MQTT5_PT_PUBACK, &storage.storage_view, decoder->options.callback_user_data);
  519. }
  520. } else {
  521. AWS_LOGF_ERROR(
  522. AWS_LS_MQTT5_GENERAL,
  523. "(%p) aws_mqtt5_decoder - PUBACK decode failure",
  524. decoder->options.callback_user_data);
  525. aws_raise_error(AWS_ERROR_MQTT5_DECODE_PROTOCOL_ERROR);
  526. }
  527. aws_mqtt5_packet_puback_storage_clean_up(&storage);
  528. return result;
  529. }
  530. /* decode function for all SUBACK properties */
  531. static int s_read_suback_property(
  532. struct aws_mqtt5_packet_suback_storage *storage,
  533. struct aws_byte_cursor *packet_cursor) {
  534. int result = AWS_OP_ERR;
  535. uint8_t property_type = 0;
  536. AWS_MQTT5_DECODE_U8(packet_cursor, &property_type, done);
  537. struct aws_mqtt5_packet_suback_view *storage_view = &storage->storage_view;
  538. switch (property_type) {
  539. case AWS_MQTT5_PROPERTY_TYPE_REASON_STRING:
  540. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR_OPTIONAL(
  541. packet_cursor, &storage->reason_string, &storage_view->reason_string, done);
  542. break;
  543. case AWS_MQTT5_PROPERTY_TYPE_USER_PROPERTY:
  544. if (aws_mqtt5_decode_user_property(packet_cursor, &storage->user_properties)) {
  545. goto done;
  546. }
  547. break;
  548. default:
  549. goto done;
  550. }
  551. result = AWS_OP_SUCCESS;
  552. done:
  553. if (result != AWS_OP_SUCCESS) {
  554. AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Read SUBACK property decode failure");
  555. aws_raise_error(AWS_ERROR_MQTT5_DECODE_PROTOCOL_ERROR);
  556. }
  557. return result;
  558. }
  559. /* decodes a SUBACK packet whose data must be in the scratch buffer */
  560. static int s_aws_mqtt5_decoder_decode_suback(struct aws_mqtt5_decoder *decoder) {
  561. struct aws_mqtt5_packet_suback_storage storage;
  562. if (aws_mqtt5_packet_suback_storage_init_from_external_storage(&storage, decoder->allocator)) {
  563. return AWS_OP_ERR;
  564. }
  565. int result = AWS_OP_ERR;
  566. struct aws_mqtt5_packet_suback_view *storage_view = &storage.storage_view;
  567. struct aws_byte_cursor packet_cursor = decoder->packet_cursor;
  568. AWS_MQTT5_DECODE_U16(&packet_cursor, &storage_view->packet_id, done);
  569. uint32_t property_length = 0;
  570. AWS_MQTT5_DECODE_VLI(&packet_cursor, &property_length, done);
  571. struct aws_byte_cursor properties_cursor = aws_byte_cursor_advance(&packet_cursor, property_length);
  572. while (properties_cursor.len > 0) {
  573. if (s_read_suback_property(&storage, &properties_cursor)) {
  574. goto done;
  575. }
  576. }
  577. aws_array_list_init_dynamic(
  578. &storage.reason_codes, decoder->allocator, packet_cursor.len, sizeof(enum aws_mqtt5_suback_reason_code));
  579. while (packet_cursor.len > 0) {
  580. uint8_t reason_code;
  581. AWS_MQTT5_DECODE_U8(&packet_cursor, &reason_code, done);
  582. enum aws_mqtt5_suback_reason_code reason_code_enum = reason_code;
  583. aws_array_list_push_back(&storage.reason_codes, &reason_code_enum);
  584. }
  585. storage_view->reason_code_count = aws_array_list_length(&storage.reason_codes);
  586. storage_view->reason_codes = storage.reason_codes.data;
  587. storage_view->user_property_count = aws_mqtt5_user_property_set_size(&storage.user_properties);
  588. storage_view->user_properties = storage.user_properties.properties.data;
  589. result = AWS_OP_SUCCESS;
  590. done:
  591. if (result == AWS_OP_SUCCESS) {
  592. if (decoder->options.on_packet_received != NULL) {
  593. result = (*decoder->options.on_packet_received)(
  594. AWS_MQTT5_PT_SUBACK, &storage.storage_view, decoder->options.callback_user_data);
  595. }
  596. } else {
  597. AWS_LOGF_ERROR(
  598. AWS_LS_MQTT5_GENERAL,
  599. "(%p) aws_mqtt5_decoder - SUBACK decode failure",
  600. decoder->options.callback_user_data);
  601. aws_raise_error(AWS_ERROR_MQTT5_DECODE_PROTOCOL_ERROR);
  602. }
  603. aws_mqtt5_packet_suback_storage_clean_up(&storage);
  604. return result;
  605. }
  606. /* decode function for all UNSUBACK properties */
  607. static int s_read_unsuback_property(
  608. struct aws_mqtt5_packet_unsuback_storage *storage,
  609. struct aws_byte_cursor *packet_cursor) {
  610. int result = AWS_OP_ERR;
  611. uint8_t property_type = 0;
  612. AWS_MQTT5_DECODE_U8(packet_cursor, &property_type, done);
  613. struct aws_mqtt5_packet_unsuback_view *storage_view = &storage->storage_view;
  614. switch (property_type) {
  615. case AWS_MQTT5_PROPERTY_TYPE_REASON_STRING:
  616. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR_OPTIONAL(
  617. packet_cursor, &storage->reason_string, &storage_view->reason_string, done);
  618. break;
  619. case AWS_MQTT5_PROPERTY_TYPE_USER_PROPERTY:
  620. if (aws_mqtt5_decode_user_property(packet_cursor, &storage->user_properties)) {
  621. goto done;
  622. }
  623. break;
  624. default:
  625. goto done;
  626. }
  627. result = AWS_OP_SUCCESS;
  628. done:
  629. if (result != AWS_OP_SUCCESS) {
  630. AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Read UNSUBACK property decode failure");
  631. aws_raise_error(AWS_ERROR_MQTT5_DECODE_PROTOCOL_ERROR);
  632. }
  633. return result;
  634. }
  635. /* decodes an UNSUBACK packet whose data must be in the scratch buffer */
  636. static int s_aws_mqtt5_decoder_decode_unsuback(struct aws_mqtt5_decoder *decoder) {
  637. struct aws_mqtt5_packet_unsuback_storage storage;
  638. /*
  639. * Fixed Header
  640. * byte 1: MQTT5 Control Packet - Reserved 0
  641. * byte 2 - x: VLI Remaining Length
  642. *
  643. * Variable Header
  644. * byte 1-2: Packet Identifier
  645. * Byte 3 - x: VLI Property Length
  646. *
  647. * Properties
  648. * byte 1: Idenfier
  649. * bytes 2 - x: Property content
  650. *
  651. * Payload
  652. * 1 byte per reason code in order of unsub requests
  653. */
  654. if (aws_mqtt5_packet_unsuback_storage_init_from_external_storage(&storage, decoder->allocator)) {
  655. return AWS_OP_ERR;
  656. }
  657. int result = AWS_OP_ERR;
  658. struct aws_byte_cursor packet_cursor = decoder->packet_cursor;
  659. struct aws_mqtt5_packet_unsuback_view *storage_view = &storage.storage_view;
  660. AWS_MQTT5_DECODE_U16(&packet_cursor, &storage_view->packet_id, done);
  661. uint32_t property_length = 0;
  662. AWS_MQTT5_DECODE_VLI(&packet_cursor, &property_length, done);
  663. struct aws_byte_cursor properties_cursor = aws_byte_cursor_advance(&packet_cursor, property_length);
  664. while (properties_cursor.len > 0) {
  665. if (s_read_unsuback_property(&storage, &properties_cursor)) {
  666. goto done;
  667. }
  668. }
  669. aws_array_list_init_dynamic(
  670. &storage.reason_codes, decoder->allocator, packet_cursor.len, sizeof(enum aws_mqtt5_unsuback_reason_code));
  671. while (packet_cursor.len > 0) {
  672. uint8_t reason_code;
  673. AWS_MQTT5_DECODE_U8(&packet_cursor, &reason_code, done);
  674. enum aws_mqtt5_unsuback_reason_code reason_code_enum = reason_code;
  675. aws_array_list_push_back(&storage.reason_codes, &reason_code_enum);
  676. }
  677. storage_view->reason_code_count = aws_array_list_length(&storage.reason_codes);
  678. storage_view->reason_codes = storage.reason_codes.data;
  679. storage_view->user_property_count = aws_mqtt5_user_property_set_size(&storage.user_properties);
  680. storage_view->user_properties = storage.user_properties.properties.data;
  681. result = AWS_OP_SUCCESS;
  682. done:
  683. if (result == AWS_OP_SUCCESS) {
  684. if (decoder->options.on_packet_received != NULL) {
  685. result = (*decoder->options.on_packet_received)(
  686. AWS_MQTT5_PT_UNSUBACK, &storage.storage_view, decoder->options.callback_user_data);
  687. }
  688. } else {
  689. AWS_LOGF_ERROR(
  690. AWS_LS_MQTT5_GENERAL,
  691. "(%p) aws_mqtt5_decoder - UNSUBACK decode failure",
  692. decoder->options.callback_user_data);
  693. aws_raise_error(AWS_ERROR_MQTT5_DECODE_PROTOCOL_ERROR);
  694. }
  695. aws_mqtt5_packet_unsuback_storage_clean_up(&storage);
  696. return result;
  697. }
  698. /* decode function for all DISCONNECT properties */
  699. static int s_read_disconnect_property(
  700. struct aws_mqtt5_packet_disconnect_storage *storage,
  701. struct aws_byte_cursor *packet_cursor) {
  702. int result = AWS_OP_ERR;
  703. uint8_t property_type = 0;
  704. AWS_MQTT5_DECODE_U8(packet_cursor, &property_type, done);
  705. struct aws_mqtt5_packet_disconnect_view *storage_view = &storage->storage_view;
  706. switch (property_type) {
  707. case AWS_MQTT5_PROPERTY_TYPE_SESSION_EXPIRY_INTERVAL:
  708. AWS_MQTT5_DECODE_U32_OPTIONAL(
  709. packet_cursor,
  710. &storage->session_expiry_interval_seconds,
  711. &storage_view->session_expiry_interval_seconds,
  712. done);
  713. break;
  714. case AWS_MQTT5_PROPERTY_TYPE_SERVER_REFERENCE:
  715. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR_OPTIONAL(
  716. packet_cursor, &storage->server_reference, &storage_view->server_reference, done);
  717. break;
  718. case AWS_MQTT5_PROPERTY_TYPE_REASON_STRING:
  719. AWS_MQTT5_DECODE_LENGTH_PREFIXED_CURSOR_OPTIONAL(
  720. packet_cursor, &storage->reason_string, &storage_view->reason_string, done);
  721. break;
  722. case AWS_MQTT5_PROPERTY_TYPE_USER_PROPERTY:
  723. if (aws_mqtt5_decode_user_property(packet_cursor, &storage->user_properties)) {
  724. goto done;
  725. }
  726. break;
  727. default:
  728. goto done;
  729. }
  730. result = AWS_OP_SUCCESS;
  731. done:
  732. if (result == AWS_OP_ERR) {
  733. AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Read DISCONNECT property decode failure");
  734. aws_raise_error(AWS_ERROR_MQTT5_DECODE_PROTOCOL_ERROR);
  735. }
  736. return result;
  737. }
  738. /* decodes a DISCONNECT packet whose data must be in the scratch buffer */
  739. static int s_aws_mqtt5_decoder_decode_disconnect(struct aws_mqtt5_decoder *decoder) {
  740. struct aws_mqtt5_packet_disconnect_storage storage;
  741. if (aws_mqtt5_packet_disconnect_storage_init_from_external_storage(&storage, decoder->allocator)) {
  742. return AWS_OP_ERR;
  743. }
  744. int result = AWS_OP_ERR;
  745. uint8_t first_byte = decoder->packet_first_byte;
  746. /* DISCONNECT flags must be zero by protocol */
  747. if ((first_byte & 0x0F) != 0) {
  748. goto done;
  749. }
  750. struct aws_byte_cursor packet_cursor = decoder->packet_cursor;
  751. uint32_t remaining_length = decoder->remaining_length;
  752. if (remaining_length != (uint32_t)packet_cursor.len) {
  753. goto done;
  754. }
  755. struct aws_mqtt5_packet_disconnect_view *storage_view = &storage.storage_view;
  756. if (remaining_length > 0) {
  757. uint8_t reason_code = 0;
  758. AWS_MQTT5_DECODE_U8(&packet_cursor, &reason_code, done);
  759. storage_view->reason_code = reason_code;
  760. if (packet_cursor.len == 0) {
  761. result = AWS_OP_SUCCESS;
  762. goto done;
  763. }
  764. uint32_t property_length = 0;
  765. AWS_MQTT5_DECODE_VLI(&packet_cursor, &property_length, done);
  766. if (property_length != (uint32_t)packet_cursor.len) {
  767. goto done;
  768. }
  769. while (packet_cursor.len > 0) {
  770. if (s_read_disconnect_property(&storage, &packet_cursor)) {
  771. goto done;
  772. }
  773. }
  774. }
  775. storage_view->user_property_count = aws_mqtt5_user_property_set_size(&storage.user_properties);
  776. storage_view->user_properties = storage.user_properties.properties.data;
  777. result = AWS_OP_SUCCESS;
  778. done:
  779. if (result == AWS_OP_SUCCESS) {
  780. if (decoder->options.on_packet_received != NULL) {
  781. result = (*decoder->options.on_packet_received)(
  782. AWS_MQTT5_PT_DISCONNECT, &storage.storage_view, decoder->options.callback_user_data);
  783. }
  784. } else {
  785. AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "id=%p: DISCONNECT decode failure", decoder->options.callback_user_data);
  786. aws_raise_error(AWS_ERROR_MQTT5_DECODE_PROTOCOL_ERROR);
  787. }
  788. aws_mqtt5_packet_disconnect_storage_clean_up(&storage);
  789. return result;
  790. }
  791. static int s_aws_mqtt5_decoder_decode_pingresp(struct aws_mqtt5_decoder *decoder) {
  792. if (decoder->packet_cursor.len != 0) {
  793. goto error;
  794. }
  795. uint8_t expected_first_byte = aws_mqtt5_compute_fixed_header_byte1(AWS_MQTT5_PT_PINGRESP, 0);
  796. if (decoder->packet_first_byte != expected_first_byte || decoder->remaining_length != 0) {
  797. goto error;
  798. }
  799. int result = AWS_OP_SUCCESS;
  800. if (decoder->options.on_packet_received != NULL) {
  801. result =
  802. (*decoder->options.on_packet_received)(AWS_MQTT5_PT_PINGRESP, NULL, decoder->options.callback_user_data);
  803. }
  804. return result;
  805. error:
  806. AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "id=%p: PINGRESP decode failure", decoder->options.callback_user_data);
  807. return aws_raise_error(AWS_ERROR_MQTT5_DECODE_PROTOCOL_ERROR);
  808. }
  809. static int s_aws_mqtt5_decoder_decode_packet(struct aws_mqtt5_decoder *decoder) {
  810. enum aws_mqtt5_packet_type packet_type = (enum aws_mqtt5_packet_type)(decoder->packet_first_byte >> 4);
  811. aws_mqtt5_decoding_fn *decoder_fn = decoder->options.decoder_table->decoders_by_packet_type[packet_type];
  812. if (decoder_fn == NULL) {
  813. AWS_LOGF_ERROR(AWS_LS_MQTT5_CLIENT, "Decoder decode packet function missing for enum: %d", packet_type);
  814. return aws_raise_error(AWS_ERROR_MQTT5_DECODE_PROTOCOL_ERROR);
  815. }
  816. return (*decoder_fn)(decoder);
  817. }
  818. /*
  819. * (Streaming) Given a packet type and a variable length integer specifying the packet length, this state either
  820. * (1) decodes directly from the cursor if possible
  821. * (2) reads the packet into the scratch buffer and then decodes it once it is completely present
  822. *
  823. */
  824. static enum aws_mqtt5_decode_result_type s_aws_mqtt5_decoder_read_packet_on_data(
  825. struct aws_mqtt5_decoder *decoder,
  826. struct aws_byte_cursor *data) {
  827. /* Are we able to decode directly from the channel message data buffer? */
  828. if (decoder->scratch_space.len == 0 && decoder->remaining_length <= data->len) {
  829. /* The cursor contains the entire packet, so decode directly from the backing io message buffer */
  830. decoder->packet_cursor = aws_byte_cursor_advance(data, decoder->remaining_length);
  831. } else {
  832. /* If the packet is fragmented across multiple io messages, then we buffer it internally */
  833. size_t unread_length = decoder->remaining_length - decoder->scratch_space.len;
  834. size_t copy_length = aws_min_size(unread_length, data->len);
  835. struct aws_byte_cursor copy_cursor = aws_byte_cursor_advance(data, copy_length);
  836. if (aws_byte_buf_append_dynamic(&decoder->scratch_space, &copy_cursor)) {
  837. return AWS_MQTT5_DRT_ERROR;
  838. }
  839. if (copy_length < unread_length) {
  840. return AWS_MQTT5_DRT_MORE_DATA;
  841. }
  842. decoder->packet_cursor = aws_byte_cursor_from_buf(&decoder->scratch_space);
  843. }
  844. if (s_aws_mqtt5_decoder_decode_packet(decoder)) {
  845. return AWS_MQTT5_DRT_ERROR;
  846. }
  847. s_enter_state(decoder, AWS_MQTT5_DS_READ_PACKET_TYPE);
  848. return AWS_MQTT5_DRT_SUCCESS;
  849. }
  850. /* top-level entry function for all new data received from the remote mqtt endpoint */
  851. int aws_mqtt5_decoder_on_data_received(struct aws_mqtt5_decoder *decoder, struct aws_byte_cursor data) {
  852. enum aws_mqtt5_decode_result_type result = AWS_MQTT5_DRT_SUCCESS;
  853. while (result == AWS_MQTT5_DRT_SUCCESS) {
  854. switch (decoder->state) {
  855. case AWS_MQTT5_DS_READ_PACKET_TYPE:
  856. result = s_aws_mqtt5_decoder_read_packet_type_on_data(decoder, &data);
  857. break;
  858. case AWS_MQTT5_DS_READ_REMAINING_LENGTH:
  859. result = s_aws_mqtt5_decoder_read_remaining_length_on_data(decoder, &data);
  860. break;
  861. case AWS_MQTT5_DS_READ_PACKET:
  862. result = s_aws_mqtt5_decoder_read_packet_on_data(decoder, &data);
  863. break;
  864. default:
  865. result = AWS_MQTT5_DRT_ERROR;
  866. break;
  867. }
  868. }
  869. if (result == AWS_MQTT5_DRT_ERROR) {
  870. aws_raise_error(AWS_ERROR_MQTT5_DECODE_PROTOCOL_ERROR);
  871. decoder->state = AWS_MQTT5_DS_FATAL_ERROR;
  872. return AWS_OP_ERR;
  873. }
  874. return AWS_OP_SUCCESS;
  875. }
  876. static struct aws_mqtt5_decoder_function_table s_aws_mqtt5_decoder_default_function_table = {
  877. .decoders_by_packet_type =
  878. {
  879. NULL, /* RESERVED = 0 */
  880. NULL, /* CONNECT */
  881. &s_aws_mqtt5_decoder_decode_connack, /* CONNACK */
  882. &s_aws_mqtt5_decoder_decode_publish, /* PUBLISH */
  883. &s_aws_mqtt5_decoder_decode_puback, /* PUBACK */
  884. NULL, /* PUBREC */
  885. NULL, /* PUBREL */
  886. NULL, /* PUBCOMP */
  887. NULL, /* SUBSCRIBE */
  888. &s_aws_mqtt5_decoder_decode_suback, /* SUBACK */
  889. NULL, /* UNSUBSCRIBE */
  890. &s_aws_mqtt5_decoder_decode_unsuback, /* UNSUBACK */
  891. NULL, /* PINGREQ */
  892. &s_aws_mqtt5_decoder_decode_pingresp, /* PINGRESP */
  893. &s_aws_mqtt5_decoder_decode_disconnect, /* DISCONNECT */
  894. NULL /* AUTH */
  895. },
  896. };
  897. const struct aws_mqtt5_decoder_function_table *g_aws_mqtt5_default_decoder_table =
  898. &s_aws_mqtt5_decoder_default_function_table;
  899. int aws_mqtt5_decoder_init(
  900. struct aws_mqtt5_decoder *decoder,
  901. struct aws_allocator *allocator,
  902. struct aws_mqtt5_decoder_options *options) {
  903. AWS_ZERO_STRUCT(*decoder);
  904. decoder->options = *options;
  905. if (decoder->options.decoder_table == NULL) {
  906. decoder->options.decoder_table = g_aws_mqtt5_default_decoder_table;
  907. }
  908. decoder->allocator = allocator;
  909. decoder->state = AWS_MQTT5_DS_READ_PACKET_TYPE;
  910. if (aws_byte_buf_init(&decoder->scratch_space, allocator, AWS_MQTT5_DECODER_BUFFER_START_SIZE)) {
  911. return AWS_OP_ERR;
  912. }
  913. return AWS_OP_SUCCESS;
  914. }
  915. void aws_mqtt5_decoder_reset(struct aws_mqtt5_decoder *decoder) {
  916. s_reset_decoder_for_new_packet(decoder);
  917. decoder->state = AWS_MQTT5_DS_READ_PACKET_TYPE;
  918. }
  919. void aws_mqtt5_decoder_clean_up(struct aws_mqtt5_decoder *decoder) {
  920. aws_byte_buf_clean_up(&decoder->scratch_space);
  921. }
  922. void aws_mqtt5_decoder_set_inbound_topic_alias_resolver(
  923. struct aws_mqtt5_decoder *decoder,
  924. struct aws_mqtt5_inbound_topic_alias_resolver *resolver) {
  925. decoder->topic_alias_resolver = resolver;
  926. }