aclk_tx_msgs.c 10 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk_tx_msgs.h"
  3. #include "daemon/common.h"
  4. #include "aclk_util.h"
  5. #include "aclk_stats.h"
  6. #include "aclk.h"
  7. #include "schema-wrappers/proto_2_json.h"
  8. #ifndef __GNUC__
  9. #pragma region aclk_tx_msgs helper functions
  10. #endif
  11. // version for aclk legacy (old cloud arch)
  12. #define ACLK_VERSION 2
  13. uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
  14. {
  15. #ifndef ACLK_LOG_CONVERSATION_DIR
  16. UNUSED(msgname);
  17. #endif
  18. uint16_t packet_id;
  19. const char *topic = aclk_get_topic(subtopic);
  20. if (unlikely(!topic)) {
  21. error("Couldn't get topic. Aborting message send.");
  22. return 0;
  23. }
  24. if (use_mqtt_5)
  25. mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
  26. else
  27. mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
  28. #ifdef NETDATA_INTERNAL_CHECKS
  29. aclk_stats_msg_published(packet_id);
  30. char *json = protomsg_to_json(msg, msg_len, msgname);
  31. log_aclk_message_bin(json, strlen(json), 1, topic, msgname);
  32. freez(json);
  33. #endif
  34. return packet_id;
  35. }
  36. /* UNUSED now but can be used soon MVP1?
  37. static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, const char *topic)
  38. {
  39. if (unlikely(!topic || topic[0] != '/')) {
  40. error ("Full topic required!");
  41. return;
  42. }
  43. const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
  44. mqtt_wss_publish(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1);
  45. #ifdef NETDATA_INTERNAL_CHECKS
  46. aclk_stats_msg_published();
  47. #endif
  48. #ifdef ACLK_LOG_CONVERSATION_DIR
  49. #define FN_MAX_LEN 1024
  50. char filename[FN_MAX_LEN];
  51. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
  52. json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
  53. #endif
  54. }
  55. */
  56. #define TOPIC_MAX_LEN 512
  57. #define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
  58. static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len)
  59. {
  60. uint16_t packet_id;
  61. const char *str;
  62. char *full_msg = NULL;
  63. int len, rc;
  64. if (unlikely(!topic || topic[0] != '/')) {
  65. error ("Full topic required!");
  66. return HTTP_RESP_INTERNAL_SERVER_ERROR;
  67. }
  68. str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
  69. len = strlen(str);
  70. if (payload_len) {
  71. full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
  72. memcpy(full_msg, str, len);
  73. memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
  74. len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
  75. memcpy(&full_msg[len], payload, payload_len);
  76. len += payload_len;
  77. }
  78. /* TODO
  79. #ifdef ACLK_LOG_CONVERSATION_DIR
  80. #define FN_MAX_LEN 1024
  81. char filename[FN_MAX_LEN];
  82. snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
  83. json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
  84. #endif */
  85. if (use_mqtt_5)
  86. mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), NULL, len, MQTT_WSS_PUB_QOS1, &packet_id);
  87. else {
  88. rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
  89. if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
  90. error("Timeout sending binpacked message");
  91. freez(full_msg);
  92. return HTTP_RESP_BACKEND_FETCH_FAILED;
  93. }
  94. if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
  95. error("Message is bigger than allowed maximum");
  96. freez(full_msg);
  97. return HTTP_RESP_FORBIDDEN;
  98. }
  99. }
  100. #ifdef NETDATA_INTERNAL_CHECKS
  101. aclk_stats_msg_published(packet_id);
  102. #endif
  103. freez(full_msg);
  104. return 0;
  105. }
  106. /*
  107. * Creates universal header common for all ACLK messages. User gets ownership of json object created.
  108. * Usually this is freed by send function after message has been sent.
  109. */
  110. static struct json_object *create_hdr(const char *type, const char *msg_id, time_t ts_secs, usec_t ts_us, int version)
  111. {
  112. uuid_t uuid;
  113. char uuid_str[36 + 1];
  114. json_object *tmp;
  115. json_object *obj = json_object_new_object();
  116. tmp = json_object_new_string(type);
  117. json_object_object_add(obj, "type", tmp);
  118. if (unlikely(!msg_id)) {
  119. uuid_generate(uuid);
  120. uuid_unparse(uuid, uuid_str);
  121. msg_id = uuid_str;
  122. }
  123. if (ts_secs == 0) {
  124. ts_us = now_realtime_usec();
  125. ts_secs = ts_us / USEC_PER_SEC;
  126. ts_us = ts_us % USEC_PER_SEC;
  127. }
  128. tmp = json_object_new_string(msg_id);
  129. json_object_object_add(obj, "msg-id", tmp);
  130. tmp = json_object_new_int64(ts_secs);
  131. json_object_object_add(obj, "timestamp", tmp);
  132. // TODO handle this somehow on older json-c
  133. // tmp = json_object_new_uint64(ts_us);
  134. // probably jso->_to_json_string -> custom function
  135. // jso->o.c_uint64 -> map this with pointer to signed int
  136. // commit that implements json_object_new_uint64 is 3c3b592
  137. // between 0.14 and 0.15
  138. tmp = json_object_new_int64(ts_us);
  139. json_object_object_add(obj, "timestamp-offset-usec", tmp);
  140. tmp = json_object_new_int64(aclk_session_sec);
  141. json_object_object_add(obj, "connect", tmp);
  142. // TODO handle this somehow see above
  143. // tmp = json_object_new_uint64(0 /* TODO aclk_session_us */);
  144. tmp = json_object_new_int64(aclk_session_us);
  145. json_object_object_add(obj, "connect-offset-usec", tmp);
  146. tmp = json_object_new_int(version);
  147. json_object_object_add(obj, "version", tmp);
  148. return obj;
  149. }
  150. #ifndef __GNUC__
  151. #pragma endregion
  152. #endif
  153. #ifndef __GNUC__
  154. #pragma region aclk_tx_msgs message generators
  155. #endif
  156. void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len)
  157. {
  158. json_object *tmp, *msg;
  159. msg = create_hdr("http", msg_id, 0, 0, 2);
  160. tmp = json_object_new_int(http_code);
  161. json_object_object_add(msg, "http-code", tmp);
  162. tmp = json_object_new_int(ec);
  163. json_object_object_add(msg, "error-code", tmp);
  164. tmp = json_object_new_string(emsg);
  165. json_object_object_add(msg, "error-description", tmp);
  166. if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) {
  167. error("Failed to send cancelation message for http reply");
  168. }
  169. json_object_put(msg);
  170. }
  171. void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
  172. {
  173. json_object *tmp, *msg;
  174. msg = create_hdr("http", msg_id, 0, 0, 2);
  175. tmp = json_object_new_int64(t_exec);
  176. json_object_object_add(msg, "t-exec", tmp);
  177. tmp = json_object_new_int64(created);
  178. json_object_object_add(msg, "t-rx", tmp);
  179. tmp = json_object_new_int(http_code);
  180. json_object_object_add(msg, "http-code", tmp);
  181. int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
  182. json_object_put(msg);
  183. switch (rc) {
  184. case HTTP_RESP_FORBIDDEN:
  185. aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len);
  186. break;
  187. case HTTP_RESP_INTERNAL_SERVER_ERROR:
  188. aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len);
  189. break;
  190. case HTTP_RESP_BACKEND_FETCH_FAILED:
  191. aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len);
  192. break;
  193. }
  194. }
  195. uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
  196. size_t len;
  197. uint16_t pid;
  198. struct capability agent_capabilities[] = {
  199. { .name = "json", .version = 2, .enabled = 0 },
  200. { .name = "proto", .version = 1, .enabled = 1 },
  201. #ifdef ENABLE_ML
  202. { .name = "ml", .version = 1, .enabled = ml_enabled(localhost) },
  203. #endif
  204. { .name = "mc", .version = enable_metric_correlations ? metric_correlations_version : 0, .enabled = enable_metric_correlations },
  205. { .name = "ctx", .version = 1, .enabled = 1 },
  206. { .name = NULL, .version = 0, .enabled = 0 }
  207. };
  208. update_agent_connection_t conn = {
  209. .reachable = (reachable ? 1 : 0),
  210. .lwt = 0,
  211. .session_id = aclk_session_newarch,
  212. .capabilities = agent_capabilities
  213. };
  214. rrdhost_aclk_state_lock(localhost);
  215. if (unlikely(!localhost->aclk_state.claimed_id)) {
  216. error("Internal error. Should not come here if not claimed");
  217. rrdhost_aclk_state_unlock(localhost);
  218. return 0;
  219. }
  220. if (localhost->aclk_state.prev_claimed_id)
  221. conn.claim_id = localhost->aclk_state.prev_claimed_id;
  222. else
  223. conn.claim_id = localhost->aclk_state.claimed_id;
  224. char *msg = generate_update_agent_connection(&len, &conn);
  225. rrdhost_aclk_state_unlock(localhost);
  226. if (!msg) {
  227. error("Error generating agent::v1::UpdateAgentConnection payload");
  228. return 0;
  229. }
  230. pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
  231. if (!use_mqtt_5)
  232. freez(msg);
  233. if (localhost->aclk_state.prev_claimed_id) {
  234. freez(localhost->aclk_state.prev_claimed_id);
  235. localhost->aclk_state.prev_claimed_id = NULL;
  236. }
  237. return pid;
  238. }
  239. char *aclk_generate_lwt(size_t *size) {
  240. update_agent_connection_t conn = {
  241. .reachable = 0,
  242. .lwt = 1,
  243. .session_id = aclk_session_newarch,
  244. .capabilities = NULL
  245. };
  246. rrdhost_aclk_state_lock(localhost);
  247. if (unlikely(!localhost->aclk_state.claimed_id)) {
  248. error("Internal error. Should not come here if not claimed");
  249. rrdhost_aclk_state_unlock(localhost);
  250. return NULL;
  251. }
  252. conn.claim_id = localhost->aclk_state.claimed_id;
  253. char *msg = generate_update_agent_connection(size, &conn);
  254. rrdhost_aclk_state_unlock(localhost);
  255. if (!msg)
  256. error("Error generating agent::v1::UpdateAgentConnection payload for LWT");
  257. return msg;
  258. }
  259. #ifndef __GNUC__
  260. #pragma endregion
  261. #endif