aclk_tx_msgs.c 17 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk_tx_msgs.h"
  3. #include "daemon/common.h"
  4. #include "aclk_util.h"
  5. #include "aclk_stats.h"
  6. #include "aclk.h"
  7. #ifndef __GNUC__
  8. #pragma region aclk_tx_msgs helper functions
  9. #endif
  10. // version for aclk legacy (old cloud arch)
  11. #define ACLK_VERSION 2
  12. static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
  13. {
  14. uint16_t packet_id;
  15. const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
  16. const char *topic = aclk_get_topic(subtopic);
  17. if (unlikely(!topic)) {
  18. error("Couldn't get topic. Aborting message send");
  19. return;
  20. }
  21. mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
  22. #ifdef NETDATA_INTERNAL_CHECKS
  23. aclk_stats_msg_published(packet_id);
  24. #endif
  25. #ifdef ACLK_LOG_CONVERSATION_DIR
  26. #define FN_MAX_LEN 1024
  27. char filename[FN_MAX_LEN];
  28. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
  29. json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
  30. #endif
  31. }
  32. uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
  33. {
  34. #ifndef ACLK_LOG_CONVERSATION_DIR
  35. UNUSED(msgname);
  36. #endif
  37. uint16_t packet_id;
  38. const char *topic = aclk_get_topic(subtopic);
  39. if (unlikely(!topic)) {
  40. error("Couldn't get topic. Aborting message send.");
  41. return 0;
  42. }
  43. if (use_mqtt_5)
  44. mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
  45. else
  46. mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
  47. #ifdef NETDATA_INTERNAL_CHECKS
  48. aclk_stats_msg_published(packet_id);
  49. #endif
  50. #ifdef ACLK_LOG_CONVERSATION_DIR
  51. #define FN_MAX_LEN 1024
  52. char filename[FN_MAX_LEN];
  53. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgname);
  54. FILE *fptr;
  55. if (fptr = fopen(filename,"w")) {
  56. fwrite(msg, msg_len, 1, fptr);
  57. fclose(fptr);
  58. }
  59. #endif
  60. return packet_id;
  61. }
  62. static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
  63. {
  64. uint16_t packet_id;
  65. const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
  66. const char *topic = aclk_get_topic(subtopic);
  67. if (unlikely(!topic)) {
  68. error("Couldn't get topic. Aborting message send");
  69. return 0;
  70. }
  71. mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
  72. #ifdef NETDATA_INTERNAL_CHECKS
  73. aclk_stats_msg_published(packet_id);
  74. #endif
  75. #ifdef ACLK_LOG_CONVERSATION_DIR
  76. #define FN_MAX_LEN 1024
  77. char filename[FN_MAX_LEN];
  78. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
  79. json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
  80. #endif
  81. return packet_id;
  82. }
  83. /* UNUSED now but can be used soon MVP1?
  84. static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, const char *topic)
  85. {
  86. if (unlikely(!topic || topic[0] != '/')) {
  87. error ("Full topic required!");
  88. return;
  89. }
  90. const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
  91. mqtt_wss_publish(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1);
  92. #ifdef NETDATA_INTERNAL_CHECKS
  93. aclk_stats_msg_published();
  94. #endif
  95. #ifdef ACLK_LOG_CONVERSATION_DIR
  96. #define FN_MAX_LEN 1024
  97. char filename[FN_MAX_LEN];
  98. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
  99. json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
  100. #endif
  101. }
  102. */
  103. #define TOPIC_MAX_LEN 512
  104. #define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
  105. static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len)
  106. {
  107. uint16_t packet_id;
  108. const char *str;
  109. char *full_msg = NULL;
  110. int len, rc;
  111. if (unlikely(!topic || topic[0] != '/')) {
  112. error ("Full topic required!");
  113. return HTTP_RESP_INTERNAL_SERVER_ERROR;
  114. }
  115. str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
  116. len = strlen(str);
  117. if (payload_len) {
  118. full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
  119. memcpy(full_msg, str, len);
  120. memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
  121. len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
  122. memcpy(&full_msg[len], payload, payload_len);
  123. len += payload_len;
  124. }
  125. /* TODO
  126. #ifdef ACLK_LOG_CONVERSATION_DIR
  127. #define FN_MAX_LEN 1024
  128. char filename[FN_MAX_LEN];
  129. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
  130. json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
  131. #endif */
  132. if (use_mqtt_5)
  133. mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), NULL, len, MQTT_WSS_PUB_QOS1, &packet_id);
  134. else {
  135. rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
  136. if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
  137. error("Timeout sending binpacked message");
  138. freez(full_msg);
  139. return HTTP_RESP_BACKEND_FETCH_FAILED;
  140. }
  141. if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
  142. error("Message is bigger than allowed maximum");
  143. freez(full_msg);
  144. return HTTP_RESP_FORBIDDEN;
  145. }
  146. }
  147. #ifdef NETDATA_INTERNAL_CHECKS
  148. aclk_stats_msg_published(packet_id);
  149. #endif
  150. freez(full_msg);
  151. return 0;
  152. }
  153. /*
  154. * Creates universal header common for all ACLK messages. User gets ownership of json object created.
  155. * Usually this is freed by send function after message has been sent.
  156. */
  157. static struct json_object *create_hdr(const char *type, const char *msg_id, time_t ts_secs, usec_t ts_us, int version)
  158. {
  159. uuid_t uuid;
  160. char uuid_str[36 + 1];
  161. json_object *tmp;
  162. json_object *obj = json_object_new_object();
  163. tmp = json_object_new_string(type);
  164. json_object_object_add(obj, "type", tmp);
  165. if (unlikely(!msg_id)) {
  166. uuid_generate(uuid);
  167. uuid_unparse(uuid, uuid_str);
  168. msg_id = uuid_str;
  169. }
  170. if (ts_secs == 0) {
  171. ts_us = now_realtime_usec();
  172. ts_secs = ts_us / USEC_PER_SEC;
  173. ts_us = ts_us % USEC_PER_SEC;
  174. }
  175. tmp = json_object_new_string(msg_id);
  176. json_object_object_add(obj, "msg-id", tmp);
  177. tmp = json_object_new_int64(ts_secs);
  178. json_object_object_add(obj, "timestamp", tmp);
  179. // TODO handle this somehow on older json-c
  180. // tmp = json_object_new_uint64(ts_us);
  181. // probably jso->_to_json_string -> custom function
  182. // jso->o.c_uint64 -> map this with pointer to signed int
  183. // commit that implements json_object_new_uint64 is 3c3b592
  184. // between 0.14 and 0.15
  185. tmp = json_object_new_int64(ts_us);
  186. json_object_object_add(obj, "timestamp-offset-usec", tmp);
  187. tmp = json_object_new_int64(aclk_session_sec);
  188. json_object_object_add(obj, "connect", tmp);
  189. // TODO handle this somehow see above
  190. // tmp = json_object_new_uint64(0 /* TODO aclk_session_us */);
  191. tmp = json_object_new_int64(aclk_session_us);
  192. json_object_object_add(obj, "connect-offset-usec", tmp);
  193. tmp = json_object_new_int(version);
  194. json_object_object_add(obj, "version", tmp);
  195. return obj;
  196. }
  197. static char *create_uuid()
  198. {
  199. uuid_t uuid;
  200. char *uuid_str = mallocz(36 + 1);
  201. uuid_generate(uuid);
  202. uuid_unparse(uuid, uuid_str);
  203. return uuid_str;
  204. }
  205. #ifndef __GNUC__
  206. #pragma endregion
  207. #endif
  208. #ifndef __GNUC__
  209. #pragma region aclk_tx_msgs message generators
  210. #endif
  211. /*
  212. * This will send the /api/v1/info
  213. */
  214. #define BUFFER_INITIAL_SIZE (1024 * 16)
  215. void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host)
  216. {
  217. BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
  218. json_object *msg, *payload, *tmp;
  219. char *msg_id = create_uuid();
  220. buffer_flush(local_buffer);
  221. local_buffer->contenttype = CT_APPLICATION_JSON;
  222. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  223. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  224. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  225. // session.
  226. if (metadata_submitted)
  227. msg = create_hdr("update", msg_id, 0, 0, ACLK_VERSION);
  228. else
  229. msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
  230. payload = json_object_new_object();
  231. json_object_object_add(msg, "payload", payload);
  232. web_client_api_request_v1_info_fill_buffer(host, local_buffer);
  233. tmp = json_tokener_parse(local_buffer->buffer);
  234. json_object_object_add(payload, "info", tmp);
  235. buffer_flush(local_buffer);
  236. charts2json(host, local_buffer, 1, 0);
  237. tmp = json_tokener_parse(local_buffer->buffer);
  238. json_object_object_add(payload, "charts", tmp);
  239. aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA);
  240. json_object_put(msg);
  241. freez(msg_id);
  242. buffer_free(local_buffer);
  243. }
  244. // TODO should include header instead
  245. void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
  246. void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
  247. {
  248. BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
  249. json_object *msg, *payload, *tmp;
  250. char *msg_id = create_uuid();
  251. buffer_flush(local_buffer);
  252. local_buffer->contenttype = CT_APPLICATION_JSON;
  253. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  254. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  255. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  256. // session.
  257. if (metadata_submitted)
  258. msg = create_hdr("connect_alarms", msg_id, 0, 0, ACLK_VERSION);
  259. else
  260. msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
  261. payload = json_object_new_object();
  262. json_object_object_add(msg, "payload", payload);
  263. health_alarms2json(localhost, local_buffer, 1);
  264. tmp = json_tokener_parse(local_buffer->buffer);
  265. json_object_object_add(payload, "configured-alarms", tmp);
  266. buffer_flush(local_buffer);
  267. health_active_log_alarms_2json(localhost, local_buffer);
  268. tmp = json_tokener_parse(local_buffer->buffer);
  269. json_object_object_add(payload, "alarms-active", tmp);
  270. aclk_send_message_subtopic(client, msg, ACLK_TOPICID_ALARMS);
  271. json_object_put(msg);
  272. freez(msg_id);
  273. buffer_free(local_buffer);
  274. }
  275. void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len)
  276. {
  277. json_object *tmp, *msg;
  278. msg = create_hdr("http", msg_id, 0, 0, 2);
  279. tmp = json_object_new_int(http_code);
  280. json_object_object_add(msg, "http-code", tmp);
  281. tmp = json_object_new_int(ec);
  282. json_object_object_add(msg, "error-code", tmp);
  283. tmp = json_object_new_string(emsg);
  284. json_object_object_add(msg, "error-description", tmp);
  285. if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) {
  286. error("Failed to send cancelation message for http reply");
  287. }
  288. json_object_put(msg);
  289. }
  290. void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
  291. {
  292. json_object *tmp, *msg;
  293. msg = create_hdr("http", msg_id, 0, 0, 2);
  294. tmp = json_object_new_int64(t_exec);
  295. json_object_object_add(msg, "t-exec", tmp);
  296. tmp = json_object_new_int64(created);
  297. json_object_object_add(msg, "t-rx", tmp);
  298. tmp = json_object_new_int(http_code);
  299. json_object_object_add(msg, "http-code", tmp);
  300. int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
  301. json_object_put(msg);
  302. switch (rc) {
  303. case HTTP_RESP_FORBIDDEN:
  304. aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len);
  305. break;
  306. case HTTP_RESP_INTERNAL_SERVER_ERROR:
  307. aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len);
  308. break;
  309. case HTTP_RESP_BACKEND_FETCH_FAILED:
  310. aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len);
  311. break;
  312. }
  313. }
  314. void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)
  315. {
  316. json_object *msg, *payload;
  317. BUFFER *tmp_buffer;
  318. RRDSET *st;
  319. st = rrdset_find(host, chart);
  320. if (!st)
  321. st = rrdset_find_byname(host, chart);
  322. if (!st) {
  323. info("FAILED to find chart %s", chart);
  324. return;
  325. }
  326. tmp_buffer = buffer_create(BUFFER_INITIAL_SIZE);
  327. rrdset2json(st, tmp_buffer, NULL, NULL, 1);
  328. payload = json_tokener_parse(tmp_buffer->buffer);
  329. if (!payload) {
  330. error("Failed to parse JSON from rrdset2json");
  331. buffer_free(tmp_buffer);
  332. return;
  333. }
  334. msg = create_hdr("chart", NULL, 0, 0, ACLK_VERSION);
  335. json_object_object_add(msg, "payload", payload);
  336. aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART);
  337. buffer_free(tmp_buffer);
  338. json_object_put(msg);
  339. }
  340. void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg)
  341. {
  342. // we create header here on purpose (and not send message with it already as `msg` param)
  343. // timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy
  344. // send message with timestamps already to Query Queue they would be incorrect at time
  345. // when query queue would get to send them)
  346. json_object *obj = create_hdr("status-change", NULL, 0, 0, ACLK_VERSION);
  347. json_object_object_add(obj, "payload", msg);
  348. aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS);
  349. json_object_put(obj);
  350. }
  351. /*
  352. * Will generate disconnect message.
  353. * @param message if NULL it will generate LWT message (unexpected).
  354. * Otherwise string pointed to by this parameter will be used as
  355. * reason.
  356. */
  357. json_object *aclk_generate_disconnect(const char *message)
  358. {
  359. json_object *tmp, *msg;
  360. msg = create_hdr("disconnect", NULL, 0, 0, 2);
  361. tmp = json_object_new_string(message ? message : "unexpected");
  362. json_object_object_add(msg, "payload", tmp);
  363. return msg;
  364. }
  365. int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message)
  366. {
  367. int pid;
  368. json_object *msg = aclk_generate_disconnect(message);
  369. pid = aclk_send_message_subtopic_pid(client, msg, ACLK_TOPICID_METADATA);
  370. json_object_put(msg);
  371. return pid;
  372. }
  373. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  374. // new protobuf msgs
  375. uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
  376. size_t len;
  377. uint16_t pid;
  378. struct capability agent_capabilities[] = {
  379. { .name = "json", .version = 2, .enabled = 0 },
  380. { .name = "proto", .version = 1, .enabled = 1 },
  381. #ifdef ENABLE_ML
  382. { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) },
  383. #endif
  384. { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
  385. { .name = NULL, .version = 0, .enabled = 0 }
  386. };
  387. update_agent_connection_t conn = {
  388. .reachable = (reachable ? 1 : 0),
  389. .lwt = 0,
  390. .session_id = aclk_session_newarch,
  391. .capabilities = agent_capabilities
  392. };
  393. rrdhost_aclk_state_lock(localhost);
  394. if (unlikely(!localhost->aclk_state.claimed_id)) {
  395. error("Internal error. Should not come here if not claimed");
  396. rrdhost_aclk_state_unlock(localhost);
  397. return 0;
  398. }
  399. if (localhost->aclk_state.prev_claimed_id)
  400. conn.claim_id = localhost->aclk_state.prev_claimed_id;
  401. else
  402. conn.claim_id = localhost->aclk_state.claimed_id;
  403. char *msg = generate_update_agent_connection(&len, &conn);
  404. rrdhost_aclk_state_unlock(localhost);
  405. if (!msg) {
  406. error("Error generating agent::v1::UpdateAgentConnection payload");
  407. return 0;
  408. }
  409. pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
  410. if (!use_mqtt_5)
  411. freez(msg);
  412. if (localhost->aclk_state.prev_claimed_id) {
  413. freez(localhost->aclk_state.prev_claimed_id);
  414. localhost->aclk_state.prev_claimed_id = NULL;
  415. }
  416. return pid;
  417. }
  418. char *aclk_generate_lwt(size_t *size) {
  419. update_agent_connection_t conn = {
  420. .reachable = 0,
  421. .lwt = 1,
  422. .session_id = aclk_session_newarch,
  423. .capabilities = NULL
  424. };
  425. rrdhost_aclk_state_lock(localhost);
  426. if (unlikely(!localhost->aclk_state.claimed_id)) {
  427. error("Internal error. Should not come here if not claimed");
  428. rrdhost_aclk_state_unlock(localhost);
  429. return NULL;
  430. }
  431. conn.claim_id = localhost->aclk_state.claimed_id;
  432. char *msg = generate_update_agent_connection(size, &conn);
  433. rrdhost_aclk_state_unlock(localhost);
  434. if (!msg)
  435. error("Error generating agent::v1::UpdateAgentConnection payload for LWT");
  436. return msg;
  437. }
  438. #endif /* ENABLE_NEW_CLOUD_PROTOCOL */
  439. #ifndef __GNUC__
  440. #pragma endregion
  441. #endif