aclk_tx_msgs.c 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "aclk_tx_msgs.h"
  3. #include "daemon/common.h"
  4. #include "aclk_util.h"
  5. #include "aclk_stats.h"
  6. #include "aclk.h"
  7. #include "aclk_capas.h"
  8. #include "schema-wrappers/proto_2_json.h"
  9. #ifndef __GNUC__
  10. #pragma region aclk_tx_msgs helper functions
  11. #endif
  12. // version for aclk legacy (old cloud arch)
  13. #define ACLK_VERSION 2
  14. static void freez_aclk_publish5a(void *ptr) {
  15. freez(ptr);
  16. }
  17. static void freez_aclk_publish5b(void *ptr) {
  18. freez(ptr);
  19. }
  20. uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
  21. {
  22. #ifndef ACLK_LOG_CONVERSATION_DIR
  23. UNUSED(msgname);
  24. #endif
  25. uint16_t packet_id;
  26. const char *topic = aclk_get_topic(subtopic);
  27. if (unlikely(!topic)) {
  28. error("Couldn't get topic. Aborting message send.");
  29. return 0;
  30. }
  31. mqtt_wss_publish5(client, (char*)topic, NULL, msg, &freez_aclk_publish5a, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
  32. #ifdef NETDATA_INTERNAL_CHECKS
  33. aclk_stats_msg_published(packet_id);
  34. #endif
  35. if (aclklog_enabled) {
  36. char *json = protomsg_to_json(msg, msg_len, msgname);
  37. log_aclk_message_bin(json, strlen(json), 1, topic, msgname);
  38. freez(json);
  39. }
  40. return packet_id;
  41. }
  42. // json_object_put returns int unfortunately :D
  43. // we need void(*fnc)(void *);
  44. static void json_object_put_wrapper(void *jsonobj)
  45. {
  46. json_object_put(jsonobj);
  47. }
  48. #define TOPIC_MAX_LEN 512
  49. #define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
  50. static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len)
  51. {
  52. uint16_t packet_id;
  53. const char *str;
  54. char *full_msg = NULL;
  55. int len;
  56. if (unlikely(!topic || topic[0] != '/')) {
  57. error ("Full topic required!");
  58. json_object_put(msg);
  59. return HTTP_RESP_INTERNAL_SERVER_ERROR;
  60. }
  61. str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
  62. len = strlen(str);
  63. if (payload_len) {
  64. full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
  65. memcpy(full_msg, str, len);
  66. json_object_put(msg);
  67. msg = NULL;
  68. memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
  69. len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
  70. memcpy(&full_msg[len], payload, payload_len);
  71. len += payload_len;
  72. }
  73. mqtt_wss_publish5(client, (char*)topic, NULL, (char*)(payload_len ? full_msg : str), (payload_len ? &freez_aclk_publish5b : &json_object_put_wrapper), len, MQTT_WSS_PUB_QOS1, &packet_id);
  74. #ifdef NETDATA_INTERNAL_CHECKS
  75. aclk_stats_msg_published(packet_id);
  76. #endif
  77. return 0;
  78. }
  79. /*
  80. * Creates universal header common for all ACLK messages. User gets ownership of json object created.
  81. * Usually this is freed by send function after message has been sent.
  82. */
  83. static struct json_object *create_hdr(const char *type, const char *msg_id, time_t ts_secs, usec_t ts_us, int version)
  84. {
  85. uuid_t uuid;
  86. char uuid_str[36 + 1];
  87. json_object *tmp;
  88. json_object *obj = json_object_new_object();
  89. tmp = json_object_new_string(type);
  90. json_object_object_add(obj, "type", tmp);
  91. if (unlikely(!msg_id)) {
  92. uuid_generate(uuid);
  93. uuid_unparse(uuid, uuid_str);
  94. msg_id = uuid_str;
  95. }
  96. if (ts_secs == 0) {
  97. ts_us = now_realtime_usec();
  98. ts_secs = ts_us / USEC_PER_SEC;
  99. ts_us = ts_us % USEC_PER_SEC;
  100. }
  101. tmp = json_object_new_string(msg_id);
  102. json_object_object_add(obj, "msg-id", tmp);
  103. tmp = json_object_new_int64(ts_secs);
  104. json_object_object_add(obj, "timestamp", tmp);
  105. // TODO handle this somehow on older json-c
  106. // tmp = json_object_new_uint64(ts_us);
  107. // probably jso->_to_json_string -> custom function
  108. // jso->o.c_uint64 -> map this with pointer to signed int
  109. // commit that implements json_object_new_uint64 is 3c3b592
  110. // between 0.14 and 0.15
  111. tmp = json_object_new_int64(ts_us);
  112. json_object_object_add(obj, "timestamp-offset-usec", tmp);
  113. tmp = json_object_new_int64(aclk_session_sec);
  114. json_object_object_add(obj, "connect", tmp);
  115. // TODO handle this somehow see above
  116. // tmp = json_object_new_uint64(0 /* TODO aclk_session_us */);
  117. tmp = json_object_new_int64(aclk_session_us);
  118. json_object_object_add(obj, "connect-offset-usec", tmp);
  119. tmp = json_object_new_int(version);
  120. json_object_object_add(obj, "version", tmp);
  121. return obj;
  122. }
  123. #ifndef __GNUC__
  124. #pragma endregion
  125. #endif
  126. #ifndef __GNUC__
  127. #pragma region aclk_tx_msgs message generators
  128. #endif
  129. void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len)
  130. {
  131. json_object *tmp, *msg;
  132. msg = create_hdr("http", msg_id, 0, 0, 2);
  133. tmp = json_object_new_int(http_code);
  134. json_object_object_add(msg, "http-code", tmp);
  135. tmp = json_object_new_int(ec);
  136. json_object_object_add(msg, "error-code", tmp);
  137. tmp = json_object_new_string(emsg);
  138. json_object_object_add(msg, "error-description", tmp);
  139. if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) {
  140. error("Failed to send cancelation message for http reply");
  141. }
  142. }
  143. void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
  144. {
  145. json_object *tmp, *msg;
  146. msg = create_hdr("http", msg_id, 0, 0, 2);
  147. tmp = json_object_new_int64(t_exec);
  148. json_object_object_add(msg, "t-exec", tmp);
  149. tmp = json_object_new_int64(created);
  150. json_object_object_add(msg, "t-rx", tmp);
  151. tmp = json_object_new_int(http_code);
  152. json_object_object_add(msg, "http-code", tmp);
  153. int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
  154. switch (rc) {
  155. case HTTP_RESP_FORBIDDEN:
  156. aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len);
  157. break;
  158. case HTTP_RESP_INTERNAL_SERVER_ERROR:
  159. aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len);
  160. break;
  161. case HTTP_RESP_BACKEND_FETCH_FAILED:
  162. aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len);
  163. break;
  164. }
  165. }
  166. uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
  167. size_t len;
  168. uint16_t pid;
  169. update_agent_connection_t conn = {
  170. .reachable = (reachable ? 1 : 0),
  171. .lwt = 0,
  172. .session_id = aclk_session_newarch,
  173. .capabilities = aclk_get_agent_capas()
  174. };
  175. rrdhost_aclk_state_lock(localhost);
  176. if (unlikely(!localhost->aclk_state.claimed_id)) {
  177. error("Internal error. Should not come here if not claimed");
  178. rrdhost_aclk_state_unlock(localhost);
  179. return 0;
  180. }
  181. if (localhost->aclk_state.prev_claimed_id)
  182. conn.claim_id = localhost->aclk_state.prev_claimed_id;
  183. else
  184. conn.claim_id = localhost->aclk_state.claimed_id;
  185. char *msg = generate_update_agent_connection(&len, &conn);
  186. rrdhost_aclk_state_unlock(localhost);
  187. if (!msg) {
  188. error("Error generating agent::v1::UpdateAgentConnection payload");
  189. return 0;
  190. }
  191. pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
  192. if (localhost->aclk_state.prev_claimed_id) {
  193. freez(localhost->aclk_state.prev_claimed_id);
  194. localhost->aclk_state.prev_claimed_id = NULL;
  195. }
  196. return pid;
  197. }
  198. char *aclk_generate_lwt(size_t *size) {
  199. update_agent_connection_t conn = {
  200. .reachable = 0,
  201. .lwt = 1,
  202. .session_id = aclk_session_newarch,
  203. .capabilities = NULL
  204. };
  205. rrdhost_aclk_state_lock(localhost);
  206. if (unlikely(!localhost->aclk_state.claimed_id)) {
  207. error("Internal error. Should not come here if not claimed");
  208. rrdhost_aclk_state_unlock(localhost);
  209. return NULL;
  210. }
  211. conn.claim_id = localhost->aclk_state.claimed_id;
  212. char *msg = generate_update_agent_connection(size, &conn);
  213. rrdhost_aclk_state_unlock(localhost);
  214. if (!msg)
  215. error("Error generating agent::v1::UpdateAgentConnection payload for LWT");
  216. return msg;
  217. }
  218. #ifndef __GNUC__
  219. #pragma endregion
  220. #endif