aclk_lws_wss_client.c 21 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk_lws_wss_client.h"
  3. #include "libnetdata/libnetdata.h"
  4. #include "../../daemon/common.h"
  5. #include "aclk_common.h"
  6. #include "aclk_stats.h"
  7. extern int aclk_shutting_down;
  8. static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len);
  9. struct aclk_lws_wss_perconnect_data {
  10. int todo;
  11. };
  12. static struct aclk_lws_wss_engine_instance *engine_instance = NULL;
  13. void lws_wss_check_queues(size_t *write_len, size_t *write_len_bytes, size_t *read_len)
  14. {
  15. if (write_len != NULL && write_len_bytes != NULL)
  16. {
  17. *write_len = 0;
  18. *write_len_bytes = 0;
  19. if (engine_instance != NULL)
  20. {
  21. aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
  22. struct lws_wss_packet_buffer *write_b;
  23. size_t w,wb;
  24. for(w=0, wb=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next)
  25. {
  26. w++;
  27. wb += write_b->data_size - write_b->written;
  28. }
  29. *write_len = w;
  30. *write_len_bytes = wb;
  31. aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
  32. }
  33. }
  34. else if (write_len != NULL)
  35. {
  36. *write_len = 0;
  37. if (engine_instance != NULL)
  38. {
  39. aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
  40. struct lws_wss_packet_buffer *write_b;
  41. size_t w;
  42. for(w=0, write_b = engine_instance->write_buffer_head; write_b != NULL; write_b = write_b->next)
  43. w++;
  44. *write_len = w;
  45. aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
  46. }
  47. }
  48. if (read_len != NULL)
  49. {
  50. *read_len = 0;
  51. if (engine_instance != NULL)
  52. {
  53. aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
  54. *read_len = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL);
  55. aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
  56. }
  57. }
  58. }
  59. static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_new(void *data, size_t size)
  60. {
  61. struct lws_wss_packet_buffer *new = callocz(1, sizeof(struct lws_wss_packet_buffer));
  62. if (data) {
  63. new->data = mallocz(LWS_PRE + size);
  64. memcpy(new->data + LWS_PRE, data, size);
  65. new->data_size = size;
  66. new->written = 0;
  67. }
  68. return new;
  69. }
  70. static inline void lws_wss_packet_buffer_append(struct lws_wss_packet_buffer **list, struct lws_wss_packet_buffer *item)
  71. {
  72. struct lws_wss_packet_buffer *tail = *list;
  73. if (!*list) {
  74. *list = item;
  75. return;
  76. }
  77. while (tail->next) {
  78. tail = tail->next;
  79. }
  80. tail->next = item;
  81. }
  82. static inline struct lws_wss_packet_buffer *lws_wss_packet_buffer_pop(struct lws_wss_packet_buffer **list)
  83. {
  84. struct lws_wss_packet_buffer *ret = *list;
  85. if (ret != NULL)
  86. *list = ret->next;
  87. return ret;
  88. }
  89. static inline void lws_wss_packet_buffer_free(struct lws_wss_packet_buffer *item)
  90. {
  91. freez(item->data);
  92. freez(item);
  93. }
  94. static inline void _aclk_lws_wss_read_buffer_clear(struct lws_ring *ringbuffer)
  95. {
  96. size_t elems = lws_ring_get_count_waiting_elements(ringbuffer, NULL);
  97. if (elems > 0)
  98. lws_ring_consume(ringbuffer, NULL, NULL, elems);
  99. }
  100. static inline void _aclk_lws_wss_write_buffer_clear(struct lws_wss_packet_buffer **list)
  101. {
  102. struct lws_wss_packet_buffer *i;
  103. while ((i = lws_wss_packet_buffer_pop(list)) != NULL) {
  104. lws_wss_packet_buffer_free(i);
  105. }
  106. *list = NULL;
  107. }
  108. static inline void aclk_lws_wss_clear_io_buffers()
  109. {
  110. aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
  111. _aclk_lws_wss_read_buffer_clear(engine_instance->read_ringbuffer);
  112. aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
  113. aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
  114. _aclk_lws_wss_write_buffer_clear(&engine_instance->write_buffer_head);
  115. aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
  116. }
  117. static const struct lws_protocols protocols[] = { { "aclk-wss", aclk_lws_wss_callback,
  118. sizeof(struct aclk_lws_wss_perconnect_data), 32768*4, 0, 0, 32768*4 },
  119. { NULL, NULL, 0, 0, 0, 0, 0 } };
  120. static void aclk_lws_wss_log_divert(int level, const char *line)
  121. {
  122. switch (level) {
  123. case LLL_ERR:
  124. error("Libwebsockets Error: %s", line);
  125. break;
  126. case LLL_WARN:
  127. debug(D_ACLK, "Libwebsockets Warn: %s", line);
  128. break;
  129. default:
  130. error("Libwebsockets try to log with unknown log level (%d), msg: %s", level, line);
  131. }
  132. }
  133. static int aclk_lws_wss_client_init( char *target_hostname, int target_port)
  134. {
  135. static int lws_logging_initialized = 0;
  136. if (unlikely(!lws_logging_initialized)) {
  137. lws_set_log_level(LLL_ERR | LLL_WARN, aclk_lws_wss_log_divert);
  138. lws_logging_initialized = 1;
  139. }
  140. if (!target_hostname)
  141. return 1;
  142. engine_instance = callocz(1, sizeof(struct aclk_lws_wss_engine_instance));
  143. engine_instance->host = target_hostname;
  144. engine_instance->port = target_port;
  145. aclk_lws_mutex_init(&engine_instance->write_buf_mutex);
  146. aclk_lws_mutex_init(&engine_instance->read_buf_mutex);
  147. engine_instance->read_ringbuffer = lws_ring_create(1, ACLK_LWS_WSS_RECV_BUFF_SIZE_BYTES, NULL);
  148. if (!engine_instance->read_ringbuffer)
  149. goto failure_cleanup;
  150. return 0;
  151. failure_cleanup:
  152. freez(engine_instance);
  153. return 1;
  154. }
  155. void aclk_lws_wss_destroy_context()
  156. {
  157. if (!engine_instance)
  158. return;
  159. if (!engine_instance->lws_context)
  160. return;
  161. lws_context_destroy(engine_instance->lws_context);
  162. engine_instance->lws_context = NULL;
  163. }
  164. void aclk_lws_wss_client_destroy()
  165. {
  166. if (engine_instance == NULL)
  167. return;
  168. aclk_lws_wss_destroy_context();
  169. engine_instance->lws_wsi = NULL;
  170. aclk_lws_wss_clear_io_buffers();
  171. #ifdef ACLK_LWS_MOSQUITTO_IO_CALLS_MULTITHREADED
  172. pthread_mutex_destroy(&engine_instance->write_buf_mutex);
  173. pthread_mutex_destroy(&engine_instance->read_buf_mutex);
  174. #endif
  175. }
  176. #ifdef LWS_WITH_SOCKS5
  177. static int aclk_wss_set_socks(struct lws_vhost *vhost, const char *socks)
  178. {
  179. char *proxy = strstr(socks, ACLK_PROXY_PROTO_ADDR_SEPARATOR);
  180. if (!proxy)
  181. return -1;
  182. proxy += strlen(ACLK_PROXY_PROTO_ADDR_SEPARATOR);
  183. if (!*proxy)
  184. return -1;
  185. return lws_set_socks(vhost, proxy);
  186. }
  187. #endif
  188. void aclk_wss_set_proxy(struct lws_vhost *vhost)
  189. {
  190. const char *proxy;
  191. ACLK_PROXY_TYPE proxy_type;
  192. char *log;
  193. proxy = aclk_get_proxy(&proxy_type);
  194. #ifdef LWS_WITH_SOCKS5
  195. lws_set_socks(vhost, ":");
  196. #endif
  197. lws_set_proxy(vhost, ":");
  198. if (proxy_type == PROXY_TYPE_UNKNOWN) {
  199. error("Unknown proxy type");
  200. return;
  201. }
  202. if (proxy_type == PROXY_TYPE_SOCKS5 || proxy_type == PROXY_TYPE_HTTP) {
  203. log = strdupz(proxy);
  204. safe_log_proxy_censor(log);
  205. info("Connecting using %s proxy:\"%s\"", aclk_proxy_type_to_s(&proxy_type), log);
  206. freez(log);
  207. }
  208. if (proxy_type == PROXY_TYPE_SOCKS5) {
  209. #ifdef LWS_WITH_SOCKS5
  210. if (aclk_wss_set_socks(vhost, proxy))
  211. error("LWS failed to accept socks proxy.");
  212. return;
  213. #else
  214. fatal("We have no SOCKS5 support but we made it here. Programming error!");
  215. #endif
  216. }
  217. if (proxy_type == PROXY_TYPE_HTTP) {
  218. if (lws_set_proxy(vhost, proxy))
  219. error("LWS failed to accept http proxy.");
  220. return;
  221. }
  222. if (proxy_type != PROXY_DISABLED)
  223. error("Unknown proxy type");
  224. }
  225. // Return code indicates if connection attempt has started async.
  226. int aclk_lws_wss_connect(char *host, int port)
  227. {
  228. struct lws_client_connect_info i;
  229. struct lws_vhost *vhost;
  230. int n;
  231. if (!engine_instance) {
  232. if (aclk_lws_wss_client_init(host, port))
  233. return 1; // Propagate failure
  234. }
  235. if (!engine_instance->lws_context)
  236. {
  237. // First time through (on this connection), create the context
  238. struct lws_context_creation_info info;
  239. memset(&info, 0, sizeof(struct lws_context_creation_info));
  240. info.options = LWS_SERVER_OPTION_DO_SSL_GLOBAL_INIT;
  241. info.port = CONTEXT_PORT_NO_LISTEN;
  242. info.protocols = protocols;
  243. engine_instance->lws_context = lws_create_context(&info);
  244. if (!engine_instance->lws_context)
  245. {
  246. error("Failed to create lws_context, ACLK will not function");
  247. return 1;
  248. }
  249. return 0;
  250. // PROTOCOL_INIT callback will call again.
  251. }
  252. for (n = 0; n < ACLK_LWS_CALLBACK_HISTORY; n++)
  253. engine_instance->lws_callback_history[n] = 0;
  254. if (engine_instance->lws_wsi) {
  255. error("Already Connected. Only one connection supported at a time.");
  256. return 0;
  257. }
  258. memset(&i, 0, sizeof(i));
  259. i.context = engine_instance->lws_context;
  260. i.port = engine_instance->port;
  261. i.address = engine_instance->host;
  262. i.path = "/mqtt";
  263. i.host = engine_instance->host;
  264. i.protocol = "mqtt";
  265. // from LWS docu:
  266. // If option LWS_SERVER_OPTION_EXPLICIT_VHOSTS is given, no vhost is
  267. // created; you're expected to create your own vhosts afterwards using
  268. // lws_create_vhost(). Otherwise a vhost named "default" is also created
  269. // using the information in the vhost-related members, for compatibility.
  270. vhost = lws_get_vhost_by_name(engine_instance->lws_context, "default");
  271. if(!vhost)
  272. fatal("Could not find the default LWS vhost.");
  273. aclk_wss_set_proxy(vhost);
  274. #ifdef ACLK_SSL_ALLOW_SELF_SIGNED
  275. i.ssl_connection = LCCSCF_USE_SSL | LCCSCF_ALLOW_SELFSIGNED | LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK | LCCSCF_ALLOW_INSECURE;
  276. info("Disabling SSL certificate checks");
  277. #else
  278. i.ssl_connection = LCCSCF_USE_SSL;
  279. #endif
  280. #if defined(HAVE_X509_VERIFY_PARAM_set1_host) && HAVE_X509_VERIFY_PARAM_set1_host == 0
  281. #warning DISABLING SSL HOSTNAME VALIDATION BECAUSE IT IS NOT AVAILABLE ON THIS SYSTEM.
  282. i.ssl_connection |= LCCSCF_SKIP_SERVER_CERT_HOSTNAME_CHECK;
  283. #endif
  284. lws_client_connect_via_info(&i);
  285. return 0;
  286. }
  287. static inline int received_data_to_ringbuff(struct lws_ring *buffer, void *data, size_t len)
  288. {
  289. if (lws_ring_insert(buffer, data, len) != len) {
  290. error("ACLK_LWS_WSS_CLIENT: receive buffer full. Closing connection to prevent flooding.");
  291. return 0;
  292. }
  293. return 1;
  294. }
  295. static const char *aclk_lws_callback_name(enum lws_callback_reasons reason)
  296. {
  297. switch (reason) {
  298. case LWS_CALLBACK_CLIENT_WRITEABLE:
  299. return "LWS_CALLBACK_CLIENT_WRITEABLE";
  300. case LWS_CALLBACK_CLIENT_RECEIVE:
  301. return "LWS_CALLBACK_CLIENT_RECEIVE";
  302. case LWS_CALLBACK_PROTOCOL_INIT:
  303. return "LWS_CALLBACK_PROTOCOL_INIT";
  304. case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
  305. return "LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED";
  306. case LWS_CALLBACK_USER:
  307. return "LWS_CALLBACK_USER";
  308. case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
  309. return "LWS_CALLBACK_CLIENT_CONNECTION_ERROR";
  310. case LWS_CALLBACK_CLIENT_CLOSED:
  311. return "LWS_CALLBACK_CLIENT_CLOSED";
  312. case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
  313. return "LWS_CALLBACK_WS_PEER_INITIATED_CLOSE";
  314. case LWS_CALLBACK_WSI_DESTROY:
  315. return "LWS_CALLBACK_WSI_DESTROY";
  316. case LWS_CALLBACK_CLIENT_ESTABLISHED:
  317. return "LWS_CALLBACK_CLIENT_ESTABLISHED";
  318. case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
  319. return "LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION";
  320. case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
  321. return "LWS_CALLBACK_EVENT_WAIT_CANCELLED";
  322. default:
  323. // Not using an internal buffer here for thread-safety with unknown calling context.
  324. #ifdef ACLK_TRP_DEBUG_VERBOSE
  325. error("Unknown LWS callback %u", reason);
  326. #endif
  327. return "unknown";
  328. }
  329. }
  330. void aclk_lws_wss_fail_report()
  331. {
  332. int i;
  333. int anything_to_send = 0;
  334. BUFFER *buf;
  335. if (netdata_anonymous_statistics_enabled <= 0)
  336. return;
  337. // guess - most of the callback will be 1-99 + ',' + \0
  338. buf = buffer_create((ACLK_LWS_CALLBACK_HISTORY * 2) + 10);
  339. for (i = 0; i < ACLK_LWS_CALLBACK_HISTORY; i++)
  340. if (engine_instance->lws_callback_history[i]) {
  341. buffer_sprintf(buf, "%s%d", (i ? "," : ""), engine_instance->lws_callback_history[i]);
  342. anything_to_send = 1;
  343. }
  344. if (anything_to_send)
  345. send_statistics("ACLK_CONN_FAIL", "FAIL", buffer_tostring(buf));
  346. buffer_free(buf);
  347. }
  348. static int aclk_lws_wss_callback(struct lws *wsi, enum lws_callback_reasons reason, void *user, void *in, size_t len)
  349. {
  350. UNUSED(user);
  351. struct lws_wss_packet_buffer *data;
  352. int retval = 0;
  353. static int lws_shutting_down = 0;
  354. int i;
  355. for (i = ACLK_LWS_CALLBACK_HISTORY - 1; i > 0; i--)
  356. engine_instance->lws_callback_history[i] = engine_instance->lws_callback_history[i - 1];
  357. engine_instance->lws_callback_history[0] = (int)reason;
  358. if (unlikely(aclk_shutting_down && !lws_shutting_down)) {
  359. lws_shutting_down = 1;
  360. retval = -1;
  361. engine_instance->upstream_reconnect_request = 0;
  362. }
  363. // Callback servicing is forced when we are closed from above.
  364. if (engine_instance->upstream_reconnect_request) {
  365. error("Closing lws connectino due to libmosquitto error.");
  366. char *upstream_connection_error = "MQTT protocol error. Closing underlying wss connection.";
  367. lws_close_reason(
  368. wsi, LWS_CLOSE_STATUS_PROTOCOL_ERR, (unsigned char *)upstream_connection_error,
  369. strlen(upstream_connection_error));
  370. retval = -1;
  371. engine_instance->upstream_reconnect_request = 0;
  372. }
  373. // Don't log to info - volume is proportional to message flow on ACLK.
  374. switch (reason) {
  375. case LWS_CALLBACK_CLIENT_WRITEABLE:
  376. aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
  377. data = engine_instance->write_buffer_head;
  378. if (likely(data)) {
  379. size_t bytes_left = data->data_size - data->written;
  380. if ( bytes_left > FRAGMENT_SIZE)
  381. bytes_left = FRAGMENT_SIZE;
  382. int n = lws_write(wsi, data->data + LWS_PRE + data->written, bytes_left, LWS_WRITE_BINARY);
  383. if (n>=0) {
  384. data->written += n;
  385. if (aclk_stats_enabled) {
  386. ACLK_STATS_LOCK;
  387. aclk_metrics_per_sample.write_q_consumed += n;
  388. ACLK_STATS_UNLOCK;
  389. }
  390. }
  391. //error("lws_write(req=%u,written=%u) %zu of %zu",bytes_left, rc, data->written,data->data_size,rc);
  392. if (data->written == data->data_size)
  393. {
  394. lws_wss_packet_buffer_pop(&engine_instance->write_buffer_head);
  395. lws_wss_packet_buffer_free(data);
  396. }
  397. if (engine_instance->write_buffer_head)
  398. lws_callback_on_writable(engine_instance->lws_wsi);
  399. }
  400. aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
  401. return retval;
  402. case LWS_CALLBACK_CLIENT_RECEIVE:
  403. aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
  404. if (!received_data_to_ringbuff(engine_instance->read_ringbuffer, in, len))
  405. retval = 1;
  406. aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
  407. if (aclk_stats_enabled) {
  408. ACLK_STATS_LOCK;
  409. aclk_metrics_per_sample.read_q_added += len;
  410. ACLK_STATS_UNLOCK;
  411. }
  412. // to future myself -> do not call this while read lock is active as it will eventually
  413. // want to acquire same lock later in aclk_lws_wss_client_read() function
  414. aclk_lws_connection_data_received();
  415. return retval;
  416. case LWS_CALLBACK_WSI_CREATE:
  417. case LWS_CALLBACK_CLIENT_FILTER_PRE_ESTABLISH:
  418. case LWS_CALLBACK_CLIENT_APPEND_HANDSHAKE_HEADER:
  419. case LWS_CALLBACK_OPENSSL_LOAD_EXTRA_CLIENT_VERIFY_CERTS:
  420. case LWS_CALLBACK_GET_THREAD_ID: // ?
  421. case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
  422. case LWS_CALLBACK_OPENSSL_PERFORM_SERVER_CERT_VERIFICATION:
  423. // Expected and safe to ignore.
  424. #ifdef ACLK_TRP_DEBUG_VERBOSE
  425. debug(D_ACLK, "Ignoring expected callback from LWS: %s", aclk_lws_callback_name(reason));
  426. #endif
  427. return retval;
  428. default:
  429. // Pass to next switch, this case removes compiler warnings.
  430. break;
  431. }
  432. // Log to info - volume is proportional to connection attempts.
  433. #ifdef ACLK_TRP_DEBUG_VERBOSE
  434. info("Processing callback %s", aclk_lws_callback_name(reason));
  435. #endif
  436. switch (reason) {
  437. case LWS_CALLBACK_PROTOCOL_INIT:
  438. aclk_lws_wss_connect(engine_instance->host, engine_instance->port); // Makes the outgoing connection
  439. break;
  440. case LWS_CALLBACK_SERVER_NEW_CLIENT_INSTANTIATED:
  441. if (engine_instance->lws_wsi != NULL && engine_instance->lws_wsi != wsi)
  442. error("Multiple connections on same WSI? %p vs %p", engine_instance->lws_wsi, wsi);
  443. engine_instance->lws_wsi = wsi;
  444. break;
  445. case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
  446. error(
  447. "Could not connect MQTT over WSS server \"%s:%d\". LwsReason:\"%s\"", engine_instance->host,
  448. engine_instance->port, (in ? (char *)in : "not given"));
  449. // Fall-through
  450. case LWS_CALLBACK_CLIENT_CLOSED:
  451. case LWS_CALLBACK_WS_PEER_INITIATED_CLOSE:
  452. engine_instance->lws_wsi = NULL; // inside libwebsockets lws_close_free_wsi is called after callback
  453. aclk_lws_connection_closed();
  454. return -1; // the callback response is ignored, hope the above remains true
  455. case LWS_CALLBACK_WSI_DESTROY:
  456. aclk_lws_wss_clear_io_buffers();
  457. if (!engine_instance->websocket_connection_up)
  458. aclk_lws_wss_fail_report();
  459. engine_instance->lws_wsi = NULL;
  460. engine_instance->websocket_connection_up = 0;
  461. aclk_lws_connection_closed();
  462. break;
  463. case LWS_CALLBACK_CLIENT_ESTABLISHED:
  464. engine_instance->websocket_connection_up = 1;
  465. aclk_lws_connection_established(engine_instance->host, engine_instance->port);
  466. break;
  467. default:
  468. #ifdef ACLK_TRP_DEBUG_VERBOSE
  469. error("Unexpected callback from libwebsockets %s", aclk_lws_callback_name(reason));
  470. #endif
  471. break;
  472. }
  473. return retval; //0-OK, other connection should be closed!
  474. }
  475. int aclk_lws_wss_client_write(void *buf, size_t count)
  476. {
  477. if (engine_instance && engine_instance->lws_wsi && engine_instance->websocket_connection_up) {
  478. aclk_lws_mutex_lock(&engine_instance->write_buf_mutex);
  479. lws_wss_packet_buffer_append(&engine_instance->write_buffer_head, lws_wss_packet_buffer_new(buf, count));
  480. aclk_lws_mutex_unlock(&engine_instance->write_buf_mutex);
  481. if (aclk_stats_enabled) {
  482. ACLK_STATS_LOCK;
  483. aclk_metrics_per_sample.write_q_added += count;
  484. ACLK_STATS_UNLOCK;
  485. }
  486. lws_callback_on_writable(engine_instance->lws_wsi);
  487. return count;
  488. }
  489. return 0;
  490. }
  491. int aclk_lws_wss_client_read(void *buf, size_t count)
  492. {
  493. size_t data_to_be_read = count;
  494. aclk_lws_mutex_lock(&engine_instance->read_buf_mutex);
  495. size_t readable_byte_count = lws_ring_get_count_waiting_elements(engine_instance->read_ringbuffer, NULL);
  496. if (unlikely(readable_byte_count == 0)) {
  497. errno = EAGAIN;
  498. data_to_be_read = -1;
  499. goto abort;
  500. }
  501. if (readable_byte_count < data_to_be_read)
  502. data_to_be_read = readable_byte_count;
  503. data_to_be_read = lws_ring_consume(engine_instance->read_ringbuffer, NULL, buf, data_to_be_read);
  504. if (data_to_be_read == readable_byte_count)
  505. engine_instance->data_to_read = 0;
  506. if (aclk_stats_enabled) {
  507. ACLK_STATS_LOCK;
  508. aclk_metrics_per_sample.read_q_consumed += data_to_be_read;
  509. ACLK_STATS_UNLOCK;
  510. }
  511. abort:
  512. aclk_lws_mutex_unlock(&engine_instance->read_buf_mutex);
  513. return data_to_be_read;
  514. }
  515. void aclk_lws_wss_service_loop()
  516. {
  517. if (engine_instance)
  518. {
  519. /*if (engine_instance->lws_wsi) {
  520. lws_cancel_service(engine_instance->lws_context);
  521. lws_callback_on_writable(engine_instance->lws_wsi);
  522. }*/
  523. lws_service(engine_instance->lws_context, 0);
  524. }
  525. }
  526. // in case the MQTT connection disconnect while lws transport is still operational
  527. // we should drop connection and reconnect
  528. // this function should be called when that happens to notify lws of that situation
  529. void aclk_lws_wss_mqtt_layer_disconect_notif()
  530. {
  531. if (!engine_instance)
  532. return;
  533. if (engine_instance->lws_wsi && engine_instance->websocket_connection_up) {
  534. engine_instance->upstream_reconnect_request = 1;
  535. lws_callback_on_writable(
  536. engine_instance->lws_wsi); //here we just do it to ensure we get callback called from lws, we don't need any actual data to be written.
  537. }
  538. }