123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540 |
- // SPDX-License-Identifier: GPL-3.0-or-later
- #include "aclk_tx_msgs.h"
- #include "daemon/common.h"
- #include "aclk_util.h"
- #include "aclk_stats.h"
- #include "aclk.h"
- #ifndef __GNUC__
- #pragma region aclk_tx_msgs helper functions
- #endif
- // version for aclk legacy (old cloud arch)
- #define ACLK_VERSION 2
- static void aclk_send_message_subtopic(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
- {
- uint16_t packet_id;
- const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
- const char *topic = aclk_get_topic(subtopic);
- if (unlikely(!topic)) {
- error("Couldn't get topic. Aborting message send");
- return;
- }
- mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
- #ifdef NETDATA_INTERNAL_CHECKS
- aclk_stats_msg_published(packet_id);
- #endif
- #ifdef ACLK_LOG_CONVERSATION_DIR
- #define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
- json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
- #endif
- }
- uint16_t aclk_send_bin_message_subtopic_pid(mqtt_wss_client client, char *msg, size_t msg_len, enum aclk_topics subtopic, const char *msgname)
- {
- #ifndef ACLK_LOG_CONVERSATION_DIR
- UNUSED(msgname);
- #endif
- uint16_t packet_id;
- const char *topic = aclk_get_topic(subtopic);
- if (unlikely(!topic)) {
- error("Couldn't get topic. Aborting message send.");
- return 0;
- }
- mqtt_wss_publish_pid(client, topic, msg, msg_len, MQTT_WSS_PUB_QOS1, &packet_id);
- #ifdef NETDATA_INTERNAL_CHECKS
- aclk_stats_msg_published(packet_id);
- #endif
- #ifdef ACLK_LOG_CONVERSATION_DIR
- #define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx-%s.bin", ACLK_GET_CONV_LOG_NEXT(), msgname);
- FILE *fptr;
- if (fptr = fopen(filename,"w")) {
- fwrite(msg, msg_len, 1, fptr);
- fclose(fptr);
- }
- #endif
- return packet_id;
- }
- static uint16_t aclk_send_message_subtopic_pid(mqtt_wss_client client, json_object *msg, enum aclk_topics subtopic)
- {
- uint16_t packet_id;
- const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
- const char *topic = aclk_get_topic(subtopic);
- if (unlikely(!topic)) {
- error("Couldn't get topic. Aborting message send");
- return 0;
- }
- mqtt_wss_publish_pid(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1, &packet_id);
- #ifdef NETDATA_INTERNAL_CHECKS
- aclk_stats_msg_published(packet_id);
- #endif
- #ifdef ACLK_LOG_CONVERSATION_DIR
- #define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
- json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
- #endif
- return packet_id;
- }
- /* UNUSED now but can be used soon MVP1?
- static void aclk_send_message_topic(mqtt_wss_client client, json_object *msg, const char *topic)
- {
- if (unlikely(!topic || topic[0] != '/')) {
- error ("Full topic required!");
- return;
- }
- const char *str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
- mqtt_wss_publish(client, topic, str, strlen(str), MQTT_WSS_PUB_QOS1);
- #ifdef NETDATA_INTERNAL_CHECKS
- aclk_stats_msg_published();
- #endif
- #ifdef ACLK_LOG_CONVERSATION_DIR
- #define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
- json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
- #endif
- }
- */
- #define TOPIC_MAX_LEN 512
- #define V2_BIN_PAYLOAD_SEPARATOR "\x0D\x0A\x0D\x0A"
- static int aclk_send_message_with_bin_payload(mqtt_wss_client client, json_object *msg, const char *topic, const void *payload, size_t payload_len)
- {
- uint16_t packet_id;
- const char *str;
- char *full_msg = NULL;
- int len, rc;
- if (unlikely(!topic || topic[0] != '/')) {
- error ("Full topic required!");
- return 500;
- }
- str = json_object_to_json_string_ext(msg, JSON_C_TO_STRING_PLAIN);
- len = strlen(str);
- if (payload_len) {
- full_msg = mallocz(len + strlen(V2_BIN_PAYLOAD_SEPARATOR) + payload_len);
- memcpy(full_msg, str, len);
- memcpy(&full_msg[len], V2_BIN_PAYLOAD_SEPARATOR, strlen(V2_BIN_PAYLOAD_SEPARATOR));
- len += strlen(V2_BIN_PAYLOAD_SEPARATOR);
- memcpy(&full_msg[len], payload, payload_len);
- len += payload_len;
- }
- /* TODO
- #ifdef ACLK_LOG_CONVERSATION_DIR
- #define FN_MAX_LEN 1024
- char filename[FN_MAX_LEN];
- snprintf(filename, FN_MAX_LEN, ACLK_LOG_CONVERSATION_DIR "/%010d-tx.json", ACLK_GET_CONV_LOG_NEXT());
- json_object_to_file_ext(filename, msg, JSON_C_TO_STRING_PRETTY);
- #endif */
- rc = mqtt_wss_publish_pid_block(client, topic, payload_len ? full_msg : str, len, MQTT_WSS_PUB_QOS1, &packet_id, 5000);
- if (rc == MQTT_WSS_ERR_BLOCK_TIMEOUT) {
- error("Timeout sending binpacked message");
- freez(full_msg);
- return 503;
- }
- if (rc == MQTT_WSS_ERR_TX_BUF_TOO_SMALL) {
- error("Message is bigger than allowed maximum");
- freez(full_msg);
- return 403;
- }
- #ifdef NETDATA_INTERNAL_CHECKS
- aclk_stats_msg_published(packet_id);
- #endif
- freez(full_msg);
- return 0;
- }
- /*
- * Creates universal header common for all ACLK messages. User gets ownership of json object created.
- * Usually this is freed by send function after message has been sent.
- */
- static struct json_object *create_hdr(const char *type, const char *msg_id, time_t ts_secs, usec_t ts_us, int version)
- {
- uuid_t uuid;
- char uuid_str[36 + 1];
- json_object *tmp;
- json_object *obj = json_object_new_object();
- tmp = json_object_new_string(type);
- json_object_object_add(obj, "type", tmp);
- if (unlikely(!msg_id)) {
- uuid_generate(uuid);
- uuid_unparse(uuid, uuid_str);
- msg_id = uuid_str;
- }
- if (ts_secs == 0) {
- ts_us = now_realtime_usec();
- ts_secs = ts_us / USEC_PER_SEC;
- ts_us = ts_us % USEC_PER_SEC;
- }
- tmp = json_object_new_string(msg_id);
- json_object_object_add(obj, "msg-id", tmp);
- tmp = json_object_new_int64(ts_secs);
- json_object_object_add(obj, "timestamp", tmp);
- // TODO handle this somehow on older json-c
- // tmp = json_object_new_uint64(ts_us);
- // probably jso->_to_json_string -> custom function
- // jso->o.c_uint64 -> map this with pointer to signed int
- // commit that implements json_object_new_uint64 is 3c3b592
- // between 0.14 and 0.15
- tmp = json_object_new_int64(ts_us);
- json_object_object_add(obj, "timestamp-offset-usec", tmp);
- tmp = json_object_new_int64(aclk_session_sec);
- json_object_object_add(obj, "connect", tmp);
- // TODO handle this somehow see above
- // tmp = json_object_new_uint64(0 /* TODO aclk_session_us */);
- tmp = json_object_new_int64(aclk_session_us);
- json_object_object_add(obj, "connect-offset-usec", tmp);
- tmp = json_object_new_int(version);
- json_object_object_add(obj, "version", tmp);
- return obj;
- }
- static char *create_uuid()
- {
- uuid_t uuid;
- char *uuid_str = mallocz(36 + 1);
- uuid_generate(uuid);
- uuid_unparse(uuid, uuid_str);
- return uuid_str;
- }
- #ifndef __GNUC__
- #pragma endregion
- #endif
- #ifndef __GNUC__
- #pragma region aclk_tx_msgs message generators
- #endif
- /*
- * This will send the /api/v1/info
- */
- #define BUFFER_INITIAL_SIZE (1024 * 16)
- void aclk_send_info_metadata(mqtt_wss_client client, int metadata_submitted, RRDHOST *host)
- {
- BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
- json_object *msg, *payload, *tmp;
- char *msg_id = create_uuid();
- buffer_flush(local_buffer);
- local_buffer->contenttype = CT_APPLICATION_JSON;
- // on_connect messages are sent on a health reload, if the on_connect message is real then we
- // use the session time as the fake timestamp to indicate that it starts the session. If it is
- // a fake on_connect message then use the real timestamp to indicate it is within the existing
- // session.
- if (metadata_submitted)
- msg = create_hdr("update", msg_id, 0, 0, ACLK_VERSION);
- else
- msg = create_hdr("connect", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
- payload = json_object_new_object();
- json_object_object_add(msg, "payload", payload);
- web_client_api_request_v1_info_fill_buffer(host, local_buffer);
- tmp = json_tokener_parse(local_buffer->buffer);
- json_object_object_add(payload, "info", tmp);
- buffer_flush(local_buffer);
- charts2json(host, local_buffer, 1, 0);
- tmp = json_tokener_parse(local_buffer->buffer);
- json_object_object_add(payload, "charts", tmp);
- aclk_send_message_subtopic(client, msg, ACLK_TOPICID_METADATA);
- json_object_put(msg);
- freez(msg_id);
- buffer_free(local_buffer);
- }
- // TODO should include header instead
- void health_active_log_alarms_2json(RRDHOST *host, BUFFER *wb);
- void aclk_send_alarm_metadata(mqtt_wss_client client, int metadata_submitted)
- {
- BUFFER *local_buffer = buffer_create(BUFFER_INITIAL_SIZE);
- json_object *msg, *payload, *tmp;
- char *msg_id = create_uuid();
- buffer_flush(local_buffer);
- local_buffer->contenttype = CT_APPLICATION_JSON;
- // on_connect messages are sent on a health reload, if the on_connect message is real then we
- // use the session time as the fake timestamp to indicate that it starts the session. If it is
- // a fake on_connect message then use the real timestamp to indicate it is within the existing
- // session.
- if (metadata_submitted)
- msg = create_hdr("connect_alarms", msg_id, 0, 0, ACLK_VERSION);
- else
- msg = create_hdr("connect_alarms", msg_id, aclk_session_sec, aclk_session_us, ACLK_VERSION);
- payload = json_object_new_object();
- json_object_object_add(msg, "payload", payload);
- health_alarms2json(localhost, local_buffer, 1);
- tmp = json_tokener_parse(local_buffer->buffer);
- json_object_object_add(payload, "configured-alarms", tmp);
- buffer_flush(local_buffer);
- health_active_log_alarms_2json(localhost, local_buffer);
- tmp = json_tokener_parse(local_buffer->buffer);
- json_object_object_add(payload, "alarms-active", tmp);
- aclk_send_message_subtopic(client, msg, ACLK_TOPICID_ALARMS);
- json_object_put(msg);
- freez(msg_id);
- buffer_free(local_buffer);
- }
- void aclk_http_msg_v2_err(mqtt_wss_client client, const char *topic, const char *msg_id, int http_code, int ec, const char* emsg, const char *payload, size_t payload_len)
- {
- json_object *tmp, *msg;
- msg = create_hdr("http", msg_id, 0, 0, 2);
- tmp = json_object_new_int(http_code);
- json_object_object_add(msg, "http-code", tmp);
- tmp = json_object_new_int(ec);
- json_object_object_add(msg, "error-code", tmp);
- tmp = json_object_new_string(emsg);
- json_object_object_add(msg, "error-description", tmp);
- if (aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len)) {
- error("Failed to send cancelation message for http reply");
- }
- json_object_put(msg);
- }
- void aclk_http_msg_v2(mqtt_wss_client client, const char *topic, const char *msg_id, usec_t t_exec, usec_t created, int http_code, const char *payload, size_t payload_len)
- {
- json_object *tmp, *msg;
- msg = create_hdr("http", msg_id, 0, 0, 2);
- tmp = json_object_new_int64(t_exec);
- json_object_object_add(msg, "t-exec", tmp);
- tmp = json_object_new_int64(created);
- json_object_object_add(msg, "t-rx", tmp);
- tmp = json_object_new_int(http_code);
- json_object_object_add(msg, "http-code", tmp);
- int rc = aclk_send_message_with_bin_payload(client, msg, topic, payload, payload_len);
- json_object_put(msg);
- switch (rc) {
- case 403:
- aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_REQ_REPLY_TOO_BIG, CLOUD_EMSG_REQ_REPLY_TOO_BIG, payload, payload_len);
- break;
- case 500:
- aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_FAIL_TOPIC, CLOUD_EMSG_FAIL_TOPIC, payload, payload_len);
- break;
- case 503:
- aclk_http_msg_v2_err(client, topic, msg_id, rc, CLOUD_EC_SND_TIMEOUT, CLOUD_EMSG_SND_TIMEOUT, payload, payload_len);
- break;
- }
- }
- void aclk_chart_msg(mqtt_wss_client client, RRDHOST *host, const char *chart)
- {
- json_object *msg, *payload;
- BUFFER *tmp_buffer;
- RRDSET *st;
-
- st = rrdset_find(host, chart);
- if (!st)
- st = rrdset_find_byname(host, chart);
- if (!st) {
- info("FAILED to find chart %s", chart);
- return;
- }
- tmp_buffer = buffer_create(BUFFER_INITIAL_SIZE);
- rrdset2json(st, tmp_buffer, NULL, NULL, 1);
- payload = json_tokener_parse(tmp_buffer->buffer);
- if (!payload) {
- error("Failed to parse JSON from rrdset2json");
- buffer_free(tmp_buffer);
- return;
- }
- msg = create_hdr("chart", NULL, 0, 0, ACLK_VERSION);
- json_object_object_add(msg, "payload", payload);
- aclk_send_message_subtopic(client, msg, ACLK_TOPICID_CHART);
- buffer_free(tmp_buffer);
- json_object_put(msg);
- }
- void aclk_alarm_state_msg(mqtt_wss_client client, json_object *msg)
- {
- // we create header here on purpose (and not send message with it already as `msg` param)
- // timestamps etc. which in ACLK legacy would be wrong (because ACLK legacy
- // send message with timestamps already to Query Queue they would be incorrect at time
- // when query queue would get to send them)
- json_object *obj = create_hdr("status-change", NULL, 0, 0, ACLK_VERSION);
- json_object_object_add(obj, "payload", msg);
- aclk_send_message_subtopic(client, obj, ACLK_TOPICID_ALARMS);
- json_object_put(obj);
- }
- /*
- * Will generate disconnect message.
- * @param message if NULL it will generate LWT message (unexpected).
- * Otherwise string pointed to by this parameter will be used as
- * reason.
- */
- json_object *aclk_generate_disconnect(const char *message)
- {
- json_object *tmp, *msg;
- msg = create_hdr("disconnect", NULL, 0, 0, 2);
- tmp = json_object_new_string(message ? message : "unexpected");
- json_object_object_add(msg, "payload", tmp);
- return msg;
- }
- int aclk_send_app_layer_disconnect(mqtt_wss_client client, const char *message)
- {
- int pid;
- json_object *msg = aclk_generate_disconnect(message);
- pid = aclk_send_message_subtopic_pid(client, msg, ACLK_TOPICID_METADATA);
- json_object_put(msg);
- return pid;
- }
- #ifdef ENABLE_NEW_CLOUD_PROTOCOL
- // new protobuf msgs
- uint16_t aclk_send_agent_connection_update(mqtt_wss_client client, int reachable) {
- size_t len;
- uint16_t pid;
- update_agent_connection_t conn = {
- .reachable = (reachable ? 1 : 0),
- .lwt = 0,
- .session_id = aclk_session_newarch
- };
- rrdhost_aclk_state_lock(localhost);
- if (unlikely(!localhost->aclk_state.claimed_id)) {
- error("Internal error. Should not come here if not claimed");
- rrdhost_aclk_state_unlock(localhost);
- return 0;
- }
- if (localhost->aclk_state.prev_claimed_id)
- conn.claim_id = localhost->aclk_state.prev_claimed_id;
- else
- conn.claim_id = localhost->aclk_state.claimed_id;
- char *msg = generate_update_agent_connection(&len, &conn);
- rrdhost_aclk_state_unlock(localhost);
- if (!msg) {
- error("Error generating agent::v1::UpdateAgentConnection payload");
- return 0;
- }
- pid = aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_AGENT_CONN, "UpdateAgentConnection");
- freez(msg);
- if (localhost->aclk_state.prev_claimed_id) {
- freez(localhost->aclk_state.prev_claimed_id);
- localhost->aclk_state.prev_claimed_id = NULL;
- }
- return pid;
- }
- char *aclk_generate_lwt(size_t *size) {
- update_agent_connection_t conn = {
- .reachable = 0,
- .lwt = 1,
- .session_id = aclk_session_newarch
- };
- rrdhost_aclk_state_lock(localhost);
- if (unlikely(!localhost->aclk_state.claimed_id)) {
- error("Internal error. Should not come here if not claimed");
- rrdhost_aclk_state_unlock(localhost);
- return NULL;
- }
- conn.claim_id = localhost->aclk_state.claimed_id;
- char *msg = generate_update_agent_connection(size, &conn);
- rrdhost_aclk_state_unlock(localhost);
- if (!msg)
- error("Error generating agent::v1::UpdateAgentConnection payload for LWT");
- return msg;
- }
- void aclk_generate_node_registration(mqtt_wss_client client, node_instance_creation_t *node_creation) {
- size_t len;
- char *msg = generate_node_instance_creation(&len, node_creation);
- if (!msg) {
- error("Error generating nodeinstance::create::v1::CreateNodeInstance");
- return;
- }
- aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_CREATE_NODE, "CreateNodeInstance");
- freez(msg);
- }
- void aclk_generate_node_state_update(mqtt_wss_client client, node_instance_connection_t *node_connection) {
- size_t len;
- char *msg = generate_node_instance_connection(&len, node_connection);
- if (!msg) {
- error("Error generating nodeinstance::v1::UpdateNodeInstanceConnection");
- return;
- }
- aclk_send_bin_message_subtopic_pid(client, msg, len, ACLK_TOPICID_NODE_CONN, "UpdateNodeInstanceConnection");
- freez(msg);
- }
- #endif /* ENABLE_NEW_CLOUD_PROTOCOL */
- #ifndef __GNUC__
- #pragma endregion
- #endif
|