mqtt_wss_client.c 35 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126
  1. // SPDX-License-Identifier: GPL-3.0-only
  2. // Copyright (C) 2020 Timotej Šiškovič
  3. #ifndef _GNU_SOURCE
  4. #define _GNU_SOURCE
  5. #endif
  6. #include "mqtt_wss_client.h"
  7. #include "mqtt_ng.h"
  8. #include "ws_client.h"
  9. #include "common_internal.h"
  10. #include <stdlib.h>
  11. #include <fcntl.h>
  12. #include <unistd.h>
  13. #include <poll.h>
  14. #include <string.h>
  15. #include <time.h>
  16. #include <sys/socket.h>
  17. #include <netinet/in.h>
  18. #include <arpa/inet.h>
  19. #include <netinet/tcp.h> //TCP_NODELAY
  20. #include <netdb.h>
  21. #include <openssl/err.h>
  22. #include <openssl/ssl.h>
  23. #define PIPE_READ_END 0
  24. #define PIPE_WRITE_END 1
  25. #define POLLFD_SOCKET 0
  26. #define POLLFD_PIPE 1
  27. #if (OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110) && (SSLEAY_VERSION_NUMBER >= OPENSSL_VERSION_097)
  28. #include <openssl/conf.h>
  29. #endif
  30. //TODO MQTT_PUBLISH_RETAIN should not be needed anymore
  31. #define MQTT_PUBLISH_RETAIN 0x01
  32. #define MQTT_CONNECT_CLEAN_SESSION 0x02
  33. #define MQTT_CONNECT_WILL_RETAIN 0x20
  34. char *util_openssl_ret_err(int err)
  35. {
  36. switch(err){
  37. case SSL_ERROR_WANT_READ:
  38. return "SSL_ERROR_WANT_READ";
  39. case SSL_ERROR_WANT_WRITE:
  40. return "SSL_ERROR_WANT_WRITE";
  41. case SSL_ERROR_NONE:
  42. return "SSL_ERROR_NONE";
  43. case SSL_ERROR_ZERO_RETURN:
  44. return "SSL_ERROR_ZERO_RETURN";
  45. case SSL_ERROR_WANT_CONNECT:
  46. return "SSL_ERROR_WANT_CONNECT";
  47. case SSL_ERROR_WANT_ACCEPT:
  48. return "SSL_ERROR_WANT_ACCEPT";
  49. case SSL_ERROR_WANT_X509_LOOKUP:
  50. return "SSL_ERROR_WANT_X509_LOOKUP";
  51. #ifdef SSL_ERROR_WANT_ASYNC
  52. case SSL_ERROR_WANT_ASYNC:
  53. return "SSL_ERROR_WANT_ASYNC";
  54. #endif
  55. #ifdef SSL_ERROR_WANT_ASYNC_JOB
  56. case SSL_ERROR_WANT_ASYNC_JOB:
  57. return "SSL_ERROR_WANT_ASYNC_JOB";
  58. #endif
  59. #ifdef SSL_ERROR_WANT_CLIENT_HELLO_CB
  60. case SSL_ERROR_WANT_CLIENT_HELLO_CB:
  61. return "SSL_ERROR_WANT_CLIENT_HELLO_CB";
  62. #endif
  63. case SSL_ERROR_SYSCALL:
  64. return "SSL_ERROR_SYSCALL";
  65. case SSL_ERROR_SSL:
  66. return "SSL_ERROR_SSL";
  67. }
  68. return "UNKNOWN";
  69. }
  70. struct mqtt_wss_client_struct {
  71. ws_client *ws_client;
  72. mqtt_wss_log_ctx_t log;
  73. // immediate connection (e.g. proxy server)
  74. char *host;
  75. int port;
  76. // target of connection (e.g. where we want to connect to)
  77. char *target_host;
  78. int target_port;
  79. enum mqtt_wss_proxy_type proxy_type;
  80. char *proxy_uname;
  81. char *proxy_passwd;
  82. // nonblock IO related
  83. int sockfd;
  84. int write_notif_pipe[2];
  85. struct pollfd poll_fds[2];
  86. SSL_CTX *ssl_ctx;
  87. SSL *ssl;
  88. int ssl_flags;
  89. struct mqtt_ng_client *mqtt;
  90. int mqtt_keepalive;
  91. pthread_mutex_t pub_lock;
  92. // signifies that we didn't write all MQTT wanted
  93. // us to write during last cycle (e.g. due to buffer
  94. // size) and thus we should arm POLLOUT
  95. unsigned int mqtt_didnt_finish_write:1;
  96. unsigned int mqtt_connected:1;
  97. unsigned int mqtt_disconnecting:1;
  98. // Application layer callback pointers
  99. void (*msg_callback)(const char *, const void *, size_t, int);
  100. void (*puback_callback)(uint16_t packet_id);
  101. pthread_mutex_t stat_lock;
  102. struct mqtt_wss_stats stats;
  103. #ifdef MQTT_WSS_DEBUG
  104. void (*ssl_ctx_keylog_cb)(const SSL *ssl, const char *line);
  105. #endif
  106. };
  107. static void mws_connack_callback_ng(void *user_ctx, int code)
  108. {
  109. mqtt_wss_client client = user_ctx;
  110. switch(code) {
  111. case 0:
  112. client->mqtt_connected = 1;
  113. return;
  114. //TODO manual labor: all the CONNACK error codes with some nice error message
  115. default:
  116. mws_error(client->log, "MQTT CONNACK returned error %d", code);
  117. return;
  118. }
  119. }
  120. static ssize_t mqtt_send_cb(void *user_ctx, const void* buf, size_t len)
  121. {
  122. mqtt_wss_client mqtt_wss_client = user_ctx;
  123. #ifdef DEBUG_ULTRA_VERBOSE
  124. mws_debug(mqtt_wss_client->log, "mqtt_pal_sendall(len=%d)", len);
  125. #endif
  126. int ret = ws_client_send(mqtt_wss_client->ws_client, WS_OP_BINARY_FRAME, buf, len);
  127. if (ret >= 0 && (size_t)ret != len) {
  128. #ifdef DEBUG_ULTRA_VERBOSE
  129. mws_debug(mqtt_wss_client->log, "Not complete message sent (Msg=%d,Sent=%d). Need to arm POLLOUT!", len, ret);
  130. #endif
  131. mqtt_wss_client->mqtt_didnt_finish_write = 1;
  132. }
  133. return ret;
  134. }
  135. mqtt_wss_client mqtt_wss_new(const char *log_prefix,
  136. mqtt_wss_log_callback_t log_callback,
  137. msg_callback_fnc_t msg_callback,
  138. void (*puback_callback)(uint16_t packet_id))
  139. {
  140. mqtt_wss_log_ctx_t log;
  141. log = mqtt_wss_log_ctx_create(log_prefix, log_callback);
  142. if(!log)
  143. return NULL;
  144. SSL_library_init();
  145. SSL_load_error_strings();
  146. mqtt_wss_client client = mw_calloc(1, sizeof(struct mqtt_wss_client_struct));
  147. if (!client) {
  148. mws_error(log, "OOM alocating mqtt_wss_client");
  149. goto fail;
  150. }
  151. pthread_mutex_init(&client->pub_lock, NULL);
  152. pthread_mutex_init(&client->stat_lock, NULL);
  153. client->msg_callback = msg_callback;
  154. client->puback_callback = puback_callback;
  155. client->ws_client = ws_client_new(0, &client->target_host, log);
  156. if (!client->ws_client) {
  157. mws_error(log, "Error creating ws_client");
  158. goto fail_1;
  159. }
  160. client->log = log;
  161. #ifdef __APPLE__
  162. if (pipe(client->write_notif_pipe)) {
  163. #else
  164. if (pipe2(client->write_notif_pipe, O_CLOEXEC /*| O_DIRECT*/)) {
  165. #endif
  166. mws_error(log, "Couldn't create pipe");
  167. goto fail_2;
  168. }
  169. client->poll_fds[POLLFD_PIPE].fd = client->write_notif_pipe[PIPE_READ_END];
  170. client->poll_fds[POLLFD_PIPE].events = POLLIN;
  171. client->poll_fds[POLLFD_SOCKET].events = POLLIN;
  172. struct mqtt_ng_init settings = {
  173. .log = log,
  174. .data_in = client->ws_client->buf_to_mqtt,
  175. .data_out_fnc = &mqtt_send_cb,
  176. .user_ctx = client,
  177. .connack_callback = &mws_connack_callback_ng,
  178. .puback_callback = puback_callback,
  179. .msg_callback = msg_callback
  180. };
  181. if ( (client->mqtt = mqtt_ng_init(&settings)) == NULL ) {
  182. mws_error(log, "Error initializing internal MQTT client");
  183. goto fail_3;
  184. }
  185. return client;
  186. fail_3:
  187. close(client->write_notif_pipe[PIPE_WRITE_END]);
  188. close(client->write_notif_pipe[PIPE_READ_END]);
  189. fail_2:
  190. ws_client_destroy(client->ws_client);
  191. fail_1:
  192. mw_free(client);
  193. fail:
  194. mqtt_wss_log_ctx_destroy(log);
  195. return NULL;
  196. }
  197. void mqtt_wss_set_max_buf_size(mqtt_wss_client client, size_t size)
  198. {
  199. mqtt_ng_set_max_mem(client->mqtt, size);
  200. }
  201. void mqtt_wss_destroy(mqtt_wss_client client)
  202. {
  203. mqtt_ng_destroy(client->mqtt);
  204. close(client->write_notif_pipe[PIPE_WRITE_END]);
  205. close(client->write_notif_pipe[PIPE_READ_END]);
  206. ws_client_destroy(client->ws_client);
  207. // deleted after client->ws_client
  208. // as it "borrows" this pointer and might use it
  209. if (client->target_host == client->host)
  210. client->target_host = NULL;
  211. if (client->target_host)
  212. mw_free(client->target_host);
  213. if (client->host)
  214. mw_free(client->host);
  215. mw_free(client->proxy_passwd);
  216. mw_free(client->proxy_uname);
  217. if (client->ssl)
  218. SSL_free(client->ssl);
  219. if (client->ssl_ctx)
  220. SSL_CTX_free(client->ssl_ctx);
  221. if (client->sockfd > 0)
  222. close(client->sockfd);
  223. pthread_mutex_destroy(&client->pub_lock);
  224. pthread_mutex_destroy(&client->stat_lock);
  225. mqtt_wss_log_ctx_destroy(client->log);
  226. mw_free(client);
  227. }
  228. static int cert_verify_callback(int preverify_ok, X509_STORE_CTX *ctx)
  229. {
  230. SSL *ssl;
  231. X509 *err_cert;
  232. mqtt_wss_client client;
  233. int err = 0, depth;
  234. char *err_str;
  235. ssl = X509_STORE_CTX_get_ex_data(ctx, SSL_get_ex_data_X509_STORE_CTX_idx());
  236. client = SSL_get_ex_data(ssl, 0);
  237. // TODO handle depth as per https://www.openssl.org/docs/man1.0.2/man3/SSL_CTX_set_verify.html
  238. if (!preverify_ok) {
  239. err = X509_STORE_CTX_get_error(ctx);
  240. depth = X509_STORE_CTX_get_error_depth(ctx);
  241. err_cert = X509_STORE_CTX_get_current_cert(ctx);
  242. err_str = X509_NAME_oneline(X509_get_subject_name(err_cert), NULL, 0);
  243. mws_error(client->log, "verify error:num=%d:%s:depth=%d:%s", err,
  244. X509_verify_cert_error_string(err), depth, err_str);
  245. mw_free(err_str);
  246. }
  247. if (!preverify_ok && err == X509_V_ERR_DEPTH_ZERO_SELF_SIGNED_CERT &&
  248. client->ssl_flags & MQTT_WSS_SSL_ALLOW_SELF_SIGNED)
  249. {
  250. preverify_ok = 1;
  251. mws_error(client->log, "Self Signed Certificate Accepted as the connection was "
  252. "requested with MQTT_WSS_SSL_ALLOW_SELF_SIGNED");
  253. }
  254. return preverify_ok;
  255. }
  256. #define PROXY_CONNECT "CONNECT"
  257. #define PROXY_HTTP "HTTP/1.1"
  258. #define HTTP_ENDLINE "\x0D\x0A"
  259. #define HTTP_HDR_TERMINATOR "\x0D\x0A\x0D\x0A"
  260. #define HTTP_CODE_LEN 4
  261. #define HTTP_REASON_MAX_LEN 512
  262. static int http_parse_reply(mqtt_wss_client client, rbuf_t buf)
  263. {
  264. char *ptr;
  265. char http_code_s[4];
  266. int http_code;
  267. int idx;
  268. if (rbuf_memcmp_n(buf, PROXY_HTTP, strlen(PROXY_HTTP))) {
  269. mws_error(client->log, "http_proxy expected reply with \"" PROXY_HTTP "\"");
  270. return 1;
  271. }
  272. rbuf_bump_tail(buf, strlen(PROXY_HTTP));
  273. if (!rbuf_pop(buf, http_code_s, 1) || http_code_s[0] != 0x20) {
  274. mws_error(client->log, "http_proxy missing space after \"" PROXY_HTTP "\"");
  275. return 2;
  276. }
  277. if (!rbuf_pop(buf, http_code_s, HTTP_CODE_LEN)) {
  278. mws_error(client->log, "http_proxy missing HTTP code");
  279. return 3;
  280. }
  281. for (int i = 0; i < HTTP_CODE_LEN - 1; i++)
  282. if (http_code_s[i] > 0x39 || http_code_s[i] < 0x30) {
  283. mws_error(client->log, "http_proxy HTTP code non numeric");
  284. return 4;
  285. }
  286. http_code_s[HTTP_CODE_LEN - 1] = 0;
  287. http_code = atoi(http_code_s);
  288. // TODO check if we ever have more headers here
  289. rbuf_find_bytes(buf, HTTP_ENDLINE, strlen(HTTP_ENDLINE), &idx);
  290. if (idx >= HTTP_REASON_MAX_LEN) {
  291. mws_error(client->log, "http_proxy returned reason that is too long");
  292. return 5;
  293. }
  294. if (http_code != 200) {
  295. ptr = mw_malloc(idx + 1);
  296. if (!ptr)
  297. return 6;
  298. rbuf_pop(buf, ptr, idx);
  299. ptr[idx] = 0;
  300. mws_error(client->log, "http_proxy returned error code %d \"%s\"", http_code, ptr);
  301. mw_free(ptr);
  302. return 7;
  303. }/* else
  304. rbuf_bump_tail(buf, idx);*/
  305. rbuf_find_bytes(buf, HTTP_HDR_TERMINATOR, strlen(HTTP_HDR_TERMINATOR), &idx);
  306. if (idx)
  307. rbuf_bump_tail(buf, idx);
  308. rbuf_bump_tail(buf, strlen(HTTP_HDR_TERMINATOR));
  309. if (rbuf_bytes_available(buf)) {
  310. mws_error(client->log, "http_proxy unexpected trailing bytes after end of HTTP hdr");
  311. return 8;
  312. }
  313. mws_debug(client->log, "http_proxy CONNECT succeeded");
  314. return 0;
  315. }
  316. #if defined(OPENSSL_VERSION_NUMBER) && OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110
  317. static EVP_ENCODE_CTX *EVP_ENCODE_CTX_new(void)
  318. {
  319. EVP_ENCODE_CTX *ctx = OPENSSL_malloc(sizeof(*ctx));
  320. if (ctx != NULL) {
  321. memset(ctx, 0, sizeof(*ctx));
  322. }
  323. return ctx;
  324. }
  325. static void EVP_ENCODE_CTX_free(EVP_ENCODE_CTX *ctx)
  326. {
  327. OPENSSL_free(ctx);
  328. return;
  329. }
  330. #endif
  331. inline static int base64_encode_helper(unsigned char *out, int *outl, const unsigned char *in, int in_len)
  332. {
  333. int len;
  334. unsigned char *str = out;
  335. EVP_ENCODE_CTX *ctx = EVP_ENCODE_CTX_new();
  336. EVP_EncodeInit(ctx);
  337. EVP_EncodeUpdate(ctx, str, outl, in, in_len);
  338. str += *outl;
  339. EVP_EncodeFinal(ctx, str, &len);
  340. *outl += len;
  341. str = out;
  342. while(*str) {
  343. if (*str != 0x0D && *str != 0x0A)
  344. *out++ = *str++;
  345. else
  346. str++;
  347. }
  348. *out = 0;
  349. EVP_ENCODE_CTX_free(ctx);
  350. return 0;
  351. }
  352. static int http_proxy_connect(mqtt_wss_client client)
  353. {
  354. int rc;
  355. struct pollfd poll_fd;
  356. rbuf_t r_buf = rbuf_create(4096);
  357. if (!r_buf)
  358. return 1;
  359. char *r_buf_ptr;
  360. size_t r_buf_linear_insert_capacity;
  361. poll_fd.fd = client->sockfd;
  362. poll_fd.events = POLLIN;
  363. r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity);
  364. snprintf(r_buf_ptr, r_buf_linear_insert_capacity,"%s %s:%d %s" HTTP_ENDLINE, PROXY_CONNECT, client->target_host, client->target_port, PROXY_HTTP);
  365. write(client->sockfd, r_buf_ptr, strlen(r_buf_ptr));
  366. if (client->proxy_uname) {
  367. size_t creds_plain_len = strlen(client->proxy_uname) + strlen(client->proxy_passwd) + 2;
  368. char *creds_plain = mw_malloc(creds_plain_len);
  369. if (!creds_plain) {
  370. mws_error(client->log, "OOM creds_plain");
  371. rc = 6;
  372. goto cleanup;
  373. }
  374. int creds_base64_len = (((4 * creds_plain_len / 3) + 3) & ~3);
  375. // OpenSSL encoder puts newline every 64 output bytes
  376. // we remove those but during encoding we need that space in the buffer
  377. creds_base64_len += (1+(creds_base64_len/64)) * strlen("\n");
  378. char *creds_base64 = mw_malloc(creds_base64_len + 1);
  379. if (!creds_base64) {
  380. mw_free(creds_plain);
  381. mws_error(client->log, "OOM creds_base64");
  382. rc = 6;
  383. goto cleanup;
  384. }
  385. char *ptr = creds_plain;
  386. strcpy(ptr, client->proxy_uname);
  387. ptr += strlen(client->proxy_uname);
  388. *ptr++ = ':';
  389. strcpy(ptr, client->proxy_passwd);
  390. int b64_len;
  391. base64_encode_helper((unsigned char*)creds_base64, &b64_len, (unsigned char*)creds_plain, strlen(creds_plain));
  392. mw_free(creds_plain);
  393. r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity);
  394. snprintf(r_buf_ptr, r_buf_linear_insert_capacity,"Proxy-Authorization: Basic %s" HTTP_ENDLINE, creds_base64);
  395. write(client->sockfd, r_buf_ptr, strlen(r_buf_ptr));
  396. mw_free(creds_base64);
  397. }
  398. write(client->sockfd, HTTP_ENDLINE, strlen(HTTP_ENDLINE));
  399. // read until you find CRLF, CRLF (HTTP HDR end)
  400. // or ring buffer is full
  401. // or timeout
  402. while ((rc = poll(&poll_fd, 1, 1000)) >= 0) {
  403. if (!rc) {
  404. mws_error(client->log, "http_proxy timeout waiting reply from proxy server");
  405. rc = 2;
  406. goto cleanup;
  407. }
  408. r_buf_ptr = rbuf_get_linear_insert_range(r_buf, &r_buf_linear_insert_capacity);
  409. if (!r_buf_ptr) {
  410. mws_error(client->log, "http_proxy read ring buffer full");
  411. rc = 3;
  412. goto cleanup;
  413. }
  414. if ((rc = read(client->sockfd, r_buf_ptr, r_buf_linear_insert_capacity)) < 0) {
  415. if (errno == EWOULDBLOCK || errno == EAGAIN) {
  416. continue;
  417. }
  418. mws_error(client->log, "http_proxy error reading from socket \"%s\"", strerror(errno));
  419. rc = 4;
  420. goto cleanup;
  421. }
  422. rbuf_bump_head(r_buf, rc);
  423. if (rbuf_find_bytes(r_buf, HTTP_HDR_TERMINATOR, strlen(HTTP_HDR_TERMINATOR), &rc)) {
  424. rc = 0;
  425. if (http_parse_reply(client, r_buf))
  426. rc = 5;
  427. goto cleanup;
  428. }
  429. }
  430. mws_error(client->log, "proxy negotiation poll error \"%s\"", strerror(errno));
  431. rc = 5;
  432. cleanup:
  433. rbuf_free(r_buf);
  434. return rc;
  435. }
  436. int mqtt_wss_connect(mqtt_wss_client client, char *host, int port, struct mqtt_connect_params *mqtt_params, int ssl_flags, struct mqtt_wss_proxy *proxy)
  437. {
  438. struct sockaddr_in addr;
  439. memset(&addr, 0, sizeof(addr));
  440. addr.sin_family = AF_INET;
  441. struct hostent *he;
  442. struct in_addr **addr_list;
  443. if (!mqtt_params) {
  444. mws_error(client->log, "mqtt_params can't be null!");
  445. return -1;
  446. }
  447. // reset state in case this is reconnect
  448. client->mqtt_didnt_finish_write = 0;
  449. client->mqtt_connected = 0;
  450. client->mqtt_disconnecting = 0;
  451. ws_client_reset(client->ws_client);
  452. if (client->target_host == client->host)
  453. client->target_host = NULL;
  454. if (client->target_host)
  455. mw_free(client->target_host);
  456. if (client->host)
  457. mw_free(client->host);
  458. if (proxy && proxy->type != MQTT_WSS_DIRECT) {
  459. client->host = mw_strdup(proxy->host);
  460. client->port = proxy->port;
  461. client->target_host = mw_strdup(host);
  462. client->target_port = port;
  463. client->proxy_type = proxy->type;
  464. if (proxy->username)
  465. client->proxy_uname = mw_strdup(proxy->username);
  466. if (proxy->password)
  467. client->proxy_passwd = mw_strdup(proxy->password);
  468. } else {
  469. client->host = mw_strdup(host);
  470. client->port = port;
  471. client->target_host = client->host;
  472. client->target_port = port;
  473. }
  474. client->ssl_flags = ssl_flags;
  475. //TODO gethostbyname -> getaddinfo
  476. // hstrerror -> gai_strerror
  477. if ((he = gethostbyname(client->host)) == NULL) {
  478. mws_error(client->log, "gethostbyname() error \"%s\"", hstrerror(h_errno));
  479. return -1;
  480. }
  481. addr_list = (struct in_addr **)he->h_addr_list;
  482. if(!addr_list[0]) {
  483. mws_error(client->log, "No IP addr resolved");
  484. return -1;
  485. }
  486. mws_debug(client->log, "Resolved IP: %s", inet_ntoa(*addr_list[0]));
  487. addr.sin_addr = *addr_list[0];
  488. addr.sin_port = htons(client->port);
  489. if (client->sockfd > 0)
  490. close(client->sockfd);
  491. client->sockfd = socket(AF_INET, SOCK_STREAM, 0);
  492. if (client->sockfd < 0) {
  493. mws_error(client->log, "Couldn't create socket()");
  494. return -1;
  495. }
  496. int flag = 1;
  497. int result = setsockopt(client->sockfd,
  498. IPPROTO_TCP,
  499. TCP_NODELAY,
  500. &flag,
  501. sizeof(int));
  502. if (result < 0)
  503. mws_error(client->log, "Could not dissable NAGLE");
  504. if (connect(client->sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
  505. mws_error(client->log, "Could not connect to remote endpoint \"%s\", port %d.\n", client->host, client->port);
  506. return -3;
  507. }
  508. client->poll_fds[POLLFD_SOCKET].fd = client->sockfd;
  509. if (fcntl(client->sockfd, F_SETFL, fcntl(client->sockfd, F_GETFL, 0) | O_NONBLOCK) == -1) {
  510. mws_error(client->log, "Error setting O_NONBLOCK to TCP socket. \"%s\"", strerror(errno));
  511. return -8;
  512. }
  513. if (client->proxy_type != MQTT_WSS_DIRECT)
  514. if (http_proxy_connect(client))
  515. return -4;
  516. #if OPENSSL_VERSION_NUMBER < OPENSSL_VERSION_110
  517. #if (SSLEAY_VERSION_NUMBER >= OPENSSL_VERSION_097)
  518. OPENSSL_config(NULL);
  519. #endif
  520. SSL_load_error_strings();
  521. SSL_library_init();
  522. #else
  523. if (OPENSSL_init_ssl(OPENSSL_INIT_LOAD_CONFIG, NULL) != 1) {
  524. mws_error(client->log, "Failed to initialize SSL");
  525. return -1;
  526. };
  527. #endif
  528. // free SSL structs from possible previous connections
  529. if (client->ssl)
  530. SSL_free(client->ssl);
  531. if (client->ssl_ctx)
  532. SSL_CTX_free(client->ssl_ctx);
  533. client->ssl_ctx = SSL_CTX_new(SSLv23_client_method());
  534. if (!(client->ssl_flags & MQTT_WSS_SSL_DONT_CHECK_CERTS)) {
  535. SSL_CTX_set_default_verify_paths(client->ssl_ctx);
  536. SSL_CTX_set_verify(client->ssl_ctx, SSL_VERIFY_PEER | SSL_VERIFY_CLIENT_ONCE, cert_verify_callback);
  537. } else
  538. mws_error(client->log, "SSL Certificate checking completely disabled!!!");
  539. #ifdef MQTT_WSS_DEBUG
  540. if(client->ssl_ctx_keylog_cb)
  541. SSL_CTX_set_keylog_callback(client->ssl_ctx, client->ssl_ctx_keylog_cb);
  542. #endif
  543. client->ssl = SSL_new(client->ssl_ctx);
  544. if (!(client->ssl_flags & MQTT_WSS_SSL_DONT_CHECK_CERTS)) {
  545. if (!SSL_set_ex_data(client->ssl, 0, client)) {
  546. mws_error(client->log, "Could not SSL_set_ex_data");
  547. return -4;
  548. }
  549. }
  550. SSL_set_fd(client->ssl, client->sockfd);
  551. SSL_set_connect_state(client->ssl);
  552. if (!SSL_set_tlsext_host_name(client->ssl, client->target_host)) {
  553. mws_error(client->log, "Error setting TLS SNI host");
  554. return -7;
  555. }
  556. result = SSL_connect(client->ssl);
  557. if (result != -1 && result != 1) {
  558. mws_error(client->log, "SSL could not connect");
  559. return -5;
  560. }
  561. if (result == -1) {
  562. int ec = SSL_get_error(client->ssl, result);
  563. if (ec != SSL_ERROR_WANT_READ && ec != SSL_ERROR_WANT_WRITE) {
  564. mws_error(client->log, "Failed to start SSL connection");
  565. return -6;
  566. }
  567. }
  568. client->mqtt_keepalive = (mqtt_params->keep_alive ? mqtt_params->keep_alive : 400);
  569. mws_info(client->log, "Going to connect using internal MQTT 5 implementation");
  570. struct mqtt_auth_properties auth;
  571. auth.client_id = (char*)mqtt_params->clientid;
  572. auth.client_id_free = NULL;
  573. auth.username = (char*)mqtt_params->username;
  574. auth.username_free = NULL;
  575. auth.password = (char*)mqtt_params->password;
  576. auth.password_free = NULL;
  577. struct mqtt_lwt_properties lwt;
  578. lwt.will_topic = (char*)mqtt_params->will_topic;
  579. lwt.will_topic_free = NULL;
  580. lwt.will_message = (void*)mqtt_params->will_msg;
  581. lwt.will_message_free = NULL; // TODO expose no copy version to API
  582. lwt.will_message_size = mqtt_params->will_msg_len;
  583. lwt.will_qos = (mqtt_params->will_flags & MQTT_WSS_PUB_QOSMASK);
  584. lwt.will_retain = mqtt_params->will_flags & MQTT_WSS_PUB_RETAIN;
  585. int ret = mqtt_ng_connect(client->mqtt, &auth, mqtt_params->will_msg ? &lwt : NULL, 1, client->mqtt_keepalive);
  586. if (ret) {
  587. mws_error(client->log, "Error generating MQTT connect");
  588. return 1;
  589. }
  590. client->poll_fds[POLLFD_PIPE].events = POLLIN;
  591. client->poll_fds[POLLFD_SOCKET].events = POLLIN;
  592. // wait till MQTT connection is established
  593. while (!client->mqtt_connected) {
  594. if(mqtt_wss_service(client, -1)) {
  595. mws_error(client->log, "Error connecting to MQTT WSS server \"%s\", port %d.", host, port);
  596. return 2;
  597. }
  598. }
  599. return 0;
  600. }
  601. #define NSEC_PER_USEC 1000ULL
  602. #define USEC_PER_SEC 1000000ULL
  603. #define NSEC_PER_MSEC 1000000ULL
  604. #define NSEC_PER_SEC 1000000000ULL
  605. static inline uint64_t boottime_usec(mqtt_wss_client client) {
  606. struct timespec ts;
  607. #if defined(__APPLE__) || defined(__FreeBSD__)
  608. if (clock_gettime(CLOCK_MONOTONIC, &ts) == -1) {
  609. #else
  610. if (clock_gettime(CLOCK_BOOTTIME, &ts) == -1) {
  611. #endif
  612. mws_error(client->log, "clock_gettimte failed");
  613. return 0;
  614. }
  615. return (uint64_t)ts.tv_sec * USEC_PER_SEC + (ts.tv_nsec % NSEC_PER_SEC) / NSEC_PER_USEC;
  616. }
  617. #define MWS_TIMED_OUT 1
  618. #define MWS_ERROR 2
  619. #define MWS_OK 0
  620. static inline const char *mqtt_wss_error_tos(int ec)
  621. {
  622. switch(ec) {
  623. case MWS_TIMED_OUT:
  624. return "Error: Operation was not able to finish in time";
  625. case MWS_ERROR:
  626. return "Unspecified Error";
  627. default:
  628. return "Unknown Error Code!";
  629. }
  630. }
  631. static inline int mqtt_wss_service_all(mqtt_wss_client client, int timeout_ms)
  632. {
  633. uint64_t exit_by = boottime_usec(client) + (timeout_ms * NSEC_PER_MSEC);
  634. uint64_t now;
  635. client->poll_fds[POLLFD_SOCKET].events |= POLLOUT; // TODO when entering mwtt_wss_service use out buffer size to arm POLLOUT
  636. while (rbuf_bytes_available(client->ws_client->buf_write)) {
  637. now = boottime_usec(client);
  638. if (now >= exit_by)
  639. return MWS_TIMED_OUT;
  640. if (mqtt_wss_service(client, exit_by - now))
  641. return MWS_ERROR;
  642. }
  643. return MWS_OK;
  644. }
  645. void mqtt_wss_disconnect(mqtt_wss_client client, int timeout_ms)
  646. {
  647. int ret;
  648. // block application from sending more MQTT messages
  649. client->mqtt_disconnecting = 1;
  650. // send whatever was left at the time of calling this function
  651. ret = mqtt_wss_service_all(client, timeout_ms / 4);
  652. if(ret)
  653. mws_error(client->log,
  654. "Error while trying to send all remaining data in an attempt "
  655. "to gracefully disconnect! EC=%d Desc:\"%s\"",
  656. ret,
  657. mqtt_wss_error_tos(ret));
  658. // schedule and send MQTT disconnect
  659. mqtt_ng_disconnect(client->mqtt, 0);
  660. mqtt_ng_sync(client->mqtt);
  661. ret = mqtt_wss_service_all(client, timeout_ms / 4);
  662. if(ret)
  663. mws_error(client->log,
  664. "Error while trying to send MQTT disconnect message in an attempt "
  665. "to gracefully disconnect! EC=%d Desc:\"%s\"",
  666. ret,
  667. mqtt_wss_error_tos(ret));
  668. // send WebSockets close message
  669. uint16_t ws_rc = htobe16(1000);
  670. ws_client_send(client->ws_client, WS_OP_CONNECTION_CLOSE, (const char*)&ws_rc, sizeof(ws_rc));
  671. ret = mqtt_wss_service_all(client, timeout_ms / 4);
  672. if(ret) {
  673. // Some MQTT/WSS servers will close socket on receipt of MQTT disconnect and
  674. // do not wait for WebSocket to be closed properly
  675. mws_warn(client->log,
  676. "Error while trying to send WebSocket disconnect message in an attempt "
  677. "to gracefully disconnect! EC=%d Desc:\"%s\".",
  678. ret,
  679. mqtt_wss_error_tos(ret));
  680. }
  681. // Service WSS connection until remote closes connection (usual)
  682. // or timeout happens (unusual) in which case we close
  683. mqtt_wss_service_all(client, timeout_ms / 4);
  684. close(client->sockfd);
  685. client->sockfd = -1;
  686. }
  687. static inline void mqtt_wss_wakeup(mqtt_wss_client client)
  688. {
  689. #ifdef DEBUG_ULTRA_VERBOSE
  690. mws_debug(client->log, "mqtt_wss_wakup - forcing wake up of main loop");
  691. #endif
  692. write(client->write_notif_pipe[PIPE_WRITE_END], " ", 1);
  693. }
  694. #define THROWAWAY_BUF_SIZE 32
  695. char throwaway[THROWAWAY_BUF_SIZE];
  696. static inline void util_clear_pipe(int fd)
  697. {
  698. (void)read(fd, throwaway, THROWAWAY_BUF_SIZE);
  699. }
  700. static inline void set_socket_pollfds(mqtt_wss_client client, int ssl_ret) {
  701. if (ssl_ret == SSL_ERROR_WANT_WRITE)
  702. client->poll_fds[POLLFD_SOCKET].events |= POLLOUT;
  703. if (ssl_ret == SSL_ERROR_WANT_READ)
  704. client->poll_fds[POLLFD_SOCKET].events |= POLLIN;
  705. }
  706. static int handle_mqtt_internal(mqtt_wss_client client)
  707. {
  708. int rc = mqtt_ng_sync(client->mqtt);
  709. if (rc) {
  710. mws_error(client->log, "mqtt_ng_sync returned %d != 0", rc);
  711. client->mqtt_connected = 0;
  712. return 1;
  713. }
  714. return 0;
  715. }
  716. #define SEC_TO_MSEC 1000
  717. static inline long long int t_till_next_keepalive_ms(mqtt_wss_client client)
  718. {
  719. time_t last_send = mqtt_ng_last_send_time(client->mqtt);
  720. long long int next_mqtt_keep_alive = (last_send * SEC_TO_MSEC)
  721. + (client->mqtt_keepalive * (SEC_TO_MSEC * 0.75 /* SEND IN ADVANCE */));
  722. return(next_mqtt_keep_alive - (time(NULL) * SEC_TO_MSEC));
  723. }
  724. #ifdef MQTT_WSS_CPUSTATS
  725. static inline uint64_t mqtt_wss_now_usec(mqtt_wss_client client) {
  726. struct timespec ts;
  727. if(clock_gettime(CLOCK_MONOTONIC, &ts) == -1) {
  728. mws_error(client->log, "clock_gettime(CLOCK_MONOTONIC, &timespec) failed.");
  729. return 0;
  730. }
  731. return (uint64_t)ts.tv_sec * USEC_PER_SEC + (ts.tv_nsec % NSEC_PER_SEC) / NSEC_PER_USEC;
  732. }
  733. #endif
  734. int mqtt_wss_service(mqtt_wss_client client, int timeout_ms)
  735. {
  736. char *ptr;
  737. size_t size;
  738. int ret;
  739. int send_keepalive = 0;
  740. #ifdef MQTT_WSS_CPUSTATS
  741. uint64_t t1,t2;
  742. t1 = mqtt_wss_now_usec(client);
  743. #endif
  744. #ifdef DEBUG_ULTRA_VERBOSE
  745. mws_debug(client->log, ">>>>> mqtt_wss_service <<<<<");
  746. mws_debug(client->log, "Waiting for events: %s%s%s",
  747. (client->poll_fds[POLLFD_SOCKET].events & POLLIN) ? "SOCKET_POLLIN " : "",
  748. (client->poll_fds[POLLFD_SOCKET].events & POLLOUT) ? "SOCKET_POLLOUT " : "",
  749. (client->poll_fds[POLLFD_PIPE].events & POLLIN) ? "PIPE_POLLIN" : "" );
  750. #endif
  751. // Check user requested TO doesn't interfere with MQTT keep alives
  752. long long int till_next_keep_alive = t_till_next_keepalive_ms(client);
  753. if (client->mqtt_connected && (timeout_ms < 0 || timeout_ms >= till_next_keep_alive)) {
  754. #ifdef DEBUG_ULTRA_VERBOSE
  755. mws_debug(client->log, "Shortening Timeout requested %d to %lld to ensure keep-alive can be sent", timeout_ms, till_next_keep_alive);
  756. #endif
  757. timeout_ms = till_next_keep_alive;
  758. send_keepalive = 1;
  759. }
  760. #ifdef MQTT_WSS_CPUSTATS
  761. t2 = mqtt_wss_now_usec(client);
  762. client->stats.time_keepalive += t2 - t1;
  763. #endif
  764. if ((ret = poll(client->poll_fds, 2, timeout_ms >= 0 ? timeout_ms : -1)) < 0) {
  765. if (errno == EINTR) {
  766. mws_warn(client->log, "poll interrupted by EINTR");
  767. return 0;
  768. }
  769. mws_error(client->log, "poll error \"%s\"", strerror(errno));
  770. return -2;
  771. }
  772. #ifdef DEBUG_ULTRA_VERBOSE
  773. mws_debug(client->log, "Poll events happened: %s%s%s%s",
  774. (client->poll_fds[POLLFD_SOCKET].revents & POLLIN) ? "SOCKET_POLLIN " : "",
  775. (client->poll_fds[POLLFD_SOCKET].revents & POLLOUT) ? "SOCKET_POLLOUT " : "",
  776. (client->poll_fds[POLLFD_PIPE].revents & POLLIN) ? "PIPE_POLLIN " : "",
  777. (!ret) ? "POLL_TIMEOUT" : "");
  778. #endif
  779. #ifdef MQTT_WSS_CPUSTATS
  780. t1 = mqtt_wss_now_usec(client);
  781. #endif
  782. if (ret == 0) {
  783. if (send_keepalive) {
  784. // otherwise we shortened the timeout ourselves to take care of
  785. // MQTT keep alives
  786. #ifdef DEBUG_ULTRA_VERBOSE
  787. mws_debug(client->log, "Forcing MQTT Ping/keep-alive");
  788. #endif
  789. mqtt_ng_ping(client->mqtt);
  790. } else {
  791. // if poll timed out and user requested timeout was being used
  792. // return here let user do his work and he will call us back soon
  793. return 0;
  794. }
  795. }
  796. #ifdef MQTT_WSS_CPUSTATS
  797. t2 = mqtt_wss_now_usec(client);
  798. client->stats.time_keepalive += t2 - t1;
  799. #endif
  800. client->poll_fds[POLLFD_SOCKET].events = 0;
  801. if ((ptr = rbuf_get_linear_insert_range(client->ws_client->buf_read, &size))) {
  802. if((ret = SSL_read(client->ssl, ptr, size)) > 0) {
  803. #ifdef DEBUG_ULTRA_VERBOSE
  804. mws_debug(client->log, "SSL_Read: Read %d.", ret);
  805. #endif
  806. pthread_mutex_lock(&client->stat_lock);
  807. client->stats.bytes_rx += ret;
  808. pthread_mutex_unlock(&client->stat_lock);
  809. rbuf_bump_head(client->ws_client->buf_read, ret);
  810. } else {
  811. int errnobkp = errno;
  812. ret = SSL_get_error(client->ssl, ret);
  813. #ifdef DEBUG_ULTRA_VERBOSE
  814. mws_debug(client->log, "Read Err: %s", util_openssl_ret_err(ret));
  815. #endif
  816. set_socket_pollfds(client, ret);
  817. if (ret != SSL_ERROR_WANT_READ &&
  818. ret != SSL_ERROR_WANT_WRITE) {
  819. mws_error(client->log, "SSL_read error: %d %s", ret, util_openssl_ret_err(ret));
  820. if (ret == SSL_ERROR_SYSCALL)
  821. mws_error(client->log, "SSL_read SYSCALL errno: %d %s", errnobkp, strerror(errnobkp));
  822. return MQTT_WSS_ERR_CONN_DROP;
  823. }
  824. }
  825. }
  826. #ifdef MQTT_WSS_CPUSTATS
  827. t1 = mqtt_wss_now_usec(client);
  828. client->stats.time_read_socket += t1 - t2;
  829. #endif
  830. ret = ws_client_process(client->ws_client);
  831. switch(ret) {
  832. case WS_CLIENT_PROTOCOL_ERROR:
  833. return MQTT_WSS_ERR_PROTO_WS;
  834. case WS_CLIENT_NEED_MORE_BYTES:
  835. #ifdef DEBUG_ULTRA_VERBOSE
  836. mws_debug(client->log, "WSCLIENT WANT READ");
  837. #endif
  838. client->poll_fds[POLLFD_SOCKET].events |= POLLIN;
  839. break;
  840. case WS_CLIENT_CONNECTION_CLOSED:
  841. return MQTT_WSS_ERR_CONN_DROP;
  842. }
  843. #ifdef MQTT_WSS_CPUSTATS
  844. t2 = mqtt_wss_now_usec(client);
  845. client->stats.time_process_websocket += t2 - t1;
  846. #endif
  847. // process MQTT stuff
  848. if(client->ws_client->state == WS_ESTABLISHED)
  849. if (handle_mqtt_internal(client))
  850. return MQTT_WSS_ERR_PROTO_MQTT;
  851. if (client->mqtt_didnt_finish_write) {
  852. client->mqtt_didnt_finish_write = 0;
  853. client->poll_fds[POLLFD_SOCKET].events |= POLLOUT;
  854. }
  855. #ifdef MQTT_WSS_CPUSTATS
  856. t1 = mqtt_wss_now_usec(client);
  857. client->stats.time_process_mqtt += t1 - t2;
  858. #endif
  859. if ((ptr = rbuf_get_linear_read_range(client->ws_client->buf_write, &size))) {
  860. #ifdef DEBUG_ULTRA_VERBOSE
  861. mws_debug(client->log, "Have data to write to SSL");
  862. #endif
  863. if ((ret = SSL_write(client->ssl, ptr, size)) > 0) {
  864. #ifdef DEBUG_ULTRA_VERBOSE
  865. mws_debug(client->log, "SSL_Write: Written %d of avail %d.", ret, size);
  866. #endif
  867. pthread_mutex_lock(&client->stat_lock);
  868. client->stats.bytes_tx += ret;
  869. pthread_mutex_unlock(&client->stat_lock);
  870. rbuf_bump_tail(client->ws_client->buf_write, ret);
  871. } else {
  872. int errnobkp = errno;
  873. ret = SSL_get_error(client->ssl, ret);
  874. #ifdef DEBUG_ULTRA_VERBOSE
  875. mws_debug(client->log, "Write Err: %s", util_openssl_ret_err(ret));
  876. #endif
  877. set_socket_pollfds(client, ret);
  878. if (ret != SSL_ERROR_WANT_READ &&
  879. ret != SSL_ERROR_WANT_WRITE) {
  880. mws_error(client->log, "SSL_write error: %d %s", ret, util_openssl_ret_err(ret));
  881. if (ret == SSL_ERROR_SYSCALL)
  882. mws_error(client->log, "SSL_write SYSCALL errno: %d %s", errnobkp, strerror(errnobkp));
  883. return MQTT_WSS_ERR_CONN_DROP;
  884. }
  885. }
  886. }
  887. if(client->poll_fds[POLLFD_PIPE].revents & POLLIN)
  888. util_clear_pipe(client->write_notif_pipe[PIPE_READ_END]);
  889. #ifdef MQTT_WSS_CPUSTATS
  890. t2 = mqtt_wss_now_usec(client);
  891. client->stats.time_write_socket += t2 - t1;
  892. #endif
  893. return MQTT_WSS_OK;
  894. }
  895. int mqtt_wss_publish5(mqtt_wss_client client,
  896. char *topic,
  897. free_fnc_t topic_free,
  898. void *msg,
  899. free_fnc_t msg_free,
  900. size_t msg_len,
  901. uint8_t publish_flags,
  902. uint16_t *packet_id)
  903. {
  904. if (client->mqtt_disconnecting) {
  905. mws_error(client->log, "mqtt_wss is disconnecting can't publish");
  906. return 1;
  907. }
  908. if (!client->mqtt_connected) {
  909. mws_error(client->log, "MQTT is offline. Can't send message.");
  910. return 1;
  911. }
  912. uint8_t mqtt_flags = 0;
  913. mqtt_flags = (publish_flags & MQTT_WSS_PUB_QOSMASK) << 1;
  914. if (publish_flags & MQTT_WSS_PUB_RETAIN)
  915. mqtt_flags |= MQTT_PUBLISH_RETAIN;
  916. int rc = mqtt_ng_publish(client->mqtt, topic, topic_free, msg, msg_free, msg_len, mqtt_flags, packet_id);
  917. if (rc == MQTT_NG_MSGGEN_MSG_TOO_BIG)
  918. return MQTT_WSS_ERR_TOO_BIG_FOR_SERVER;
  919. mqtt_wss_wakeup(client);
  920. return rc;
  921. }
  922. int mqtt_wss_subscribe(mqtt_wss_client client, char *topic, int max_qos_level)
  923. {
  924. (void)max_qos_level; //TODO now hardcoded
  925. if (!client->mqtt_connected) {
  926. mws_error(client->log, "MQTT is offline. Can't subscribe.");
  927. return 1;
  928. }
  929. if (client->mqtt_disconnecting) {
  930. mws_error(client->log, "mqtt_wss is disconnecting can't subscribe");
  931. return 1;
  932. }
  933. struct mqtt_sub sub = {
  934. .topic = topic,
  935. .topic_free = NULL,
  936. .options = /* max_qos_level & 0x3 TODO when QOS > 1 implemented */ 0x01 | (0x01 << 3)
  937. };
  938. mqtt_ng_subscribe(client->mqtt, &sub, 1);
  939. mqtt_wss_wakeup(client);
  940. return 0;
  941. }
  942. struct mqtt_wss_stats mqtt_wss_get_stats(mqtt_wss_client client)
  943. {
  944. struct mqtt_wss_stats current;
  945. pthread_mutex_lock(&client->stat_lock);
  946. current = client->stats;
  947. memset(&client->stats, 0, sizeof(client->stats));
  948. pthread_mutex_unlock(&client->stat_lock);
  949. mqtt_ng_get_stats(client->mqtt, &current.mqtt);
  950. return current;
  951. }
  952. int mqtt_wss_set_topic_alias(mqtt_wss_client client, const char *topic)
  953. {
  954. return mqtt_ng_set_topic_alias(client->mqtt, topic);
  955. }
  956. #ifdef MQTT_WSS_DEBUG
  957. void mqtt_wss_set_SSL_CTX_keylog_cb(mqtt_wss_client client, void (*ssl_ctx_keylog_cb)(const SSL *ssl, const char *line))
  958. {
  959. client->ssl_ctx_keylog_cb = ssl_ctx_keylog_cb;
  960. }
  961. #endif