aclk_tx_msgs.c 17 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk_tx_msgs.h"
  3. #include "daemon/common.h"
  4. #include "aclk_util.h"
  5. #include "aclk_stats.h"
  6. #include "aclk.h"
  7. #ifndef __GNUC__
  8. #pragma region aclk_tx_msgs helper functions
  9. #endif
  10. // version for aclk legacy (old cloud arch)
  11. #define ACLK_VERSION 2
  12. static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
  13. {
  14. uint16_t packet_id;
  15. const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
  16. const char *topic = aclk_get_topic(subtopic);
  17. if (unlikely(!topic)) {
  18. error("Couldn't get topic. Aborting message send");
  19. return;
  20. }
  21. mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
  22. #ifdef NETDATA_INTERNAL_CHECKS
  23. aclk_stats_msg_published(packet_id);
  24. #endif
  25. #ifdef ACLK_LOG_CONVERSATION_DIR
  26. #define FN_MAX_LEN 1024
  27. char filename[FN_MAX_LEN];
  28. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
  29. json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
  30. #endif
  31. }
  32. uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
  33. {
  34. #ifndef ACLK_LOG_CONVERSATION_DIR
  35. UNUSED(msgname);
  36. #endif
  37. uint16_t packet_id;
  38. const char *topic = aclk_get_topic(subtopic);
  39. if (unlikely(!topic)) {
  40. error("Couldn't get topic. Aborting message send.");
  41. return 0;
  42. }
  43. mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
  44. #ifdef NETDATA_INTERNAL_CHECKS
  45. aclk_stats_msg_published(packet_id);
  46. #endif
  47. #ifdef ACLK_LOG_CONVERSATION_DIR
  48. #define FN_MAX_LEN 1024
  49. char filename[FN_MAX_LEN];
  50. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgname);
  51. FILE *fptr;
  52. if (fptr = fopen(filename,"w")) {
  53. fwrite(msg, msg_len, 1, fptr);
  54. fclose(fptr);
  55. }
  56. #endif
  57. return packet_id;
  58. }
  59. static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
  60. {
  61. uint16_t packet_id;
  62. const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
  63. const char *topic = aclk_get_topic(subtopic);
  64. if (unlikely(!topic)) {
  65. error("Couldn't get topic. Aborting message send");
  66. return 0;
  67. }
  68. mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
  69. #ifdef NETDATA_INTERNAL_CHECKS
  70. aclk_stats_msg_published(packet_id);
  71. #endif
  72. #ifdef ACLK_LOG_CONVERSATION_DIR
  73. #define FN_MAX_LEN 1024
  74. char filename[FN_MAX_LEN];
  75. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
  76. json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
  77. #endif
  78. return packet_id;
  79. }
  80. /* UNUSED now but can be used soon MVP1?
  81. static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, const char *topic)
  82. {
  83. if (unlikely(!topic || topic[0] != '/')) {
  84. error ("Full topic required!");
  85. return;
  86. }
  87. const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
  88. mqtt_wss_publish(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1);
  89. #ifdef NETDATA_INTERNAL_CHECKS
  90. aclk_stats_msg_published();
  91. #endif
  92. #ifdef ACLK_LOG_CONVERSATION_DIR
  93. #define FN_MAX_LEN 1024
  94. char filename[FN_MAX_LEN];
  95. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
  96. json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
  97. #endif
  98. }
  99. */
  100. #define TOPIC_MAX_LEN 512
  101. #define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
  102. static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len)
  103. {
  104. uint16_t packet_id;
  105. const char *str;
  106. char *full_msg = NULL;
  107. int len, rc;
  108. if (unlikely(!topic || topic[0] != '/')) {
  109. error ("Full topic required!");
  110. return 500;
  111. }
  112. str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
  113. len = strlen(str);
  114. if (payload_len) {
  115. full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
  116. memcpy(full_msg, str, len);
  117. memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
  118. len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
  119. memcpy(&full_msg[len], payload, payload_len);
  120. len += payload_len;
  121. }
  122. /* TODO
  123. #ifdef ACLK_LOG_CONVERSATION_DIR
  124. #define FN_MAX_LEN 1024
  125. char filename[FN_MAX_LEN];
  126. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
  127. json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
  128. #endif */
  129. rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
  130. if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
  131. error("Timeout sending binpacked message");
  132. freez(full_msg);
  133. return 503;
  134. }
  135. if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
  136. error("Message is bigger than allowed maximum");
  137. freez(full_msg);
  138. return 403;
  139. }
  140. #ifdef NETDATA_INTERNAL_CHECKS
  141. aclk_stats_msg_published(packet_id);
  142. #endif
  143. freez(full_msg);
  144. return 0;
  145. }
  146. /*
  147. * Creates universal header common for all ACLK messages. User gets ownership of json object created.
  148. * Usually this is freed by send function after message has been sent.
  149. */
  150. static struct json_object *create_hdr(const char *type, const char *msg_id, time_t ts_secs, usec_t ts_us, int version)
  151. {
  152. uuid_t uuid;
  153. char uuid_str[36 + 1];
  154. json_object *tmp;
  155. json_object *obj = json_object_new_object();
  156. tmp = json_object_new_string(type);
  157. json_object_object_add(obj, "type", tmp);
  158. if (unlikely(!msg_id)) {
  159. uuid_generate(uuid);
  160. uuid_unparse(uuid, uuid_str);
  161. msg_id = uuid_str;
  162. }
  163. if (ts_secs == 0) {
  164. ts_us = now_realtime_usec();
  165. ts_secs = ts_us / USEC_PER_SEC;
  166. ts_us = ts_us % USEC_PER_SEC;
  167. }
  168. tmp = json_object_new_string(msg_id);
  169. json_object_object_add(obj, "msg-id", tmp);
  170. tmp = json_object_new_int64(ts_secs);
  171. json_object_object_add(obj, "timestamp", tmp);
  172. // TODO handle this somehow on older json-c
  173. // tmp = json_object_new_uint64(ts_us);
  174. // probably jso->_to_json_string -> custom function
  175. // jso->o.c_uint64 -> map this with pointer to signed int
  176. // commit that implements json_object_new_uint64 is 3c3b592
  177. // between 0.14 and 0.15
  178. tmp = json_object_new_int64(ts_us);
  179. json_object_object_add(obj, "timestamp-offset-usec", tmp);
  180. tmp = json_object_new_int64(aclk_session_sec);
  181. json_object_object_add(obj, "connect", tmp);
  182. // TODO handle this somehow see above
  183. // tmp = json_object_new_uint64(0 /* TODO aclk_session_us */);
  184. tmp = json_object_new_int64(aclk_session_us);
  185. json_object_object_add(obj, "connect-offset-usec", tmp);
  186. tmp = json_object_new_int(version);
  187. json_object_object_add(obj, "version", tmp);
  188. return obj;
  189. }
  190. static char *create_uuid()
  191. {
  192. uuid_t uuid;
  193. char *uuid_str = mallocz(36 + 1);
  194. uuid_generate(uuid);
  195. uuid_unparse(uuid, uuid_str);
  196. return uuid_str;
  197. }
  198. #ifndef __GNUC__
  199. #pragma endregion
  200. #endif
  201. #ifndef __GNUC__
  202. #pragma region aclk_tx_msgs message generators
  203. #endif
  204. /*
  205. * This will send the /api/v1/info
  206. */
  207. #define BUFFER_INITIAL_SIZE (1024 * 16)
  208. void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host)
  209. {
  210. BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
  211. json_object *msg, *payload, *tmp;
  212. char *msg_id = create_uuid();
  213. buffer_flush(local_buffer);
  214. local_buffer->contenttype = CT_APPLICATION_JSON;
  215. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  216. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  217. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  218. // session.
  219. if (metadata_submitted)
  220. msg = create_hdr("update", msg_id, 0, 0, ACLK_VERSION);
  221. else
  222. msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
  223. payload = json_object_new_object();
  224. json_object_object_add(msg, "payload", payload);
  225. web_client_api_request_v1_info_fill_buffer(host, local_buffer);
  226. tmp = json_tokener_parse(local_buffer->buffer);
  227. json_object_object_add(payload, "info", tmp);
  228. buffer_flush(local_buffer);
  229. charts2json(host, local_buffer, 1, 0);
  230. tmp = json_tokener_parse(local_buffer->buffer);
  231. json_object_object_add(payload, "charts", tmp);
  232. aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA);
  233. json_object_put(msg);
  234. freez(msg_id);
  235. buffer_free(local_buffer);
  236. }
  237. // TODO should include header instead
  238. void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
  239. void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
  240. {
  241. BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
  242. json_object *msg, *payload, *tmp;
  243. char *msg_id = create_uuid();
  244. buffer_flush(local_buffer);
  245. local_buffer->contenttype = CT_APPLICATION_JSON;
  246. // on_connect messages are sent on a health reload, if the on_connect message is real then we
  247. // use the session time as the fake timestamp to indicate that it starts the session. If it is
  248. // a fake on_connect message then use the real timestamp to indicate it is within the existing
  249. // session.
  250. if (metadata_submitted)
  251. msg = create_hdr("connect_alarms", msg_id, 0, 0, ACLK_VERSION);
  252. else
  253. msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
  254. payload = json_object_new_object();
  255. json_object_object_add(msg, "payload", payload);
  256. health_alarms2json(localhost, local_buffer, 1);
  257. tmp = json_tokener_parse(local_buffer->buffer);
  258. json_object_object_add(payload, "configured-alarms", tmp);
  259. buffer_flush(local_buffer);
  260. health_active_log_alarms_2json(localhost, local_buffer);
  261. tmp = json_tokener_parse(local_buffer->buffer);
  262. json_object_object_add(payload, "alarms-active", tmp);
  263. aclk_send_message_subtopic(client, msg, ACLK_TOPICID_ALARMS);
  264. json_object_put(msg);
  265. freez(msg_id);
  266. buffer_free(local_buffer);
  267. }
  268. void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len)
  269. {
  270. json_object *tmp, *msg;
  271. msg = create_hdr("http", msg_id, 0, 0, 2);
  272. tmp = json_object_new_int(http_code);
  273. json_object_object_add(msg, "http-code", tmp);
  274. tmp = json_object_new_int(ec);
  275. json_object_object_add(msg, "error-code", tmp);
  276. tmp = json_object_new_string(emsg);
  277. json_object_object_add(msg, "error-description", tmp);
  278. if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) {
  279. error("Failed to send cancelation message for http reply");
  280. }
  281. json_object_put(msg);
  282. }
  283. void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
  284. {
  285. json_object *tmp, *msg;
  286. msg = create_hdr("http", msg_id, 0, 0, 2);
  287. tmp = json_object_new_int64(t_exec);
  288. json_object_object_add(msg, "t-exec", tmp);
  289. tmp = json_object_new_int64(created);
  290. json_object_object_add(msg, "t-rx", tmp);
  291. tmp = json_object_new_int(http_code);
  292. json_object_object_add(msg, "http-code", tmp);
  293. int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
  294. json_object_put(msg);
  295. switch (rc) {
  296. case 403:
  297. aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len);
  298. break;
  299. case 500:
  300. aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len);
  301. break;
  302. case 503:
  303. aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len);
  304. break;
  305. }
  306. }
  307. void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)
  308. {
  309. json_object *msg, *payload;
  310. BUFFER *tmp_buffer;
  311. RRDSET *st;
  312. st = rrdset_find(host, chart);
  313. if (!st)
  314. st = rrdset_find_byname(host, chart);
  315. if (!st) {
  316. info("FAILED to find chart %s", chart);
  317. return;
  318. }
  319. tmp_buffer = buffer_create(BUFFER_INITIAL_SIZE);
  320. rrdset2json(st, tmp_buffer, NULL, NULL, 1);
  321. payload = json_tokener_parse(tmp_buffer->buffer);
  322. if (!payload) {
  323. error("Failed to parse JSON from rrdset2json");
  324. buffer_free(tmp_buffer);
  325. return;
  326. }
  327. msg = create_hdr("chart", NULL, 0, 0, ACLK_VERSION);
  328. json_object_object_add(msg, "payload", payload);
  329. aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART);
  330. buffer_free(tmp_buffer);
  331. json_object_put(msg);
  332. }
  333. void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg)
  334. {
  335. // we create header here on purpose (and not send message with it already as `msg` param)
  336. // timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy
  337. // send message with timestamps already to Query Queue they would be incorrect at time
  338. // when query queue would get to send them)
  339. json_object *obj = create_hdr("status-change", NULL, 0, 0, ACLK_VERSION);
  340. json_object_object_add(obj, "payload", msg);
  341. aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS);
  342. json_object_put(obj);
  343. }
  344. /*
  345. * Will generate disconnect message.
  346. * @param message if NULL it will generate LWT message (unexpected).
  347. * Otherwise string pointed to by this parameter will be used as
  348. * reason.
  349. */
  350. json_object *aclk_generate_disconnect(const char *message)
  351. {
  352. json_object *tmp, *msg;
  353. msg = create_hdr("disconnect", NULL, 0, 0, 2);
  354. tmp = json_object_new_string(message ? message : "unexpected");
  355. json_object_object_add(msg, "payload", tmp);
  356. return msg;
  357. }
  358. int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message)
  359. {
  360. int pid;
  361. json_object *msg = aclk_generate_disconnect(message);
  362. pid = aclk_send_message_subtopic_pid(client, msg, ACLK_TOPICID_METADATA);
  363. json_object_put(msg);
  364. return pid;
  365. }
  366. #ifdef ENABLE_NEW_CLOUD_PROTOCOL
  367. // new protobuf msgs
  368. uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
  369. size_t len;
  370. uint16_t pid;
  371. update_agent_connection_t conn = {
  372. .reachable = (reachable ? 1 : 0),
  373. .lwt = 0,
  374. .session_id = aclk_session_newarch
  375. };
  376. rrdhost_aclk_state_lock(localhost);
  377. if (unlikely(!localhost->aclk_state.claimed_id)) {
  378. error("Internal error. Should not come here if not claimed");
  379. rrdhost_aclk_state_unlock(localhost);
  380. return 0;
  381. }
  382. if (localhost->aclk_state.prev_claimed_id)
  383. conn.claim_id = localhost->aclk_state.prev_claimed_id;
  384. else
  385. conn.claim_id = localhost->aclk_state.claimed_id;
  386. char *msg = generate_update_agent_connection(&len, &conn);
  387. rrdhost_aclk_state_unlock(localhost);
  388. if (!msg) {
  389. error("Error generating agent::v1::UpdateAgentConnection payload");
  390. return 0;
  391. }
  392. pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
  393. freez(msg);
  394. if (localhost->aclk_state.prev_claimed_id) {
  395. freez(localhost->aclk_state.prev_claimed_id);
  396. localhost->aclk_state.prev_claimed_id = NULL;
  397. }
  398. return pid;
  399. }
  400. char *aclk_generate_lwt(size_t *size) {
  401. update_agent_connection_t conn = {
  402. .reachable = 0,
  403. .lwt = 1,
  404. .session_id = aclk_session_newarch
  405. };
  406. rrdhost_aclk_state_lock(localhost);
  407. if (unlikely(!localhost->aclk_state.claimed_id)) {
  408. error("Internal error. Should not come here if not claimed");
  409. rrdhost_aclk_state_unlock(localhost);
  410. return NULL;
  411. }
  412. conn.claim_id = localhost->aclk_state.claimed_id;
  413. char *msg = generate_update_agent_connection(size, &conn);
  414. rrdhost_aclk_state_unlock(localhost);
  415. if (!msg)
  416. error("Error generating agent::v1::UpdateAgentConnection payload for LWT");
  417. return msg;
  418. }
  419. void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation) {
  420. size_t len;
  421. char *msg = generate_node_instance_creation(&len, node_creation);
  422. if (!msg) {
  423. error("Error generating nodeinstance::create::v1::CreateNodeInstance");
  424. return;
  425. }
  426. aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance");
  427. freez(msg);
  428. }
  429. void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection) {
  430. size_t len;
  431. char *msg = generate_node_instance_connection(&len, node_connection);
  432. if (!msg) {
  433. error("Error generating nodeinstance::v1::UpdateNodeInstanceConnection");
  434. return;
  435. }
  436. aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
  437. freez(msg);
  438. }
  439. #endif /* ENABLE_NEW_CLOUD_PROTOCOL */
  440. #ifndef __GNUC__
  441. #pragma endregion
  442. #endif