ws_client.c 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744
  1. // Copyright (C) 2020 Timotej Šiškovič
  2. // SPDX-License-Identifier: GPL-3.0-only
  3. //
  4. // This program is free software: you can redistribute it and/or modify it
  5. // under the terms of the GNU General Public License as published by the Free Software Foundation, version 3.
  6. //
  7. // This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
  8. // without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  9. // See the GNU General Public License for more details.
  10. //
  11. // You should have received a copy of the GNU General Public License along with this program.
  12. // If not, see <https://www.gnu.org/licenses/>.
  13. #include <fcntl.h>
  14. #include <unistd.h>
  15. #include <string.h>
  16. #include <errno.h>
  17. #include <ctype.h>
  18. #include <openssl/evp.h>
  19. #include "ws_client.h"
  20. #include "common_internal.h"
  21. #ifdef MQTT_WEBSOCKETS_DEBUG
  22. #include "../c-rbuf/src/ringbuffer_internal.h"
  23. #endif
  24. #define UNIT_LOG_PREFIX "ws_client: "
  25. #define FATAL(fmt, ...) mws_fatal(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
  26. #define ERROR(fmt, ...) mws_error(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
  27. #define WARN(fmt, ...) mws_warn (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
  28. #define INFO(fmt, ...) mws_info (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
  29. #define DEBUG(fmt, ...) mws_debug(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
  30. const char *websocket_upgrage_hdr = "GET /mqtt HTTP/1.1\x0D\x0A"
  31. "Host: %s\x0D\x0A"
  32. "Upgrade: websocket\x0D\x0A"
  33. "Connection: Upgrade\x0D\x0A"
  34. "Sec-WebSocket-Key: %s\x0D\x0A"
  35. "Origin: http://example.com\x0D\x0A"
  36. "Sec-WebSocket-Protocol: mqtt\x0D\x0A"
  37. "Sec-WebSocket-Version: 13\x0D\x0A\x0D\x0A";
  38. const char *mqtt_protoid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
  39. #define DEFAULT_RINGBUFFER_SIZE (1024*128)
  40. #define ENTROPY_SOURCE "/dev/urandom"
  41. ws_client *ws_client_new(size_t buf_size, char **host, mqtt_wss_log_ctx_t log)
  42. {
  43. ws_client *client;
  44. if(!host)
  45. return NULL;
  46. client = mw_calloc(1, sizeof(ws_client));
  47. if (!client)
  48. return NULL;
  49. client->host = host;
  50. client->log = log;
  51. client->buf_read = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE);
  52. if (!client->buf_read)
  53. goto cleanup;
  54. client->buf_write = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE);
  55. if (!client->buf_write)
  56. goto cleanup_1;
  57. client->buf_to_mqtt = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE);
  58. if (!client->buf_to_mqtt)
  59. goto cleanup_2;
  60. client->entropy_fd = open(ENTROPY_SOURCE, O_RDONLY);
  61. if (client->entropy_fd < 1) {
  62. ERROR("Error opening entropy source \"" ENTROPY_SOURCE "\". Reason: \"%s\"", strerror(errno));
  63. goto cleanup_3;
  64. }
  65. return client;
  66. cleanup_3:
  67. rbuf_free(client->buf_to_mqtt);
  68. cleanup_2:
  69. rbuf_free(client->buf_write);
  70. cleanup_1:
  71. rbuf_free(client->buf_read);
  72. cleanup:
  73. mw_free(client);
  74. return NULL;
  75. }
  76. void ws_client_free_headers(ws_client *client)
  77. {
  78. struct http_header *ptr = client->hs.headers;
  79. struct http_header *tmp;
  80. while (ptr) {
  81. tmp = ptr;
  82. ptr = ptr->next;
  83. mw_free(tmp);
  84. }
  85. client->hs.headers = NULL;
  86. client->hs.headers_tail = NULL;
  87. client->hs.hdr_count = 0;
  88. }
  89. void ws_client_destroy(ws_client *client)
  90. {
  91. ws_client_free_headers(client);
  92. mw_free(client->hs.nonce_reply);
  93. mw_free(client->hs.http_reply_msg);
  94. close(client->entropy_fd);
  95. rbuf_free(client->buf_read);
  96. rbuf_free(client->buf_write);
  97. rbuf_free(client->buf_to_mqtt);
  98. mw_free(client);
  99. }
  100. void ws_client_reset(ws_client *client)
  101. {
  102. ws_client_free_headers(client);
  103. mw_free(client->hs.nonce_reply);
  104. client->hs.nonce_reply = NULL;
  105. mw_free(client->hs.http_reply_msg);
  106. client->hs.http_reply_msg = NULL;
  107. rbuf_flush(client->buf_read);
  108. rbuf_flush(client->buf_write);
  109. rbuf_flush(client->buf_to_mqtt);
  110. client->state = WS_RAW;
  111. client->hs.hdr_state = WS_HDR_HTTP;
  112. client->rx.parse_state = WS_FIRST_2BYTES;
  113. }
  114. #define MAX_HTTP_HDR_COUNT 128
  115. int ws_client_add_http_header(ws_client *client, struct http_header *hdr)
  116. {
  117. if (client->hs.hdr_count > MAX_HTTP_HDR_COUNT) {
  118. ERROR("Too many HTTP response header fields");
  119. return -1;
  120. }
  121. if (client->hs.headers)
  122. client->hs.headers_tail->next = hdr;
  123. else
  124. client->hs.headers = hdr;
  125. client->hs.headers_tail = hdr;
  126. client->hs.hdr_count++;
  127. return 0;
  128. }
  129. int ws_client_want_write(ws_client *client)
  130. {
  131. return rbuf_bytes_available(client->buf_write);
  132. }
  133. #define RAND_SRC "/dev/urandom"
  134. static int ws_client_get_nonce(ws_client *client, char *dest, unsigned int size)
  135. {
  136. // we do not need crypto secure random here
  137. // it's just used for protocol negotiation
  138. int rd;
  139. int f = open(RAND_SRC, O_RDONLY);
  140. if (f < 0) {
  141. ERROR("Error opening \"%s\". Err: \"%s\"", RAND_SRC, strerror(errno));
  142. return -2;
  143. }
  144. if ((rd = read(f, dest, size)) > 0) {
  145. close(f);
  146. return rd;
  147. }
  148. close(f);
  149. return -1;
  150. }
  151. #define WEBSOCKET_NONCE_SIZE 16
  152. #define TEMP_BUF_SIZE 4096
  153. int ws_client_start_handshake(ws_client *client)
  154. {
  155. char nonce[WEBSOCKET_NONCE_SIZE];
  156. char nonce_b64[256];
  157. char second[TEMP_BUF_SIZE];
  158. unsigned int md_len;
  159. unsigned char *digest;
  160. EVP_MD_CTX *md_ctx;
  161. const EVP_MD *md;
  162. if(!*client->host) {
  163. ERROR("Hostname has not been set. We should not be able to come here!");
  164. return 1;
  165. }
  166. ws_client_get_nonce(client, nonce, WEBSOCKET_NONCE_SIZE);
  167. EVP_EncodeBlock((unsigned char *)nonce_b64, (const unsigned char *)nonce, WEBSOCKET_NONCE_SIZE);
  168. snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr,
  169. *client->host,
  170. nonce_b64);
  171. if(rbuf_bytes_free(client->buf_write) < strlen(second)) {
  172. ERROR("Write buffer capacity too low.");
  173. return 1;
  174. }
  175. rbuf_push(client->buf_write, second, strlen(second));
  176. client->state = WS_HANDSHAKE;
  177. //Calculating expected Sec-WebSocket-Accept reply
  178. snprintf(second, TEMP_BUF_SIZE, "%s%s", nonce_b64, mqtt_protoid);
  179. #if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110)
  180. md_ctx = EVP_MD_CTX_create();
  181. #else
  182. md_ctx = EVP_MD_CTX_new();
  183. #endif
  184. if (md_ctx == NULL) {
  185. ERROR("Cant create EVP_MD Context");
  186. return 1;
  187. }
  188. md = EVP_get_digestbyname("sha1");
  189. if (!md) {
  190. ERROR("Unknown message digest");
  191. return 1;
  192. }
  193. if ((digest = (unsigned char *)OPENSSL_malloc(EVP_MD_size(EVP_sha256()))) == NULL) {
  194. ERROR("Cant alloc digest");
  195. return 1;
  196. }
  197. EVP_DigestInit_ex(md_ctx, md, NULL);
  198. EVP_DigestUpdate(md_ctx, second, strlen(second));
  199. EVP_DigestFinal_ex(md_ctx, digest, &md_len);
  200. EVP_EncodeBlock((unsigned char *)nonce_b64, digest, md_len);
  201. mw_free(client->hs.nonce_reply);
  202. client->hs.nonce_reply = mw_strdup(nonce_b64);
  203. OPENSSL_free(digest);
  204. #if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110)
  205. EVP_MD_CTX_destroy(md_ctx);
  206. #else
  207. EVP_MD_CTX_free(md_ctx);
  208. #endif
  209. return 0;
  210. }
  211. #define BUF_READ_MEMCMP_CONST(const, err) \
  212. if (rbuf_memcmp_n(client->buf_read, const, strlen(const))) { \
  213. ERROR(err); \
  214. rbuf_flush(client->buf_read); \
  215. return WS_CLIENT_PROTOCOL_ERROR; \
  216. }
  217. #define BUF_READ_CHECK_AT_LEAST(x) \
  218. if (rbuf_bytes_available(client->buf_read) < x) \
  219. return WS_CLIENT_NEED_MORE_BYTES;
  220. #define MAX_HTTP_LINE_LENGTH 1024*4
  221. #define HTTP_SC_LENGTH 4 // "XXX " http status code as C string
  222. #define WS_CLIENT_HTTP_HDR "HTTP/1.1 "
  223. #define WS_CONN_ACCEPT "sec-websocket-accept"
  224. #define HTTP_HDR_SEPARATOR ": "
  225. #define WS_NONCE_STRLEN_B64 28
  226. #define WS_HTTP_NEWLINE "\r\n"
  227. #define HTTP_HEADER_NAME_MAX_LEN 256
  228. #if HTTP_HEADER_NAME_MAX_LEN > MAX_HTTP_LINE_LENGTH
  229. #error "Buffer too small"
  230. #endif
  231. #if WS_NONCE_STRLEN_B64 > MAX_HTTP_LINE_LENGTH
  232. #error "Buffer too small"
  233. #endif
  234. #define HTTP_HDR_LINE_CHECK_LIMIT(x) if ((x) >= MAX_HTTP_LINE_LENGTH) \
  235. { \
  236. ERROR("HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \
  237. return WS_CLIENT_PROTOCOL_ERROR; \
  238. }
  239. int ws_client_parse_handshake_resp(ws_client *client)
  240. {
  241. char buf[HTTP_SC_LENGTH];
  242. int idx_crlf, idx_sep;
  243. char *ptr;
  244. size_t bytes;
  245. switch (client->hs.hdr_state) {
  246. case WS_HDR_HTTP:
  247. BUF_READ_CHECK_AT_LEAST(strlen(WS_CLIENT_HTTP_HDR))
  248. BUF_READ_MEMCMP_CONST(WS_CLIENT_HTTP_HDR, "Expected \"HTTP1.1\" header");
  249. rbuf_bump_tail(client->buf_read, strlen(WS_CLIENT_HTTP_HDR));
  250. client->hs.hdr_state = WS_HDR_RC;
  251. break;
  252. case WS_HDR_RC:
  253. BUF_READ_CHECK_AT_LEAST(HTTP_SC_LENGTH); // "XXX " http return code
  254. rbuf_pop(client->buf_read, buf, HTTP_SC_LENGTH);
  255. if (buf[HTTP_SC_LENGTH - 1] != 0x20) {
  256. ERROR("HTTP status code received is not terminated by space (0x20)");
  257. return WS_CLIENT_PROTOCOL_ERROR;
  258. }
  259. buf[HTTP_SC_LENGTH - 1] = 0;
  260. client->hs.http_code = atoi(buf);
  261. if (client->hs.http_code < 100 || client->hs.http_code >= 600) {
  262. ERROR("HTTP status code received not in valid range 100-600");
  263. return WS_CLIENT_PROTOCOL_ERROR;
  264. }
  265. client->hs.hdr_state = WS_HDR_ENDLINE;
  266. break;
  267. case WS_HDR_ENDLINE:
  268. ptr = rbuf_find_bytes(client->buf_read, WS_HTTP_NEWLINE, strlen(WS_HTTP_NEWLINE), &idx_crlf);
  269. if (!ptr) {
  270. bytes = rbuf_bytes_available(client->buf_read);
  271. HTTP_HDR_LINE_CHECK_LIMIT(bytes);
  272. return WS_CLIENT_NEED_MORE_BYTES;
  273. }
  274. HTTP_HDR_LINE_CHECK_LIMIT(idx_crlf);
  275. client->hs.http_reply_msg = mw_malloc(idx_crlf+1);
  276. rbuf_pop(client->buf_read, client->hs.http_reply_msg, idx_crlf);
  277. client->hs.http_reply_msg[idx_crlf] = 0;
  278. rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE));
  279. client->hs.hdr_state = WS_HDR_PARSE_HEADERS;
  280. break;
  281. case WS_HDR_PARSE_HEADERS:
  282. ptr = rbuf_find_bytes(client->buf_read, WS_HTTP_NEWLINE, strlen(WS_HTTP_NEWLINE), &idx_crlf);
  283. if (!ptr) {
  284. bytes = rbuf_bytes_available(client->buf_read);
  285. HTTP_HDR_LINE_CHECK_LIMIT(bytes);
  286. return WS_CLIENT_NEED_MORE_BYTES;
  287. }
  288. HTTP_HDR_LINE_CHECK_LIMIT(idx_crlf);
  289. if (!idx_crlf) { // empty line, header end
  290. rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE));
  291. client->hs.hdr_state = WS_HDR_PARSE_DONE;
  292. return 0;
  293. }
  294. ptr = rbuf_find_bytes(client->buf_read, HTTP_HDR_SEPARATOR, strlen(HTTP_HDR_SEPARATOR), &idx_sep);
  295. if (!ptr || idx_sep > idx_crlf) {
  296. ERROR("Expected HTTP hdr field key/value separator \": \" before endline in non empty HTTP header line");
  297. return WS_CLIENT_PROTOCOL_ERROR;
  298. }
  299. if (idx_crlf == idx_sep + (int)strlen(HTTP_HDR_SEPARATOR)) {
  300. ERROR("HTTP Header value cannot be empty");
  301. return WS_CLIENT_PROTOCOL_ERROR;
  302. }
  303. if (idx_sep > HTTP_HEADER_NAME_MAX_LEN) {
  304. ERROR("HTTP header too long (%d)", idx_sep);
  305. return WS_CLIENT_PROTOCOL_ERROR;
  306. }
  307. struct http_header *hdr = mw_calloc(1, sizeof(struct http_header) + idx_crlf); //idx_crlf includes ": " that will be used as 2 \0 bytes
  308. hdr->key = ((char*)hdr) + sizeof(struct http_header);
  309. hdr->value = hdr->key + idx_sep + 1;
  310. bytes = rbuf_pop(client->buf_read, hdr->key, idx_sep);
  311. rbuf_bump_tail(client->buf_read, strlen(HTTP_HDR_SEPARATOR));
  312. bytes = rbuf_pop(client->buf_read, hdr->value, idx_crlf - idx_sep - strlen(HTTP_HDR_SEPARATOR));
  313. rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE));
  314. for (int i = 0; hdr->key[i]; i++)
  315. hdr->key[i] = tolower(hdr->key[i]);
  316. // DEBUG("HTTP header \"%s\" received. Value \"%s\"", hdr->key, hdr->value);
  317. if (ws_client_add_http_header(client, hdr))
  318. return WS_CLIENT_PROTOCOL_ERROR;
  319. if (!strcmp(hdr->key, WS_CONN_ACCEPT)) {
  320. if (strcmp(client->hs.nonce_reply, hdr->value)) {
  321. ERROR("Received NONCE \"%s\" does not match expected nonce of \"%s\"", hdr->value, client->hs.nonce_reply);
  322. return WS_CLIENT_PROTOCOL_ERROR;
  323. }
  324. client->hs.nonce_matched = 1;
  325. }
  326. break;
  327. case WS_HDR_PARSE_DONE:
  328. if (!client->hs.nonce_matched) {
  329. ERROR("Missing " WS_CONN_ACCEPT " header");
  330. return WS_CLIENT_PROTOCOL_ERROR;
  331. }
  332. if (client->hs.http_code != 101) {
  333. ERROR("HTTP return code not 101. Received %d with msg \"%s\".", client->hs.http_code, client->hs.http_reply_msg);
  334. return WS_CLIENT_PROTOCOL_ERROR;
  335. }
  336. client->state = WS_ESTABLISHED;
  337. client->hs.hdr_state = WS_HDR_ALL_DONE;
  338. INFO("Websocket Connection Accepted By Server");
  339. return WS_CLIENT_PARSING_DONE;
  340. case WS_HDR_ALL_DONE:
  341. FATAL("This is error we should never come here!");
  342. return WS_CLIENT_PROTOCOL_ERROR;
  343. }
  344. return 0;
  345. }
  346. #define BYTE_MSB 0x80
  347. #define WS_FINAL_FRAG BYTE_MSB
  348. #define WS_PAYLOAD_MASKED BYTE_MSB
  349. static inline size_t get_ws_hdr_size(size_t payload_size)
  350. {
  351. size_t hdr_len = 2 + 4 /*mask*/;
  352. if(payload_size > 125)
  353. hdr_len += 2;
  354. if(payload_size > 65535)
  355. hdr_len += 6;
  356. return hdr_len;
  357. }
  358. #define MAX_POSSIBLE_HDR_LEN 14
  359. int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size)
  360. {
  361. // TODO maybe? implement fragmenting, it is not necessary though
  362. // as both tested MQTT brokers have no reuirement of one MQTT envelope
  363. // be equal to one WebSockets envelope. Therefore there is no need to send
  364. // one big MQTT message as single fragmented WebSocket envelope
  365. char hdr[MAX_POSSIBLE_HDR_LEN];
  366. char *ptr = hdr;
  367. char *mask;
  368. int size_written = 0;
  369. size_t j = 0;
  370. size_t w_buff_free = rbuf_bytes_free(client->buf_write);
  371. size_t hdr_len = get_ws_hdr_size(size);
  372. if (w_buff_free < hdr_len * 2) {
  373. #ifdef DEBUG_ULTRA_VERBOSE
  374. DEBUG("Write buffer full. Can't write requested %d size.", size);
  375. #endif
  376. return 0;
  377. }
  378. if (w_buff_free < (hdr_len + size)) {
  379. #ifdef DEBUG_ULTRA_VERBOSE
  380. DEBUG("Can't write whole MQTT packet of %d bytes into the buffer. Will do partial send of %d.", size, w_buff_free - hdr_len);
  381. #endif
  382. size = w_buff_free - hdr_len;
  383. hdr_len = get_ws_hdr_size(size);
  384. // the actual needed header size might decrease if we cut number of bytes
  385. // if decrease of size crosses 65535 or 125 boundary
  386. // but I can live with that at least for now
  387. // worst case is we have 6 more bytes we could have written
  388. // no bigus dealus
  389. }
  390. *ptr++ = frame_type | WS_FINAL_FRAG;
  391. //generate length
  392. *ptr = WS_PAYLOAD_MASKED;
  393. if (size > 65535) {
  394. *ptr++ |= 0x7f;
  395. uint64_t be = htobe64(size);
  396. memcpy(ptr, (void *)&be, sizeof(be));
  397. ptr += sizeof(be);
  398. } else if (size > 125) {
  399. *ptr++ |= 0x7e;
  400. uint16_t be = htobe16(size);
  401. memcpy(ptr, (void *)&be, sizeof(be));
  402. ptr += sizeof(be);
  403. } else
  404. *ptr++ |= size;
  405. mask = ptr;
  406. if (read(client->entropy_fd, mask, sizeof(uint32_t)) < (ssize_t)sizeof(uint32_t)) {
  407. ERROR("Unable to get mask from \"" ENTROPY_SOURCE "\"");
  408. return -2;
  409. }
  410. rbuf_push(client->buf_write, hdr, hdr_len);
  411. if (!size)
  412. return 0;
  413. // copy and mask data in the write ringbuffer
  414. while (size - size_written) {
  415. size_t writable_bytes;
  416. char *w_ptr = rbuf_get_linear_insert_range(client->buf_write, &writable_bytes);
  417. if(!writable_bytes)
  418. break;
  419. writable_bytes = (writable_bytes > size) ? (size - size_written) : writable_bytes;
  420. memcpy(w_ptr, &data[size_written], writable_bytes);
  421. rbuf_bump_head(client->buf_write, writable_bytes);
  422. for (size_t i = 0; i < writable_bytes; i++, j++)
  423. w_ptr[i] ^= mask[j % 4];
  424. size_written += writable_bytes;
  425. }
  426. return size_written;
  427. }
  428. static int check_opcode(ws_client *client,enum websocket_opcode oc)
  429. {
  430. switch(oc) {
  431. case WS_OP_BINARY_FRAME:
  432. case WS_OP_CONNECTION_CLOSE:
  433. case WS_OP_PING:
  434. return 0;
  435. case WS_OP_CONTINUATION_FRAME:
  436. FATAL("WS_OP_CONTINUATION_FRAME NOT IMPLEMENTED YET!!!!");
  437. return 0;
  438. case WS_OP_TEXT_FRAME:
  439. FATAL("WS_OP_TEXT_FRAME NOT IMPLEMENTED YET!!!!");
  440. return 0;
  441. case WS_OP_PONG:
  442. FATAL("WS_OP_PONG NOT IMPLEMENTED YET!!!!");
  443. return 0;
  444. default:
  445. return WS_CLIENT_PROTOCOL_ERROR;
  446. }
  447. }
  448. static inline void ws_client_rx_post_hdr_state(ws_client *client)
  449. {
  450. switch(client->rx.opcode) {
  451. case WS_OP_BINARY_FRAME:
  452. client->rx.parse_state = WS_PAYLOAD_DATA;
  453. return;
  454. case WS_OP_CONNECTION_CLOSE:
  455. client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE;
  456. return;
  457. case WS_OP_PING:
  458. client->rx.parse_state = WS_PAYLOAD_PING_REQ_PAYLOAD;
  459. return;
  460. default:
  461. client->rx.parse_state = WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD;
  462. return;
  463. }
  464. }
  465. #define LONGEST_POSSIBLE_HDR_PART 8
  466. int ws_client_process_rx_ws(ws_client *client)
  467. {
  468. char buf[LONGEST_POSSIBLE_HDR_PART];
  469. size_t size;
  470. switch (client->rx.parse_state) {
  471. case WS_FIRST_2BYTES:
  472. BUF_READ_CHECK_AT_LEAST(2);
  473. rbuf_pop(client->buf_read, buf, 2);
  474. client->rx.opcode = buf[0] & (char)~BYTE_MSB;
  475. if (!(buf[0] & (char)~WS_FINAL_FRAG)) {
  476. ERROR("Not supporting fragmented messages yet!");
  477. return WS_CLIENT_PROTOCOL_ERROR;
  478. }
  479. if (check_opcode(client, client->rx.opcode) == WS_CLIENT_PROTOCOL_ERROR)
  480. return WS_CLIENT_PROTOCOL_ERROR;
  481. if (buf[1] & (char)WS_PAYLOAD_MASKED) {
  482. ERROR("Mask is not allowed in Server->Client Websocket direction.");
  483. return WS_CLIENT_PROTOCOL_ERROR;
  484. }
  485. switch (buf[1]) {
  486. case 127:
  487. client->rx.parse_state = WS_PAYLOAD_EXTENDED_64;
  488. break;
  489. case 126:
  490. client->rx.parse_state = WS_PAYLOAD_EXTENDED_16;
  491. break;
  492. default:
  493. client->rx.payload_length = buf[1];
  494. ws_client_rx_post_hdr_state(client);
  495. }
  496. break;
  497. case WS_PAYLOAD_EXTENDED_16:
  498. BUF_READ_CHECK_AT_LEAST(2);
  499. rbuf_pop(client->buf_read, buf, 2);
  500. client->rx.payload_length = be16toh(*((uint16_t *)buf));
  501. ws_client_rx_post_hdr_state(client);
  502. break;
  503. case WS_PAYLOAD_EXTENDED_64:
  504. BUF_READ_CHECK_AT_LEAST(LONGEST_POSSIBLE_HDR_PART);
  505. rbuf_pop(client->buf_read, buf, LONGEST_POSSIBLE_HDR_PART);
  506. client->rx.payload_length = be64toh(*((uint64_t *)buf));
  507. ws_client_rx_post_hdr_state(client);
  508. break;
  509. case WS_PAYLOAD_DATA:
  510. // TODO not pretty?
  511. while (client->rx.payload_processed < client->rx.payload_length) {
  512. size_t remaining = client->rx.payload_length - client->rx.payload_processed;
  513. if (!rbuf_bytes_available(client->buf_read))
  514. return WS_CLIENT_NEED_MORE_BYTES;
  515. char *insert = rbuf_get_linear_insert_range(client->buf_to_mqtt, &size);
  516. if (!insert) {
  517. #ifdef DEBUG_ULTRA_VERBOSE
  518. DEBUG("BUFFER TOO FULL. Avail %d req %d", (int)size, (int)remaining);
  519. #endif
  520. return WS_CLIENT_BUFFER_FULL;
  521. }
  522. size = (size > remaining) ? remaining : size;
  523. size = rbuf_pop(client->buf_read, insert, size);
  524. rbuf_bump_head(client->buf_to_mqtt, size);
  525. client->rx.payload_processed += size;
  526. }
  527. client->rx.parse_state = WS_PACKET_DONE;
  528. break;
  529. case WS_PAYLOAD_CONNECTION_CLOSE:
  530. // for WS_OP_CONNECTION_CLOSE allowed is
  531. // a) empty payload
  532. // b) 2byte reason code
  533. // c) 2byte reason code followed by message
  534. if (client->rx.payload_length == 1) {
  535. ERROR("WebScoket CONNECTION_CLOSE can't have payload of size 1");
  536. return WS_CLIENT_PROTOCOL_ERROR;
  537. }
  538. if (!client->rx.payload_length) {
  539. INFO("WebSocket server closed the connection without giving reason.");
  540. client->rx.parse_state = WS_PACKET_DONE;
  541. break;
  542. }
  543. client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE_EC;
  544. break;
  545. case WS_PAYLOAD_CONNECTION_CLOSE_EC:
  546. BUF_READ_CHECK_AT_LEAST(sizeof(uint16_t));
  547. rbuf_pop(client->buf_read, buf, sizeof(uint16_t));
  548. client->rx.specific_data.op_close.ec = be16toh(*((uint16_t *)buf));
  549. client->rx.payload_processed += sizeof(uint16_t);
  550. if(client->rx.payload_processed == client->rx.payload_length) {
  551. INFO("WebSocket server closed the connection with EC=%d. Without message.",
  552. client->rx.specific_data.op_close.ec);
  553. client->rx.parse_state = WS_PACKET_DONE;
  554. break;
  555. }
  556. client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE_MSG;
  557. break;
  558. case WS_PAYLOAD_CONNECTION_CLOSE_MSG:
  559. if (!client->rx.specific_data.op_close.reason)
  560. client->rx.specific_data.op_close.reason = mw_malloc(client->rx.payload_length + 1);
  561. while (client->rx.payload_processed < client->rx.payload_length) {
  562. if (!rbuf_bytes_available(client->buf_read))
  563. return WS_CLIENT_NEED_MORE_BYTES;
  564. client->rx.payload_processed += rbuf_pop(client->buf_read,
  565. &client->rx.specific_data.op_close.reason[client->rx.payload_processed - sizeof(uint16_t)],
  566. client->rx.payload_length - client->rx.payload_processed);
  567. }
  568. client->rx.specific_data.op_close.reason[client->rx.payload_length] = 0;
  569. INFO("WebSocket server closed the connection with EC=%d and reason \"%s\"",
  570. client->rx.specific_data.op_close.ec,
  571. client->rx.specific_data.op_close.reason);
  572. mw_free(client->rx.specific_data.op_close.reason);
  573. client->rx.specific_data.op_close.reason = NULL;
  574. client->rx.parse_state = WS_PACKET_DONE;
  575. break;
  576. case WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD:
  577. BUF_READ_CHECK_AT_LEAST(client->rx.payload_length);
  578. WARN("Skipping Websocket Packet of unsupported/unknown type");
  579. if (client->rx.payload_length)
  580. rbuf_bump_tail(client->buf_read, client->rx.payload_length);
  581. client->rx.parse_state = WS_PACKET_DONE;
  582. return WS_CLIENT_PARSING_DONE;
  583. case WS_PAYLOAD_PING_REQ_PAYLOAD:
  584. if (client->rx.payload_length > rbuf_get_capacity(client->buf_read) / 2) {
  585. ERROR("Ping arrived with payload which is too big!");
  586. return WS_CLIENT_INTERNAL_ERROR;
  587. }
  588. BUF_READ_CHECK_AT_LEAST(client->rx.payload_length);
  589. client->rx.specific_data.ping_msg = mw_malloc(client->rx.payload_length);
  590. rbuf_pop(client->buf_read, client->rx.specific_data.ping_msg, client->rx.payload_length);
  591. // TODO schedule this instead of sending right away
  592. // then attempt to send as soon as buffer space clears up
  593. size = ws_client_send(client, WS_OP_PONG, client->rx.specific_data.ping_msg, client->rx.payload_length);
  594. if (size != client->rx.payload_length) {
  595. ERROR("Unable to send the PONG as one packet back. Closing connection.");
  596. return WS_CLIENT_PROTOCOL_ERROR;
  597. }
  598. client->rx.parse_state = WS_PACKET_DONE;
  599. return WS_CLIENT_PARSING_DONE;
  600. case WS_PACKET_DONE:
  601. client->rx.parse_state = WS_FIRST_2BYTES;
  602. client->rx.payload_processed = 0;
  603. if (client->rx.opcode == WS_OP_CONNECTION_CLOSE)
  604. return WS_CLIENT_CONNECTION_CLOSED;
  605. return WS_CLIENT_PARSING_DONE;
  606. default:
  607. FATAL("Unknown parse state");
  608. return WS_CLIENT_INTERNAL_ERROR;
  609. }
  610. return 0;
  611. }
  612. int ws_client_process(ws_client *client)
  613. {
  614. int ret;
  615. switch(client->state) {
  616. case WS_RAW:
  617. if (ws_client_start_handshake(client))
  618. return WS_CLIENT_INTERNAL_ERROR;
  619. return WS_CLIENT_NEED_MORE_BYTES;
  620. case WS_HANDSHAKE:
  621. do {
  622. ret = ws_client_parse_handshake_resp(client);
  623. if (ret == WS_CLIENT_PROTOCOL_ERROR)
  624. client->state = WS_ERROR;
  625. if (ret == WS_CLIENT_PARSING_DONE && client->state == WS_ESTABLISHED)
  626. ret = WS_CLIENT_NEED_MORE_BYTES;
  627. } while (!ret);
  628. break;
  629. case WS_ESTABLISHED:
  630. do {
  631. ret = ws_client_process_rx_ws(client);
  632. switch(ret) {
  633. case WS_CLIENT_PROTOCOL_ERROR:
  634. client->state = WS_ERROR;
  635. break;
  636. case WS_CLIENT_CONNECTION_CLOSED:
  637. client->state = WS_CONN_CLOSED_GRACEFUL;
  638. break;
  639. }
  640. // if ret == 0 we can continue parsing
  641. // if ret == WS_CLIENT_PARSING_DONE we processed
  642. // one websocket packet and attempt processing
  643. // next one if data available in the buffer
  644. } while (!ret || ret == WS_CLIENT_PARSING_DONE);
  645. break;
  646. case WS_ERROR:
  647. ERROR("ws_client is in error state. Restart the connection!");
  648. return WS_CLIENT_PROTOCOL_ERROR;
  649. case WS_CONN_CLOSED_GRACEFUL:
  650. ERROR("Connection has been gracefully closed. Calling this is useless (and probably bug) until you reconnect again.");
  651. return WS_CLIENT_CONNECTION_CLOSED;
  652. default:
  653. FATAL("Unknown connection state! Probably memory corruption.");
  654. return WS_CLIENT_INTERNAL_ERROR;
  655. }
  656. return ret;
  657. }