packets.c 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/mqtt/private/packets.h>
  6. enum { S_PROTOCOL_LEVEL = 4 };
  7. enum { S_BIT_1_FLAGS = 0x2 };
  8. static struct aws_byte_cursor s_protocol_name = {
  9. .ptr = (uint8_t *)"MQTT",
  10. .len = 4,
  11. };
  12. static size_t s_sizeof_encoded_buffer(struct aws_byte_cursor *buf) {
  13. return sizeof(uint16_t) + buf->len;
  14. }
  15. static int s_encode_buffer(struct aws_byte_buf *buf, const struct aws_byte_cursor cur) {
  16. AWS_PRECONDITION(buf);
  17. AWS_PRECONDITION(aws_byte_cursor_is_valid(&cur));
  18. /* Make sure the buffer isn't too big */
  19. if (cur.len > UINT16_MAX) {
  20. return aws_raise_error(AWS_ERROR_MQTT_BUFFER_TOO_BIG);
  21. }
  22. /* Write the length */
  23. if (!aws_byte_buf_write_be16(buf, (uint16_t)cur.len)) {
  24. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  25. }
  26. /* Write the data */
  27. if (!aws_byte_buf_write(buf, cur.ptr, cur.len)) {
  28. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  29. }
  30. return AWS_OP_SUCCESS;
  31. }
  32. static int s_decode_buffer(struct aws_byte_cursor *cur, struct aws_byte_cursor *buf) {
  33. AWS_PRECONDITION(cur);
  34. AWS_PRECONDITION(buf);
  35. /* Read the length */
  36. uint16_t len;
  37. if (!aws_byte_cursor_read_be16(cur, &len)) {
  38. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  39. }
  40. /* Store the data */
  41. *buf = aws_byte_cursor_advance(cur, len);
  42. return AWS_OP_SUCCESS;
  43. }
  44. /*****************************************************************************/
  45. /* Ack without payload */
  46. static void s_ack_init(struct aws_mqtt_packet_ack *packet, enum aws_mqtt_packet_type type, uint16_t packet_identifier) {
  47. AWS_PRECONDITION(packet);
  48. AWS_ZERO_STRUCT(*packet);
  49. packet->fixed_header.packet_type = type;
  50. packet->fixed_header.remaining_length = sizeof(uint16_t);
  51. packet->packet_identifier = packet_identifier;
  52. }
  53. int aws_mqtt_packet_ack_encode(struct aws_byte_buf *buf, const struct aws_mqtt_packet_ack *packet) {
  54. AWS_PRECONDITION(buf);
  55. AWS_PRECONDITION(packet);
  56. /*************************************************************************/
  57. /* Fixed Header */
  58. if (aws_mqtt_fixed_header_encode(buf, &packet->fixed_header)) {
  59. return AWS_OP_ERR;
  60. }
  61. /*************************************************************************/
  62. /* Variable Header */
  63. /* Write packet identifier */
  64. if (!aws_byte_buf_write_be16(buf, packet->packet_identifier)) {
  65. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  66. }
  67. return AWS_OP_SUCCESS;
  68. }
  69. int aws_mqtt_packet_ack_decode(struct aws_byte_cursor *cur, struct aws_mqtt_packet_ack *packet) {
  70. AWS_PRECONDITION(cur);
  71. AWS_PRECONDITION(packet);
  72. /*************************************************************************/
  73. /* Fixed Header */
  74. if (aws_mqtt_fixed_header_decode(cur, &packet->fixed_header)) {
  75. return AWS_OP_ERR;
  76. }
  77. /* Validate flags */
  78. if (packet->fixed_header.flags != (aws_mqtt_packet_has_flags(&packet->fixed_header) ? S_BIT_1_FLAGS : 0U)) {
  79. return aws_raise_error(AWS_ERROR_MQTT_INVALID_RESERVED_BITS);
  80. }
  81. /*************************************************************************/
  82. /* Variable Header */
  83. /* Read packet identifier */
  84. if (!aws_byte_cursor_read_be16(cur, &packet->packet_identifier)) {
  85. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  86. }
  87. return AWS_OP_SUCCESS;
  88. }
  89. /*****************************************************************************/
  90. /* Connect */
  91. int aws_mqtt_packet_connect_init(
  92. struct aws_mqtt_packet_connect *packet,
  93. struct aws_byte_cursor client_identifier,
  94. bool clean_session,
  95. uint16_t keep_alive) {
  96. AWS_PRECONDITION(packet);
  97. AWS_PRECONDITION(client_identifier.len > 0);
  98. AWS_ZERO_STRUCT(*packet);
  99. packet->fixed_header.packet_type = AWS_MQTT_PACKET_CONNECT;
  100. /* [MQTT-3.1.1] */
  101. packet->fixed_header.remaining_length = 10 + s_sizeof_encoded_buffer(&client_identifier);
  102. packet->client_identifier = client_identifier;
  103. packet->clean_session = clean_session;
  104. packet->keep_alive_timeout = keep_alive;
  105. return AWS_OP_SUCCESS;
  106. }
  107. int aws_mqtt_packet_connect_add_credentials(
  108. struct aws_mqtt_packet_connect *packet,
  109. struct aws_byte_cursor username,
  110. struct aws_byte_cursor password) {
  111. AWS_PRECONDITION(packet);
  112. AWS_PRECONDITION(username.len > 0);
  113. if (!packet->has_username) {
  114. /* If not already username, add size of length field */
  115. packet->fixed_header.remaining_length += 2;
  116. }
  117. /* Add change in size to remaining_length */
  118. packet->fixed_header.remaining_length += username.len - packet->username.len;
  119. packet->has_username = true;
  120. packet->username = username;
  121. if (password.len > 0) {
  122. if (!packet->has_password) {
  123. /* If not already password, add size of length field */
  124. packet->fixed_header.remaining_length += 2;
  125. }
  126. /* Add change in size to remaining_length */
  127. packet->fixed_header.remaining_length += password.len - packet->password.len;
  128. packet->has_password = true;
  129. packet->password = password;
  130. }
  131. return AWS_OP_SUCCESS;
  132. }
  133. int aws_mqtt_packet_connect_add_will(
  134. struct aws_mqtt_packet_connect *packet,
  135. struct aws_byte_cursor topic,
  136. enum aws_mqtt_qos qos,
  137. bool retain,
  138. struct aws_byte_cursor payload) {
  139. packet->has_will = true;
  140. packet->will_topic = topic;
  141. packet->will_qos = qos;
  142. packet->will_retain = retain;
  143. packet->will_message = payload;
  144. packet->fixed_header.remaining_length += s_sizeof_encoded_buffer(&topic) + s_sizeof_encoded_buffer(&payload);
  145. return AWS_OP_SUCCESS;
  146. }
  147. int aws_mqtt_packet_connect_encode(struct aws_byte_buf *buf, const struct aws_mqtt_packet_connect *packet) {
  148. AWS_PRECONDITION(buf);
  149. AWS_PRECONDITION(packet);
  150. /* Do validation */
  151. if (packet->has_password && !packet->has_username) {
  152. return aws_raise_error(AWS_ERROR_MQTT_INVALID_CREDENTIALS);
  153. }
  154. /*************************************************************************/
  155. /* Fixed Header */
  156. if (aws_mqtt_fixed_header_encode(buf, &packet->fixed_header)) {
  157. return AWS_OP_ERR;
  158. }
  159. /*************************************************************************/
  160. /* Variable Header */
  161. /* Write protocol name */
  162. if (s_encode_buffer(buf, s_protocol_name)) {
  163. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  164. }
  165. /* Write protocol level */
  166. if (!aws_byte_buf_write_u8(buf, S_PROTOCOL_LEVEL)) {
  167. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  168. }
  169. /* Write connect flags [MQTT-3.1.2.3] */
  170. uint8_t connect_flags = (uint8_t)(
  171. packet->clean_session << 1 | packet->has_will << 2 | packet->will_qos << 3 | packet->will_retain << 5 |
  172. packet->has_password << 6 | packet->has_username << 7);
  173. if (!aws_byte_buf_write_u8(buf, connect_flags)) {
  174. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  175. }
  176. /* Write keep alive */
  177. if (!aws_byte_buf_write_be16(buf, packet->keep_alive_timeout)) {
  178. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  179. }
  180. /*************************************************************************/
  181. /* Payload */
  182. /* Client identifier is required, write it */
  183. if (s_encode_buffer(buf, packet->client_identifier)) {
  184. return AWS_OP_ERR;
  185. }
  186. /* Write will */
  187. if (packet->has_will) {
  188. if (s_encode_buffer(buf, packet->will_topic)) {
  189. return AWS_OP_ERR;
  190. }
  191. if (s_encode_buffer(buf, packet->will_message)) {
  192. return AWS_OP_ERR;
  193. }
  194. }
  195. /* Write username */
  196. if (packet->has_username) {
  197. if (s_encode_buffer(buf, packet->username)) {
  198. return AWS_OP_ERR;
  199. }
  200. }
  201. /* Write password */
  202. if (packet->has_password) {
  203. if (s_encode_buffer(buf, packet->password)) {
  204. return AWS_OP_ERR;
  205. }
  206. }
  207. return AWS_OP_SUCCESS;
  208. }
  209. int aws_mqtt_packet_connect_decode(struct aws_byte_cursor *cur, struct aws_mqtt_packet_connect *packet) {
  210. AWS_PRECONDITION(cur);
  211. AWS_PRECONDITION(packet);
  212. /*************************************************************************/
  213. /* Fixed Header */
  214. if (aws_mqtt_fixed_header_decode(cur, &packet->fixed_header)) {
  215. return AWS_OP_ERR;
  216. }
  217. /*************************************************************************/
  218. /* Variable Header */
  219. /* Check protocol name */
  220. struct aws_byte_cursor protocol_name = {
  221. .ptr = NULL,
  222. .len = 0,
  223. };
  224. if (s_decode_buffer(cur, &protocol_name)) {
  225. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  226. }
  227. AWS_ASSERT(protocol_name.ptr && protocol_name.len);
  228. if (protocol_name.len != s_protocol_name.len) {
  229. return aws_raise_error(AWS_ERROR_MQTT_UNSUPPORTED_PROTOCOL_NAME);
  230. }
  231. if (memcmp(protocol_name.ptr, s_protocol_name.ptr, s_protocol_name.len) != 0) {
  232. return aws_raise_error(AWS_ERROR_MQTT_UNSUPPORTED_PROTOCOL_NAME);
  233. }
  234. /* Check protocol level */
  235. struct aws_byte_cursor protocol_level = aws_byte_cursor_advance(cur, 1);
  236. if (protocol_level.len == 0) {
  237. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  238. }
  239. if (*protocol_level.ptr != S_PROTOCOL_LEVEL) {
  240. return aws_raise_error(AWS_ERROR_MQTT_UNSUPPORTED_PROTOCOL_LEVEL);
  241. }
  242. /* Read connect flags [MQTT-3.1.2.3] */
  243. uint8_t connect_flags = 0;
  244. if (!aws_byte_cursor_read_u8(cur, &connect_flags)) {
  245. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  246. }
  247. packet->clean_session = (connect_flags >> 1) & 0x1;
  248. packet->has_will = (connect_flags >> 2) & 0x1;
  249. packet->will_qos = (connect_flags >> 3) & 0x3;
  250. packet->will_retain = (connect_flags >> 5) & 0x1;
  251. packet->has_password = (connect_flags >> 6) & 0x1;
  252. packet->has_username = (connect_flags >> 7) & 0x1;
  253. /* Read keep alive */
  254. if (!aws_byte_cursor_read_be16(cur, &packet->keep_alive_timeout)) {
  255. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  256. }
  257. /*************************************************************************/
  258. /* Payload */
  259. /* Client identifier is required, Read it */
  260. if (s_decode_buffer(cur, &packet->client_identifier)) {
  261. return AWS_OP_ERR;
  262. }
  263. /* Read will */
  264. if (packet->has_will) {
  265. if (s_decode_buffer(cur, &packet->will_topic)) {
  266. return AWS_OP_ERR;
  267. }
  268. if (s_decode_buffer(cur, &packet->will_message)) {
  269. return AWS_OP_ERR;
  270. }
  271. }
  272. /* Read username */
  273. if (packet->has_username) {
  274. if (s_decode_buffer(cur, &packet->username)) {
  275. return AWS_OP_ERR;
  276. }
  277. }
  278. /* Read password */
  279. if (packet->has_password) {
  280. if (s_decode_buffer(cur, &packet->password)) {
  281. return AWS_OP_ERR;
  282. }
  283. }
  284. /* Do validation */
  285. if (packet->has_password && !packet->has_username) {
  286. return aws_raise_error(AWS_ERROR_MQTT_INVALID_CREDENTIALS);
  287. }
  288. return AWS_OP_SUCCESS;
  289. }
  290. /*****************************************************************************/
  291. /* Connack */
  292. int aws_mqtt_packet_connack_init(
  293. struct aws_mqtt_packet_connack *packet,
  294. bool session_present,
  295. enum aws_mqtt_connect_return_code return_code) {
  296. AWS_PRECONDITION(packet);
  297. AWS_ZERO_STRUCT(*packet);
  298. packet->fixed_header.packet_type = AWS_MQTT_PACKET_CONNACK;
  299. packet->fixed_header.remaining_length = 1 + sizeof(packet->connect_return_code);
  300. packet->session_present = session_present;
  301. packet->connect_return_code = (uint8_t)return_code;
  302. return AWS_OP_SUCCESS;
  303. }
  304. int aws_mqtt_packet_connack_encode(struct aws_byte_buf *buf, const struct aws_mqtt_packet_connack *packet) {
  305. AWS_PRECONDITION(buf);
  306. AWS_PRECONDITION(packet);
  307. /*************************************************************************/
  308. /* Fixed Header */
  309. if (aws_mqtt_fixed_header_encode(buf, &packet->fixed_header)) {
  310. return AWS_OP_ERR;
  311. }
  312. /*************************************************************************/
  313. /* Variable Header */
  314. /* Read connack flags */
  315. uint8_t connack_flags = packet->session_present & 0x1;
  316. if (!aws_byte_buf_write_u8(buf, connack_flags)) {
  317. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  318. }
  319. /* Read return code */
  320. if (!aws_byte_buf_write_u8(buf, packet->connect_return_code)) {
  321. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  322. }
  323. return AWS_OP_SUCCESS;
  324. }
  325. int aws_mqtt_packet_connack_decode(struct aws_byte_cursor *cur, struct aws_mqtt_packet_connack *packet) {
  326. AWS_PRECONDITION(cur);
  327. AWS_PRECONDITION(packet);
  328. /*************************************************************************/
  329. /* Fixed Header */
  330. if (aws_mqtt_fixed_header_decode(cur, &packet->fixed_header)) {
  331. return AWS_OP_ERR;
  332. }
  333. /*************************************************************************/
  334. /* Variable Header */
  335. /* Read connack flags */
  336. uint8_t connack_flags = 0;
  337. if (!aws_byte_cursor_read_u8(cur, &connack_flags)) {
  338. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  339. }
  340. packet->session_present = connack_flags & 0x1;
  341. /* Read return code */
  342. if (!aws_byte_cursor_read_u8(cur, &packet->connect_return_code)) {
  343. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  344. }
  345. return AWS_OP_SUCCESS;
  346. }
  347. /*****************************************************************************/
  348. /* Publish */
  349. int aws_mqtt_packet_publish_init(
  350. struct aws_mqtt_packet_publish *packet,
  351. bool retain,
  352. enum aws_mqtt_qos qos,
  353. bool dup,
  354. struct aws_byte_cursor topic_name,
  355. uint16_t packet_identifier,
  356. struct aws_byte_cursor payload) {
  357. AWS_PRECONDITION(packet);
  358. AWS_FATAL_PRECONDITION(topic_name.len > 0); /* [MQTT-4.7.3-1] */
  359. AWS_ZERO_STRUCT(*packet);
  360. packet->fixed_header.packet_type = AWS_MQTT_PACKET_PUBLISH;
  361. packet->fixed_header.remaining_length = s_sizeof_encoded_buffer(&topic_name) + payload.len;
  362. if (qos > 0) {
  363. packet->fixed_header.remaining_length += sizeof(packet->packet_identifier);
  364. }
  365. /* [MQTT-2.2.2] */
  366. uint8_t publish_flags = (uint8_t)((retain & 0x1) | (qos & 0x3) << 1 | (dup & 0x1) << 3);
  367. packet->fixed_header.flags = publish_flags;
  368. packet->topic_name = topic_name;
  369. packet->packet_identifier = packet_identifier;
  370. packet->payload = payload;
  371. return AWS_OP_SUCCESS;
  372. }
  373. int aws_mqtt_packet_publish_encode(struct aws_byte_buf *buf, const struct aws_mqtt_packet_publish *packet) {
  374. if (aws_mqtt_packet_publish_encode_headers(buf, packet)) {
  375. return AWS_OP_ERR;
  376. }
  377. /*************************************************************************/
  378. /* Payload */
  379. if (!aws_byte_buf_write(buf, packet->payload.ptr, packet->payload.len)) {
  380. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  381. }
  382. return AWS_OP_SUCCESS;
  383. }
  384. int aws_mqtt_packet_publish_encode_headers(struct aws_byte_buf *buf, const struct aws_mqtt_packet_publish *packet) {
  385. AWS_PRECONDITION(buf);
  386. AWS_PRECONDITION(packet);
  387. /*************************************************************************/
  388. /* Fixed Header */
  389. if (aws_mqtt_fixed_header_encode(buf, &packet->fixed_header)) {
  390. return AWS_OP_ERR;
  391. }
  392. /*************************************************************************/
  393. /* Variable Header */
  394. /* Write topic name */
  395. if (s_encode_buffer(buf, packet->topic_name)) {
  396. return AWS_OP_ERR;
  397. }
  398. enum aws_mqtt_qos qos = aws_mqtt_packet_publish_get_qos(packet);
  399. if (qos > 0) {
  400. /* Write packet identifier */
  401. if (!aws_byte_buf_write_be16(buf, packet->packet_identifier)) {
  402. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  403. }
  404. }
  405. return AWS_OP_SUCCESS;
  406. }
  407. int aws_mqtt_packet_publish_decode(struct aws_byte_cursor *cur, struct aws_mqtt_packet_publish *packet) {
  408. AWS_PRECONDITION(cur);
  409. AWS_PRECONDITION(packet);
  410. /*************************************************************************/
  411. /* Fixed Header */
  412. if (aws_mqtt_fixed_header_decode(cur, &packet->fixed_header)) {
  413. return AWS_OP_ERR;
  414. }
  415. /*************************************************************************/
  416. /* Variable Header */
  417. /* Read topic name */
  418. if (s_decode_buffer(cur, &packet->topic_name)) {
  419. return AWS_OP_ERR;
  420. }
  421. size_t payload_size = packet->fixed_header.remaining_length - s_sizeof_encoded_buffer(&packet->topic_name);
  422. /* Read QoS */
  423. enum aws_mqtt_qos qos = aws_mqtt_packet_publish_get_qos(packet);
  424. if (qos > 2) {
  425. return aws_raise_error(AWS_ERROR_MQTT_PROTOCOL_ERROR);
  426. }
  427. /* Read packet identifier */
  428. if (qos > 0) {
  429. if (!aws_byte_cursor_read_be16(cur, &packet->packet_identifier)) {
  430. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  431. }
  432. payload_size -= sizeof(packet->packet_identifier);
  433. } else {
  434. packet->packet_identifier = 0;
  435. }
  436. /*************************************************************************/
  437. /* Payload */
  438. packet->payload = aws_byte_cursor_advance(cur, payload_size);
  439. if (packet->payload.len != payload_size) {
  440. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  441. }
  442. return AWS_OP_SUCCESS;
  443. }
  444. bool aws_mqtt_packet_publish_get_dup(const struct aws_mqtt_packet_publish *packet) {
  445. return packet->fixed_header.flags & (1 << 3); /* bit 3 */
  446. }
  447. enum aws_mqtt_qos aws_mqtt_packet_publish_get_qos(const struct aws_mqtt_packet_publish *packet) {
  448. return (packet->fixed_header.flags >> 1) & 0x3; /* bits 2,1 */
  449. }
  450. bool aws_mqtt_packet_publish_get_retain(const struct aws_mqtt_packet_publish *packet) {
  451. return packet->fixed_header.flags & 0x1; /* bit 0 */
  452. }
  453. /*****************************************************************************/
  454. /* Puback */
  455. int aws_mqtt_packet_puback_init(struct aws_mqtt_packet_ack *packet, uint16_t packet_identifier) {
  456. s_ack_init(packet, AWS_MQTT_PACKET_PUBACK, packet_identifier);
  457. return AWS_OP_SUCCESS;
  458. }
  459. /*****************************************************************************/
  460. /* Pubrec */
  461. int aws_mqtt_packet_pubrec_init(struct aws_mqtt_packet_ack *packet, uint16_t packet_identifier) {
  462. s_ack_init(packet, AWS_MQTT_PACKET_PUBREC, packet_identifier);
  463. return AWS_OP_SUCCESS;
  464. }
  465. /*****************************************************************************/
  466. /* Pubrel */
  467. int aws_mqtt_packet_pubrel_init(struct aws_mqtt_packet_ack *packet, uint16_t packet_identifier) {
  468. s_ack_init(packet, AWS_MQTT_PACKET_PUBREL, packet_identifier);
  469. packet->fixed_header.flags = S_BIT_1_FLAGS;
  470. return AWS_OP_SUCCESS;
  471. }
  472. /*****************************************************************************/
  473. /* Pubcomp */
  474. int aws_mqtt_packet_pubcomp_init(struct aws_mqtt_packet_ack *packet, uint16_t packet_identifier) {
  475. s_ack_init(packet, AWS_MQTT_PACKET_PUBCOMP, packet_identifier);
  476. return AWS_OP_SUCCESS;
  477. }
  478. /*****************************************************************************/
  479. /* Subscribe */
  480. int aws_mqtt_packet_subscribe_init(
  481. struct aws_mqtt_packet_subscribe *packet,
  482. struct aws_allocator *allocator,
  483. uint16_t packet_identifier) {
  484. AWS_PRECONDITION(packet);
  485. AWS_ZERO_STRUCT(*packet);
  486. packet->fixed_header.packet_type = AWS_MQTT_PACKET_SUBSCRIBE;
  487. packet->fixed_header.flags = S_BIT_1_FLAGS;
  488. packet->fixed_header.remaining_length = sizeof(uint16_t);
  489. packet->packet_identifier = packet_identifier;
  490. if (aws_array_list_init_dynamic(&packet->topic_filters, allocator, 1, sizeof(struct aws_mqtt_subscription))) {
  491. return AWS_OP_ERR;
  492. }
  493. return AWS_OP_SUCCESS;
  494. }
  495. void aws_mqtt_packet_subscribe_clean_up(struct aws_mqtt_packet_subscribe *packet) {
  496. AWS_PRECONDITION(packet);
  497. aws_array_list_clean_up(&packet->topic_filters);
  498. AWS_ZERO_STRUCT(*packet);
  499. }
  500. int aws_mqtt_packet_subscribe_add_topic(
  501. struct aws_mqtt_packet_subscribe *packet,
  502. struct aws_byte_cursor topic_filter,
  503. enum aws_mqtt_qos qos) {
  504. AWS_PRECONDITION(packet);
  505. /* Add to the array list */
  506. struct aws_mqtt_subscription subscription;
  507. subscription.topic_filter = topic_filter;
  508. subscription.qos = qos;
  509. if (aws_array_list_push_back(&packet->topic_filters, &subscription)) {
  510. return AWS_OP_ERR;
  511. }
  512. /* Add to the remaining length */
  513. packet->fixed_header.remaining_length += s_sizeof_encoded_buffer(&topic_filter) + 1;
  514. return AWS_OP_SUCCESS;
  515. }
  516. int aws_mqtt_packet_subscribe_encode(struct aws_byte_buf *buf, const struct aws_mqtt_packet_subscribe *packet) {
  517. AWS_PRECONDITION(buf);
  518. AWS_PRECONDITION(packet);
  519. /*************************************************************************/
  520. /* Fixed Header */
  521. if (aws_mqtt_fixed_header_encode(buf, &packet->fixed_header)) {
  522. return AWS_OP_ERR;
  523. }
  524. /*************************************************************************/
  525. /* Variable Header */
  526. /* Write packet identifier */
  527. if (!aws_byte_buf_write_be16(buf, packet->packet_identifier)) {
  528. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  529. }
  530. /* Write topic filters */
  531. const size_t num_filters = aws_array_list_length(&packet->topic_filters);
  532. for (size_t i = 0; i < num_filters; ++i) {
  533. struct aws_mqtt_subscription *subscription;
  534. if (aws_array_list_get_at_ptr(&packet->topic_filters, (void **)&subscription, i)) {
  535. return AWS_OP_ERR;
  536. }
  537. s_encode_buffer(buf, subscription->topic_filter);
  538. uint8_t eos_byte = subscription->qos & 0x3;
  539. if (!aws_byte_buf_write_u8(buf, eos_byte)) {
  540. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  541. }
  542. }
  543. return AWS_OP_SUCCESS;
  544. }
  545. int aws_mqtt_packet_subscribe_decode(struct aws_byte_cursor *cur, struct aws_mqtt_packet_subscribe *packet) {
  546. AWS_PRECONDITION(cur);
  547. AWS_PRECONDITION(packet);
  548. /*************************************************************************/
  549. /* Fixed Header */
  550. if (aws_mqtt_fixed_header_decode(cur, &packet->fixed_header)) {
  551. return AWS_OP_ERR;
  552. }
  553. if (packet->fixed_header.remaining_length < sizeof(uint16_t)) {
  554. return aws_raise_error(AWS_ERROR_MQTT_INVALID_REMAINING_LENGTH);
  555. }
  556. /*************************************************************************/
  557. /* Variable Header */
  558. /* Read packet identifier */
  559. if (!aws_byte_cursor_read_be16(cur, &packet->packet_identifier)) {
  560. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  561. }
  562. /* Read topic filters */
  563. size_t remaining_length = packet->fixed_header.remaining_length - sizeof(uint16_t);
  564. while (remaining_length) {
  565. struct aws_mqtt_subscription subscription = {
  566. .topic_filter = {.ptr = NULL, .len = 0},
  567. .qos = 0,
  568. };
  569. if (s_decode_buffer(cur, &subscription.topic_filter)) {
  570. return AWS_OP_ERR;
  571. }
  572. uint8_t eos_byte = 0;
  573. if (!aws_byte_cursor_read_u8(cur, &eos_byte)) {
  574. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  575. }
  576. if ((eos_byte >> 2) != 0) {
  577. return aws_raise_error(AWS_ERROR_MQTT_INVALID_RESERVED_BITS);
  578. }
  579. if (eos_byte == 0x3) {
  580. return aws_raise_error(AWS_ERROR_MQTT_INVALID_QOS);
  581. }
  582. subscription.qos = eos_byte & 0x3;
  583. aws_array_list_push_back(&packet->topic_filters, &subscription);
  584. remaining_length -= s_sizeof_encoded_buffer(&subscription.topic_filter) + 1;
  585. }
  586. return AWS_OP_SUCCESS;
  587. }
  588. /*****************************************************************************/
  589. /* Suback */
  590. int aws_mqtt_packet_suback_init(
  591. struct aws_mqtt_packet_suback *packet,
  592. struct aws_allocator *allocator,
  593. uint16_t packet_identifier) {
  594. AWS_PRECONDITION(packet);
  595. AWS_ZERO_STRUCT(*packet);
  596. packet->fixed_header.packet_type = AWS_MQTT_PACKET_SUBACK;
  597. packet->fixed_header.remaining_length = sizeof(uint16_t);
  598. packet->packet_identifier = packet_identifier;
  599. if (aws_array_list_init_dynamic(&packet->return_codes, allocator, 1, sizeof(uint8_t))) {
  600. return AWS_OP_ERR;
  601. }
  602. return AWS_OP_SUCCESS;
  603. }
  604. void aws_mqtt_packet_suback_clean_up(struct aws_mqtt_packet_suback *packet) {
  605. AWS_PRECONDITION(packet);
  606. aws_array_list_clean_up(&packet->return_codes);
  607. AWS_ZERO_STRUCT(*packet);
  608. }
  609. static bool s_return_code_check(uint8_t return_code) {
  610. if (return_code != AWS_MQTT_QOS_FAILURE && return_code != AWS_MQTT_QOS_AT_MOST_ONCE &&
  611. return_code != AWS_MQTT_QOS_AT_LEAST_ONCE && return_code != AWS_MQTT_QOS_EXACTLY_ONCE) {
  612. return false;
  613. }
  614. return true;
  615. }
  616. int aws_mqtt_packet_suback_add_return_code(struct aws_mqtt_packet_suback *packet, uint8_t return_code) {
  617. AWS_PRECONDITION(packet);
  618. if (!(s_return_code_check(return_code))) {
  619. return aws_raise_error(AWS_ERROR_MQTT_PROTOCOL_ERROR);
  620. }
  621. /* Add to the array list */
  622. if (aws_array_list_push_back(&packet->return_codes, &return_code)) {
  623. return AWS_OP_ERR;
  624. }
  625. /* Add to the remaining length, each return code takes one byte */
  626. packet->fixed_header.remaining_length += 1;
  627. return AWS_OP_SUCCESS;
  628. }
  629. int aws_mqtt_packet_suback_encode(struct aws_byte_buf *buf, const struct aws_mqtt_packet_suback *packet) {
  630. AWS_PRECONDITION(buf);
  631. AWS_PRECONDITION(packet);
  632. /*************************************************************************/
  633. /* Fixed Header */
  634. if (aws_mqtt_fixed_header_encode(buf, &packet->fixed_header)) {
  635. return AWS_OP_ERR;
  636. }
  637. /*************************************************************************/
  638. /* Variable Header */
  639. /* Write packet identifier */
  640. if (!aws_byte_buf_write_be16(buf, packet->packet_identifier)) {
  641. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  642. }
  643. /*************************************************************************/
  644. /* Payload */
  645. /* Write topic filters */
  646. const size_t num_filters = aws_array_list_length(&packet->return_codes);
  647. for (size_t i = 0; i < num_filters; ++i) {
  648. uint8_t return_code = 0;
  649. if (aws_array_list_get_at(&packet->return_codes, (void *)&return_code, i)) {
  650. return AWS_OP_ERR;
  651. }
  652. if (!aws_byte_buf_write_u8(buf, return_code)) {
  653. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  654. }
  655. }
  656. return AWS_OP_SUCCESS;
  657. }
  658. int aws_mqtt_packet_suback_decode(struct aws_byte_cursor *cur, struct aws_mqtt_packet_suback *packet) {
  659. AWS_PRECONDITION(cur);
  660. AWS_PRECONDITION(packet);
  661. /*************************************************************************/
  662. /* Fixed Header */
  663. if (aws_mqtt_fixed_header_decode(cur, &packet->fixed_header)) {
  664. return AWS_OP_ERR;
  665. }
  666. /* Validate flags */
  667. if (packet->fixed_header.flags != (aws_mqtt_packet_has_flags(&packet->fixed_header) ? S_BIT_1_FLAGS : 0U)) {
  668. return aws_raise_error(AWS_ERROR_MQTT_INVALID_RESERVED_BITS);
  669. }
  670. /*************************************************************************/
  671. /* Variable Header */
  672. /* Read packet identifier */
  673. if (!aws_byte_cursor_read_be16(cur, &packet->packet_identifier)) {
  674. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  675. }
  676. /*************************************************************************/
  677. /* Payload */
  678. /* Read return codes */
  679. size_t remaining_length = packet->fixed_header.remaining_length - sizeof(uint16_t);
  680. while (remaining_length) {
  681. uint8_t return_code = 0;
  682. if (!aws_byte_cursor_read_u8(cur, &return_code)) {
  683. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  684. }
  685. if (!(s_return_code_check(return_code))) {
  686. return aws_raise_error(AWS_ERROR_MQTT_PROTOCOL_ERROR);
  687. }
  688. aws_array_list_push_back(&packet->return_codes, &return_code);
  689. remaining_length -= 1;
  690. }
  691. return AWS_OP_SUCCESS;
  692. }
  693. /*****************************************************************************/
  694. /* Unsubscribe */
  695. int aws_mqtt_packet_unsubscribe_init(
  696. struct aws_mqtt_packet_unsubscribe *packet,
  697. struct aws_allocator *allocator,
  698. uint16_t packet_identifier) {
  699. AWS_PRECONDITION(packet);
  700. AWS_PRECONDITION(allocator);
  701. AWS_ZERO_STRUCT(*packet);
  702. packet->fixed_header.packet_type = AWS_MQTT_PACKET_UNSUBSCRIBE;
  703. packet->fixed_header.flags = S_BIT_1_FLAGS;
  704. packet->fixed_header.remaining_length = sizeof(uint16_t);
  705. packet->packet_identifier = packet_identifier;
  706. if (aws_array_list_init_dynamic(&packet->topic_filters, allocator, 1, sizeof(struct aws_byte_cursor))) {
  707. return AWS_OP_ERR;
  708. }
  709. return AWS_OP_SUCCESS;
  710. }
  711. void aws_mqtt_packet_unsubscribe_clean_up(struct aws_mqtt_packet_unsubscribe *packet) {
  712. AWS_PRECONDITION(packet);
  713. aws_array_list_clean_up(&packet->topic_filters);
  714. AWS_ZERO_STRUCT(*packet);
  715. }
  716. int aws_mqtt_packet_unsubscribe_add_topic(
  717. struct aws_mqtt_packet_unsubscribe *packet,
  718. struct aws_byte_cursor topic_filter) {
  719. AWS_PRECONDITION(packet);
  720. /* Add to the array list */
  721. if (aws_array_list_push_back(&packet->topic_filters, &topic_filter)) {
  722. return AWS_OP_ERR;
  723. }
  724. /* Add to the remaining length */
  725. packet->fixed_header.remaining_length += s_sizeof_encoded_buffer(&topic_filter);
  726. return AWS_OP_SUCCESS;
  727. }
  728. int aws_mqtt_packet_unsubscribe_encode(struct aws_byte_buf *buf, const struct aws_mqtt_packet_unsubscribe *packet) {
  729. AWS_PRECONDITION(buf);
  730. AWS_PRECONDITION(packet);
  731. /*************************************************************************/
  732. /* Fixed Header */
  733. if (aws_mqtt_fixed_header_encode(buf, &packet->fixed_header)) {
  734. return AWS_OP_ERR;
  735. }
  736. /*************************************************************************/
  737. /* Variable Header */
  738. /* Write packet identifier */
  739. if (!aws_byte_buf_write_be16(buf, packet->packet_identifier)) {
  740. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  741. }
  742. /* Write topic filters */
  743. const size_t num_filters = aws_array_list_length(&packet->topic_filters);
  744. for (size_t i = 0; i < num_filters; ++i) {
  745. struct aws_byte_cursor topic_filter = {.ptr = NULL, .len = 0};
  746. if (aws_array_list_get_at(&packet->topic_filters, (void *)&topic_filter, i)) {
  747. return AWS_OP_ERR;
  748. }
  749. s_encode_buffer(buf, topic_filter);
  750. }
  751. return AWS_OP_SUCCESS;
  752. }
  753. int aws_mqtt_packet_unsubscribe_decode(struct aws_byte_cursor *cur, struct aws_mqtt_packet_unsubscribe *packet) {
  754. AWS_PRECONDITION(cur);
  755. AWS_PRECONDITION(packet);
  756. /*************************************************************************/
  757. /* Fixed Header */
  758. if (aws_mqtt_fixed_header_decode(cur, &packet->fixed_header)) {
  759. return AWS_OP_ERR;
  760. }
  761. if (packet->fixed_header.remaining_length < sizeof(uint16_t)) {
  762. return aws_raise_error(AWS_ERROR_MQTT_INVALID_REMAINING_LENGTH);
  763. }
  764. /*************************************************************************/
  765. /* Variable Header */
  766. /* Read packet identifier */
  767. if (!aws_byte_cursor_read_be16(cur, &packet->packet_identifier)) {
  768. return aws_raise_error(AWS_ERROR_SHORT_BUFFER);
  769. }
  770. /* Read topic filters */
  771. size_t remaining_length = packet->fixed_header.remaining_length - sizeof(uint16_t);
  772. while (remaining_length) {
  773. struct aws_byte_cursor topic_filter;
  774. AWS_ZERO_STRUCT(topic_filter);
  775. if (s_decode_buffer(cur, &topic_filter)) {
  776. return AWS_OP_ERR;
  777. }
  778. aws_array_list_push_back(&packet->topic_filters, &topic_filter);
  779. remaining_length -= s_sizeof_encoded_buffer(&topic_filter);
  780. }
  781. return AWS_OP_SUCCESS;
  782. }
  783. /*****************************************************************************/
  784. /* Unsuback */
  785. int aws_mqtt_packet_unsuback_init(struct aws_mqtt_packet_ack *packet, uint16_t packet_identifier) {
  786. s_ack_init(packet, AWS_MQTT_PACKET_UNSUBACK, packet_identifier);
  787. return AWS_OP_SUCCESS;
  788. }
  789. /*****************************************************************************/
  790. /* Ping request/response */
  791. static void s_connection_init(struct aws_mqtt_packet_connection *packet, enum aws_mqtt_packet_type type) {
  792. AWS_PRECONDITION(packet);
  793. AWS_ZERO_STRUCT(*packet);
  794. packet->fixed_header.packet_type = type;
  795. }
  796. int aws_mqtt_packet_pingreq_init(struct aws_mqtt_packet_connection *packet) {
  797. s_connection_init(packet, AWS_MQTT_PACKET_PINGREQ);
  798. return AWS_OP_SUCCESS;
  799. }
  800. int aws_mqtt_packet_pingresp_init(struct aws_mqtt_packet_connection *packet) {
  801. s_connection_init(packet, AWS_MQTT_PACKET_PINGRESP);
  802. return AWS_OP_SUCCESS;
  803. }
  804. int aws_mqtt_packet_disconnect_init(struct aws_mqtt_packet_connection *packet) {
  805. s_connection_init(packet, AWS_MQTT_PACKET_DISCONNECT);
  806. return AWS_OP_SUCCESS;
  807. }
  808. int aws_mqtt_packet_connection_encode(struct aws_byte_buf *buf, const struct aws_mqtt_packet_connection *packet) {
  809. AWS_PRECONDITION(buf);
  810. AWS_PRECONDITION(packet);
  811. /*************************************************************************/
  812. /* Fixed Header */
  813. if (aws_mqtt_fixed_header_encode(buf, &packet->fixed_header)) {
  814. return AWS_OP_ERR;
  815. }
  816. return AWS_OP_SUCCESS;
  817. }
  818. int aws_mqtt_packet_connection_decode(struct aws_byte_cursor *cur, struct aws_mqtt_packet_connection *packet) {
  819. AWS_PRECONDITION(cur);
  820. AWS_PRECONDITION(packet);
  821. /*************************************************************************/
  822. /* Fixed Header */
  823. if (aws_mqtt_fixed_header_decode(cur, &packet->fixed_header)) {
  824. return AWS_OP_ERR;
  825. }
  826. return AWS_OP_SUCCESS;
  827. }