123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744 |
- // Copyright (C) 2020 Timotej Šiškovič
- // SPDX-License-Identifier: GPL-3.0-only
- //
- // This program is free software: you can redistribute it and/or modify it
- // under the terms of the GNU General Public License as published by the Free Software Foundation, version 3.
- //
- // This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY;
- // without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- // See the GNU General Public License for more details.
- //
- // You should have received a copy of the GNU General Public License along with this program.
- // If not, see <https://www.gnu.org/licenses/>.
- #include <fcntl.h>
- #include <unistd.h>
- #include <string.h>
- #include <errno.h>
- #include <ctype.h>
- #include <openssl/evp.h>
- #include "ws_client.h"
- #include "common_internal.h"
- #ifdef MQTT_WEBSOCKETS_DEBUG
- #include "../c-rbuf/src/ringbuffer_internal.h"
- #endif
- #define UNIT_LOG_PREFIX "ws_client: "
- #define FATAL(fmt, ...) mws_fatal(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
- #define ERROR(fmt, ...) mws_error(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
- #define WARN(fmt, ...) mws_warn (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
- #define INFO(fmt, ...) mws_info (client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
- #define DEBUG(fmt, ...) mws_debug(client->log, UNIT_LOG_PREFIX fmt, ##__VA_ARGS__)
- const char *websocket_upgrage_hdr = "GET /mqtt HTTP/1.1\x0D\x0A"
- "Host: %s\x0D\x0A"
- "Upgrade: websocket\x0D\x0A"
- "Connection: Upgrade\x0D\x0A"
- "Sec-WebSocket-Key: %s\x0D\x0A"
- "Origin: http://example.com\x0D\x0A"
- "Sec-WebSocket-Protocol: mqtt\x0D\x0A"
- "Sec-WebSocket-Version: 13\x0D\x0A\x0D\x0A";
- const char *mqtt_protoid = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
- #define DEFAULT_RINGBUFFER_SIZE (1024*128)
- #define ENTROPY_SOURCE "/dev/urandom"
- ws_client *ws_client_new(size_t buf_size, char **host, mqtt_wss_log_ctx_t log)
- {
- ws_client *client;
- if(!host)
- return NULL;
- client = mw_calloc(1, sizeof(ws_client));
- if (!client)
- return NULL;
- client->host = host;
- client->log = log;
- client->buf_read = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE);
- if (!client->buf_read)
- goto cleanup;
- client->buf_write = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE);
- if (!client->buf_write)
- goto cleanup_1;
- client->buf_to_mqtt = rbuf_create(buf_size ? buf_size : DEFAULT_RINGBUFFER_SIZE);
- if (!client->buf_to_mqtt)
- goto cleanup_2;
- client->entropy_fd = open(ENTROPY_SOURCE, O_RDONLY);
- if (client->entropy_fd < 1) {
- ERROR("Error opening entropy source \"" ENTROPY_SOURCE "\". Reason: \"%s\"", strerror(errno));
- goto cleanup_3;
- }
- return client;
- cleanup_3:
- rbuf_free(client->buf_to_mqtt);
- cleanup_2:
- rbuf_free(client->buf_write);
- cleanup_1:
- rbuf_free(client->buf_read);
- cleanup:
- mw_free(client);
- return NULL;
- }
- void ws_client_free_headers(ws_client *client)
- {
- struct http_header *ptr = client->hs.headers;
- struct http_header *tmp;
- while (ptr) {
- tmp = ptr;
- ptr = ptr->next;
- mw_free(tmp);
- }
- client->hs.headers = NULL;
- client->hs.headers_tail = NULL;
- client->hs.hdr_count = 0;
- }
- void ws_client_destroy(ws_client *client)
- {
- ws_client_free_headers(client);
- mw_free(client->hs.nonce_reply);
- mw_free(client->hs.http_reply_msg);
- close(client->entropy_fd);
- rbuf_free(client->buf_read);
- rbuf_free(client->buf_write);
- rbuf_free(client->buf_to_mqtt);
- mw_free(client);
- }
- void ws_client_reset(ws_client *client)
- {
- ws_client_free_headers(client);
- mw_free(client->hs.nonce_reply);
- client->hs.nonce_reply = NULL;
- mw_free(client->hs.http_reply_msg);
- client->hs.http_reply_msg = NULL;
- rbuf_flush(client->buf_read);
- rbuf_flush(client->buf_write);
- rbuf_flush(client->buf_to_mqtt);
- client->state = WS_RAW;
- client->hs.hdr_state = WS_HDR_HTTP;
- client->rx.parse_state = WS_FIRST_2BYTES;
- }
- #define MAX_HTTP_HDR_COUNT 128
- int ws_client_add_http_header(ws_client *client, struct http_header *hdr)
- {
- if (client->hs.hdr_count > MAX_HTTP_HDR_COUNT) {
- ERROR("Too many HTTP response header fields");
- return -1;
- }
- if (client->hs.headers)
- client->hs.headers_tail->next = hdr;
- else
- client->hs.headers = hdr;
- client->hs.headers_tail = hdr;
- client->hs.hdr_count++;
- return 0;
- }
- int ws_client_want_write(ws_client *client)
- {
- return rbuf_bytes_available(client->buf_write);
- }
- #define RAND_SRC "/dev/urandom"
- static int ws_client_get_nonce(ws_client *client, char *dest, unsigned int size)
- {
- // we do not need crypto secure random here
- // it's just used for protocol negotiation
- int rd;
- int f = open(RAND_SRC, O_RDONLY);
- if (f < 0) {
- ERROR("Error opening \"%s\". Err: \"%s\"", RAND_SRC, strerror(errno));
- return -2;
- }
- if ((rd = read(f, dest, size)) > 0) {
- close(f);
- return rd;
- }
- close(f);
- return -1;
- }
- #define WEBSOCKET_NONCE_SIZE 16
- #define TEMP_BUF_SIZE 4096
- int ws_client_start_handshake(ws_client *client)
- {
- char nonce[WEBSOCKET_NONCE_SIZE];
- char nonce_b64[256];
- char second[TEMP_BUF_SIZE];
- unsigned int md_len;
- unsigned char *digest;
- EVP_MD_CTX *md_ctx;
- const EVP_MD *md;
- if(!*client->host) {
- ERROR("Hostname has not been set. We should not be able to come here!");
- return 1;
- }
- ws_client_get_nonce(client, nonce, WEBSOCKET_NONCE_SIZE);
- EVP_EncodeBlock((unsigned char *)nonce_b64, (const unsigned char *)nonce, WEBSOCKET_NONCE_SIZE);
- snprintf(second, TEMP_BUF_SIZE, websocket_upgrage_hdr,
- *client->host,
- nonce_b64);
- if(rbuf_bytes_free(client->buf_write) < strlen(second)) {
- ERROR("Write buffer capacity too low.");
- return 1;
- }
- rbuf_push(client->buf_write, second, strlen(second));
- client->state = WS_HANDSHAKE;
- //Calculating expected Sec-WebSocket-Accept reply
- snprintf(second, TEMP_BUF_SIZE, "%s%s", nonce_b64, mqtt_protoid);
- #if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110)
- md_ctx = EVP_MD_CTX_create();
- #else
- md_ctx = EVP_MD_CTX_new();
- #endif
- if (md_ctx == NULL) {
- ERROR("Cant create EVP_MD Context");
- return 1;
- }
- md = EVP_get_digestbyname("sha1");
- if (!md) {
- ERROR("Unknown message digest");
- return 1;
- }
- if ((digest = (unsigned char *)OPENSSL_malloc(EVP_MD_size(EVP_sha256()))) == NULL) {
- ERROR("Cant alloc digest");
- return 1;
- }
- EVP_DigestInit_ex(md_ctx, md, NULL);
- EVP_DigestUpdate(md_ctx, second, strlen(second));
- EVP_DigestFinal_ex(md_ctx, digest, &md_len);
- EVP_EncodeBlock((unsigned char *)nonce_b64, digest, md_len);
- mw_free(client->hs.nonce_reply);
- client->hs.nonce_reply = mw_strdup(nonce_b64);
- OPENSSL_free(digest);
- #if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110)
- EVP_MD_CTX_destroy(md_ctx);
- #else
- EVP_MD_CTX_free(md_ctx);
- #endif
- return 0;
- }
- #define BUF_READ_MEMCMP_CONST(const, err) \
- if (rbuf_memcmp_n(client->buf_read, const, strlen(const))) { \
- ERROR(err); \
- rbuf_flush(client->buf_read); \
- return WS_CLIENT_PROTOCOL_ERROR; \
- }
- #define BUF_READ_CHECK_AT_LEAST(x) \
- if (rbuf_bytes_available(client->buf_read) < x) \
- return WS_CLIENT_NEED_MORE_BYTES;
- #define MAX_HTTP_LINE_LENGTH 1024*4
- #define HTTP_SC_LENGTH 4 // "XXX " http status code as C string
- #define WS_CLIENT_HTTP_HDR "HTTP/1.1 "
- #define WS_CONN_ACCEPT "sec-websocket-accept"
- #define HTTP_HDR_SEPARATOR ": "
- #define WS_NONCE_STRLEN_B64 28
- #define WS_HTTP_NEWLINE "\r\n"
- #define HTTP_HEADER_NAME_MAX_LEN 256
- #if HTTP_HEADER_NAME_MAX_LEN > MAX_HTTP_LINE_LENGTH
- #error "Buffer too small"
- #endif
- #if WS_NONCE_STRLEN_B64 > MAX_HTTP_LINE_LENGTH
- #error "Buffer too small"
- #endif
- #define HTTP_HDR_LINE_CHECK_LIMIT(x) if ((x) >= MAX_HTTP_LINE_LENGTH) \
- { \
- ERROR("HTTP line received is too long. Maximum is %d", MAX_HTTP_LINE_LENGTH); \
- return WS_CLIENT_PROTOCOL_ERROR; \
- }
- int ws_client_parse_handshake_resp(ws_client *client)
- {
- char buf[HTTP_SC_LENGTH];
- int idx_crlf, idx_sep;
- char *ptr;
- size_t bytes;
- switch (client->hs.hdr_state) {
- case WS_HDR_HTTP:
- BUF_READ_CHECK_AT_LEAST(strlen(WS_CLIENT_HTTP_HDR))
- BUF_READ_MEMCMP_CONST(WS_CLIENT_HTTP_HDR, "Expected \"HTTP1.1\" header");
- rbuf_bump_tail(client->buf_read, strlen(WS_CLIENT_HTTP_HDR));
- client->hs.hdr_state = WS_HDR_RC;
- break;
- case WS_HDR_RC:
- BUF_READ_CHECK_AT_LEAST(HTTP_SC_LENGTH); // "XXX " http return code
- rbuf_pop(client->buf_read, buf, HTTP_SC_LENGTH);
- if (buf[HTTP_SC_LENGTH - 1] != 0x20) {
- ERROR("HTTP status code received is not terminated by space (0x20)");
- return WS_CLIENT_PROTOCOL_ERROR;
- }
- buf[HTTP_SC_LENGTH - 1] = 0;
- client->hs.http_code = atoi(buf);
- if (client->hs.http_code < 100 || client->hs.http_code >= 600) {
- ERROR("HTTP status code received not in valid range 100-600");
- return WS_CLIENT_PROTOCOL_ERROR;
- }
- client->hs.hdr_state = WS_HDR_ENDLINE;
- break;
- case WS_HDR_ENDLINE:
- ptr = rbuf_find_bytes(client->buf_read, WS_HTTP_NEWLINE, strlen(WS_HTTP_NEWLINE), &idx_crlf);
- if (!ptr) {
- bytes = rbuf_bytes_available(client->buf_read);
- HTTP_HDR_LINE_CHECK_LIMIT(bytes);
- return WS_CLIENT_NEED_MORE_BYTES;
- }
- HTTP_HDR_LINE_CHECK_LIMIT(idx_crlf);
- client->hs.http_reply_msg = mw_malloc(idx_crlf+1);
- rbuf_pop(client->buf_read, client->hs.http_reply_msg, idx_crlf);
- client->hs.http_reply_msg[idx_crlf] = 0;
- rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE));
- client->hs.hdr_state = WS_HDR_PARSE_HEADERS;
- break;
- case WS_HDR_PARSE_HEADERS:
- ptr = rbuf_find_bytes(client->buf_read, WS_HTTP_NEWLINE, strlen(WS_HTTP_NEWLINE), &idx_crlf);
- if (!ptr) {
- bytes = rbuf_bytes_available(client->buf_read);
- HTTP_HDR_LINE_CHECK_LIMIT(bytes);
- return WS_CLIENT_NEED_MORE_BYTES;
- }
- HTTP_HDR_LINE_CHECK_LIMIT(idx_crlf);
- if (!idx_crlf) { // empty line, header end
- rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE));
- client->hs.hdr_state = WS_HDR_PARSE_DONE;
- return 0;
- }
- ptr = rbuf_find_bytes(client->buf_read, HTTP_HDR_SEPARATOR, strlen(HTTP_HDR_SEPARATOR), &idx_sep);
- if (!ptr || idx_sep > idx_crlf) {
- ERROR("Expected HTTP hdr field key/value separator \": \" before endline in non empty HTTP header line");
- return WS_CLIENT_PROTOCOL_ERROR;
- }
- if (idx_crlf == idx_sep + (int)strlen(HTTP_HDR_SEPARATOR)) {
- ERROR("HTTP Header value cannot be empty");
- return WS_CLIENT_PROTOCOL_ERROR;
- }
- if (idx_sep > HTTP_HEADER_NAME_MAX_LEN) {
- ERROR("HTTP header too long (%d)", idx_sep);
- return WS_CLIENT_PROTOCOL_ERROR;
- }
- struct http_header *hdr = mw_calloc(1, sizeof(struct http_header) + idx_crlf); //idx_crlf includes ": " that will be used as 2 \0 bytes
- hdr->key = ((char*)hdr) + sizeof(struct http_header);
- hdr->value = hdr->key + idx_sep + 1;
- bytes = rbuf_pop(client->buf_read, hdr->key, idx_sep);
- rbuf_bump_tail(client->buf_read, strlen(HTTP_HDR_SEPARATOR));
- bytes = rbuf_pop(client->buf_read, hdr->value, idx_crlf - idx_sep - strlen(HTTP_HDR_SEPARATOR));
- rbuf_bump_tail(client->buf_read, strlen(WS_HTTP_NEWLINE));
- for (int i = 0; hdr->key[i]; i++)
- hdr->key[i] = tolower(hdr->key[i]);
- // DEBUG("HTTP header \"%s\" received. Value \"%s\"", hdr->key, hdr->value);
- if (ws_client_add_http_header(client, hdr))
- return WS_CLIENT_PROTOCOL_ERROR;
- if (!strcmp(hdr->key, WS_CONN_ACCEPT)) {
- if (strcmp(client->hs.nonce_reply, hdr->value)) {
- ERROR("Received NONCE \"%s\" does not match expected nonce of \"%s\"", hdr->value, client->hs.nonce_reply);
- return WS_CLIENT_PROTOCOL_ERROR;
- }
- client->hs.nonce_matched = 1;
- }
- break;
- case WS_HDR_PARSE_DONE:
- if (!client->hs.nonce_matched) {
- ERROR("Missing " WS_CONN_ACCEPT " header");
- return WS_CLIENT_PROTOCOL_ERROR;
- }
- if (client->hs.http_code != 101) {
- ERROR("HTTP return code not 101. Received %d with msg \"%s\".", client->hs.http_code, client->hs.http_reply_msg);
- return WS_CLIENT_PROTOCOL_ERROR;
- }
- client->state = WS_ESTABLISHED;
- client->hs.hdr_state = WS_HDR_ALL_DONE;
- INFO("Websocket Connection Accepted By Server");
- return WS_CLIENT_PARSING_DONE;
- case WS_HDR_ALL_DONE:
- FATAL("This is error we should never come here!");
- return WS_CLIENT_PROTOCOL_ERROR;
- }
- return 0;
- }
- #define BYTE_MSB 0x80
- #define WS_FINAL_FRAG BYTE_MSB
- #define WS_PAYLOAD_MASKED BYTE_MSB
- static inline size_t get_ws_hdr_size(size_t payload_size)
- {
- size_t hdr_len = 2 + 4 /*mask*/;
- if(payload_size > 125)
- hdr_len += 2;
- if(payload_size > 65535)
- hdr_len += 6;
- return hdr_len;
- }
- #define MAX_POSSIBLE_HDR_LEN 14
- int ws_client_send(ws_client *client, enum websocket_opcode frame_type, const char *data, size_t size)
- {
- // TODO maybe? implement fragmenting, it is not necessary though
- // as both tested MQTT brokers have no reuirement of one MQTT envelope
- // be equal to one WebSockets envelope. Therefore there is no need to send
- // one big MQTT message as single fragmented WebSocket envelope
- char hdr[MAX_POSSIBLE_HDR_LEN];
- char *ptr = hdr;
- char *mask;
- int size_written = 0;
- size_t j = 0;
- size_t w_buff_free = rbuf_bytes_free(client->buf_write);
- size_t hdr_len = get_ws_hdr_size(size);
- if (w_buff_free < hdr_len * 2) {
- #ifdef DEBUG_ULTRA_VERBOSE
- DEBUG("Write buffer full. Can't write requested %d size.", size);
- #endif
- return 0;
- }
- if (w_buff_free < (hdr_len + size)) {
- #ifdef DEBUG_ULTRA_VERBOSE
- DEBUG("Can't write whole MQTT packet of %d bytes into the buffer. Will do partial send of %d.", size, w_buff_free - hdr_len);
- #endif
- size = w_buff_free - hdr_len;
- hdr_len = get_ws_hdr_size(size);
- // the actual needed header size might decrease if we cut number of bytes
- // if decrease of size crosses 65535 or 125 boundary
- // but I can live with that at least for now
- // worst case is we have 6 more bytes we could have written
- // no bigus dealus
- }
- *ptr++ = frame_type | WS_FINAL_FRAG;
- //generate length
- *ptr = WS_PAYLOAD_MASKED;
- if (size > 65535) {
- *ptr++ |= 0x7f;
- uint64_t be = htobe64(size);
- memcpy(ptr, (void *)&be, sizeof(be));
- ptr += sizeof(be);
- } else if (size > 125) {
- *ptr++ |= 0x7e;
- uint16_t be = htobe16(size);
- memcpy(ptr, (void *)&be, sizeof(be));
- ptr += sizeof(be);
- } else
- *ptr++ |= size;
-
- mask = ptr;
- if (read(client->entropy_fd, mask, sizeof(uint32_t)) < (ssize_t)sizeof(uint32_t)) {
- ERROR("Unable to get mask from \"" ENTROPY_SOURCE "\"");
- return -2;
- }
- rbuf_push(client->buf_write, hdr, hdr_len);
- if (!size)
- return 0;
- // copy and mask data in the write ringbuffer
- while (size - size_written) {
- size_t writable_bytes;
- char *w_ptr = rbuf_get_linear_insert_range(client->buf_write, &writable_bytes);
- if(!writable_bytes)
- break;
- writable_bytes = (writable_bytes > size) ? (size - size_written) : writable_bytes;
- memcpy(w_ptr, &data[size_written], writable_bytes);
- rbuf_bump_head(client->buf_write, writable_bytes);
- for (size_t i = 0; i < writable_bytes; i++, j++)
- w_ptr[i] ^= mask[j % 4];
- size_written += writable_bytes;
- }
- return size_written;
- }
- static int check_opcode(ws_client *client,enum websocket_opcode oc)
- {
- switch(oc) {
- case WS_OP_BINARY_FRAME:
- case WS_OP_CONNECTION_CLOSE:
- case WS_OP_PING:
- return 0;
- case WS_OP_CONTINUATION_FRAME:
- FATAL("WS_OP_CONTINUATION_FRAME NOT IMPLEMENTED YET!!!!");
- return 0;
- case WS_OP_TEXT_FRAME:
- FATAL("WS_OP_TEXT_FRAME NOT IMPLEMENTED YET!!!!");
- return 0;
- case WS_OP_PONG:
- FATAL("WS_OP_PONG NOT IMPLEMENTED YET!!!!");
- return 0;
- default:
- return WS_CLIENT_PROTOCOL_ERROR;
- }
- }
- static inline void ws_client_rx_post_hdr_state(ws_client *client)
- {
- switch(client->rx.opcode) {
- case WS_OP_BINARY_FRAME:
- client->rx.parse_state = WS_PAYLOAD_DATA;
- return;
- case WS_OP_CONNECTION_CLOSE:
- client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE;
- return;
- case WS_OP_PING:
- client->rx.parse_state = WS_PAYLOAD_PING_REQ_PAYLOAD;
- return;
- default:
- client->rx.parse_state = WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD;
- return;
- }
- }
- #define LONGEST_POSSIBLE_HDR_PART 8
- int ws_client_process_rx_ws(ws_client *client)
- {
- char buf[LONGEST_POSSIBLE_HDR_PART];
- size_t size;
- switch (client->rx.parse_state) {
- case WS_FIRST_2BYTES:
- BUF_READ_CHECK_AT_LEAST(2);
- rbuf_pop(client->buf_read, buf, 2);
- client->rx.opcode = buf[0] & (char)~BYTE_MSB;
- if (!(buf[0] & (char)~WS_FINAL_FRAG)) {
- ERROR("Not supporting fragmented messages yet!");
- return WS_CLIENT_PROTOCOL_ERROR;
- }
- if (check_opcode(client, client->rx.opcode) == WS_CLIENT_PROTOCOL_ERROR)
- return WS_CLIENT_PROTOCOL_ERROR;
- if (buf[1] & (char)WS_PAYLOAD_MASKED) {
- ERROR("Mask is not allowed in Server->Client Websocket direction.");
- return WS_CLIENT_PROTOCOL_ERROR;
- }
- switch (buf[1]) {
- case 127:
- client->rx.parse_state = WS_PAYLOAD_EXTENDED_64;
- break;
- case 126:
- client->rx.parse_state = WS_PAYLOAD_EXTENDED_16;
- break;
- default:
- client->rx.payload_length = buf[1];
- ws_client_rx_post_hdr_state(client);
- }
- break;
- case WS_PAYLOAD_EXTENDED_16:
- BUF_READ_CHECK_AT_LEAST(2);
- rbuf_pop(client->buf_read, buf, 2);
- client->rx.payload_length = be16toh(*((uint16_t *)buf));
- ws_client_rx_post_hdr_state(client);
- break;
- case WS_PAYLOAD_EXTENDED_64:
- BUF_READ_CHECK_AT_LEAST(LONGEST_POSSIBLE_HDR_PART);
- rbuf_pop(client->buf_read, buf, LONGEST_POSSIBLE_HDR_PART);
- client->rx.payload_length = be64toh(*((uint64_t *)buf));
- ws_client_rx_post_hdr_state(client);
- break;
- case WS_PAYLOAD_DATA:
- // TODO not pretty?
- while (client->rx.payload_processed < client->rx.payload_length) {
- size_t remaining = client->rx.payload_length - client->rx.payload_processed;
- if (!rbuf_bytes_available(client->buf_read))
- return WS_CLIENT_NEED_MORE_BYTES;
- char *insert = rbuf_get_linear_insert_range(client->buf_to_mqtt, &size);
- if (!insert) {
- #ifdef DEBUG_ULTRA_VERBOSE
- DEBUG("BUFFER TOO FULL. Avail %d req %d", (int)size, (int)remaining);
- #endif
- return WS_CLIENT_BUFFER_FULL;
- }
- size = (size > remaining) ? remaining : size;
- size = rbuf_pop(client->buf_read, insert, size);
- rbuf_bump_head(client->buf_to_mqtt, size);
- client->rx.payload_processed += size;
- }
- client->rx.parse_state = WS_PACKET_DONE;
- break;
- case WS_PAYLOAD_CONNECTION_CLOSE:
- // for WS_OP_CONNECTION_CLOSE allowed is
- // a) empty payload
- // b) 2byte reason code
- // c) 2byte reason code followed by message
- if (client->rx.payload_length == 1) {
- ERROR("WebScoket CONNECTION_CLOSE can't have payload of size 1");
- return WS_CLIENT_PROTOCOL_ERROR;
- }
- if (!client->rx.payload_length) {
- INFO("WebSocket server closed the connection without giving reason.");
- client->rx.parse_state = WS_PACKET_DONE;
- break;
- }
- client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE_EC;
- break;
- case WS_PAYLOAD_CONNECTION_CLOSE_EC:
- BUF_READ_CHECK_AT_LEAST(sizeof(uint16_t));
- rbuf_pop(client->buf_read, buf, sizeof(uint16_t));
- client->rx.specific_data.op_close.ec = be16toh(*((uint16_t *)buf));
- client->rx.payload_processed += sizeof(uint16_t);
- if(client->rx.payload_processed == client->rx.payload_length) {
- INFO("WebSocket server closed the connection with EC=%d. Without message.",
- client->rx.specific_data.op_close.ec);
- client->rx.parse_state = WS_PACKET_DONE;
- break;
- }
- client->rx.parse_state = WS_PAYLOAD_CONNECTION_CLOSE_MSG;
- break;
- case WS_PAYLOAD_CONNECTION_CLOSE_MSG:
- if (!client->rx.specific_data.op_close.reason)
- client->rx.specific_data.op_close.reason = mw_malloc(client->rx.payload_length + 1);
- while (client->rx.payload_processed < client->rx.payload_length) {
- if (!rbuf_bytes_available(client->buf_read))
- return WS_CLIENT_NEED_MORE_BYTES;
- client->rx.payload_processed += rbuf_pop(client->buf_read,
- &client->rx.specific_data.op_close.reason[client->rx.payload_processed - sizeof(uint16_t)],
- client->rx.payload_length - client->rx.payload_processed);
- }
- client->rx.specific_data.op_close.reason[client->rx.payload_length] = 0;
- INFO("WebSocket server closed the connection with EC=%d and reason \"%s\"",
- client->rx.specific_data.op_close.ec,
- client->rx.specific_data.op_close.reason);
- mw_free(client->rx.specific_data.op_close.reason);
- client->rx.specific_data.op_close.reason = NULL;
- client->rx.parse_state = WS_PACKET_DONE;
- break;
- case WS_PAYLOAD_SKIP_UNKNOWN_PAYLOAD:
- BUF_READ_CHECK_AT_LEAST(client->rx.payload_length);
- WARN("Skipping Websocket Packet of unsupported/unknown type");
- if (client->rx.payload_length)
- rbuf_bump_tail(client->buf_read, client->rx.payload_length);
- client->rx.parse_state = WS_PACKET_DONE;
- return WS_CLIENT_PARSING_DONE;
- case WS_PAYLOAD_PING_REQ_PAYLOAD:
- if (client->rx.payload_length > rbuf_get_capacity(client->buf_read) / 2) {
- ERROR("Ping arrived with payload which is too big!");
- return WS_CLIENT_INTERNAL_ERROR;
- }
- BUF_READ_CHECK_AT_LEAST(client->rx.payload_length);
- client->rx.specific_data.ping_msg = mw_malloc(client->rx.payload_length);
- rbuf_pop(client->buf_read, client->rx.specific_data.ping_msg, client->rx.payload_length);
- // TODO schedule this instead of sending right away
- // then attempt to send as soon as buffer space clears up
- size = ws_client_send(client, WS_OP_PONG, client->rx.specific_data.ping_msg, client->rx.payload_length);
- if (size != client->rx.payload_length) {
- ERROR("Unable to send the PONG as one packet back. Closing connection.");
- return WS_CLIENT_PROTOCOL_ERROR;
- }
- client->rx.parse_state = WS_PACKET_DONE;
- return WS_CLIENT_PARSING_DONE;
- case WS_PACKET_DONE:
- client->rx.parse_state = WS_FIRST_2BYTES;
- client->rx.payload_processed = 0;
- if (client->rx.opcode == WS_OP_CONNECTION_CLOSE)
- return WS_CLIENT_CONNECTION_CLOSED;
- return WS_CLIENT_PARSING_DONE;
- default:
- FATAL("Unknown parse state");
- return WS_CLIENT_INTERNAL_ERROR;
- }
- return 0;
- }
- int ws_client_process(ws_client *client)
- {
- int ret;
- switch(client->state) {
- case WS_RAW:
- if (ws_client_start_handshake(client))
- return WS_CLIENT_INTERNAL_ERROR;
- return WS_CLIENT_NEED_MORE_BYTES;
- case WS_HANDSHAKE:
- do {
- ret = ws_client_parse_handshake_resp(client);
- if (ret == WS_CLIENT_PROTOCOL_ERROR)
- client->state = WS_ERROR;
- if (ret == WS_CLIENT_PARSING_DONE && client->state == WS_ESTABLISHED)
- ret = WS_CLIENT_NEED_MORE_BYTES;
- } while (!ret);
- break;
- case WS_ESTABLISHED:
- do {
- ret = ws_client_process_rx_ws(client);
- switch(ret) {
- case WS_CLIENT_PROTOCOL_ERROR:
- client->state = WS_ERROR;
- break;
- case WS_CLIENT_CONNECTION_CLOSED:
- client->state = WS_CONN_CLOSED_GRACEFUL;
- break;
- }
- // if ret == 0 we can continue parsing
- // if ret == WS_CLIENT_PARSING_DONE we processed
- // one websocket packet and attempt processing
- // next one if data available in the buffer
- } while (!ret || ret == WS_CLIENT_PARSING_DONE);
- break;
- case WS_ERROR:
- ERROR("ws_client is in error state. Restart the connection!");
- return WS_CLIENT_PROTOCOL_ERROR;
- case WS_CONN_CLOSED_GRACEFUL:
- ERROR("Connection has been gracefully closed. Calling this is useless (and probably bug) until you reconnect again.");
- return WS_CLIENT_CONNECTION_CLOSED;
- default:
- FATAL("Unknown connection state! Probably memory corruption.");
- return WS_CLIENT_INTERNAL_ERROR;
- }
- return ret;
- }
|