12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995 |
- // SPDX-License-Identifier: GPL-3.0-or-later
#include "libnetdata/libnetdata.h"
#include "agent_cloud_link.h"

#include <errno.h>
#include <stdint.h>
#include <time.h>
- // Read from the config file -- new section [agent_cloud_link]
- // Defaults are supplied
- int aclk_recv_maximum = 0; // default 20
- int aclk_send_maximum = 0; // default 20
- int aclk_port = 0; // default 1883
- char *aclk_hostname = NULL; //default localhost
- int aclk_subscribed = 0;
- int aclk_metadata_submitted = 0;
- int waiting_init = 1;
- int cmdpause = 0; // Used to pause query processing
- BUFFER *aclk_buffer = NULL;
- char *global_base_topic = NULL;
- int cloud_to_agent_parse(JSON_ENTRY *e)
- {
- struct aclk_request *data = e->callback_data;
- switch(e->type) {
- case JSON_OBJECT:
- e->callback_function = cloud_to_agent_parse;
- break;
- case JSON_ARRAY:
- e->callback_function = cloud_to_agent_parse;
- break;
- case JSON_STRING:
- if (!strcmp(e->name, ACLK_JSON_IN_MSGID)) {
- data->msg_id = strdupz(e->data.string);
- break;
- }
- if (!strcmp(e->name, ACLK_JSON_IN_TYPE)) {
- data->type_id = strdupz(e->data.string);
- break;
- }
- if (!strcmp(e->name, ACLK_JSON_IN_TOPIC)) {
- data->topic = strdupz(e->data.string);
- break;
- }
- if (!strcmp(e->name, ACLK_JSON_IN_URL)) {
- data->url = strdupz(e->data.string);
- break;
- }
- break;
- case JSON_NUMBER:
- if (!strcmp(e->name, ACLK_JSON_IN_VERSION)) {
- data->version = atol(e->data.string);
- break;
- }
- break;
- case JSON_BOOLEAN:
- break;
- case JSON_NULL:
- break;
- }
- return 0;
- }
- //char *send_http_request(char *host, char *port, char *url, BUFFER *b)
- //{
- // struct timeval timeout = { .tv_sec = 30, .tv_usec = 0 };
- //
- // buffer_flush(b);
- // buffer_sprintf(
- // b,
- // "GET %s HTTP/1.1\r\nHost: %s\r\nAccept: plain/text\r\nAccept-Language: en-us\r\nUser-Agent: Netdata/rocks\r\n\r\n",
- // url, host);
- // int sock = connect_to_this_ip46(IPPROTO_TCP, SOCK_STREAM, host, 0, "443", &timeout);
- //
- // if (unlikely(sock == -1)) {
- // error("Handshake failed");
- // return NULL;
- // }
- //
- // SSL_CTX *ctx = security_initialize_openssl_client();
- // // Certificate chain: not updating the stores - do we need private CA roots?
- // // Calls to SSL_CTX_load_verify_locations would go here.
- // SSL *ssl = SSL_new(ctx);
- // SSL_set_fd(ssl, sock);
- // int err = SSL_connect(ssl);
- // SSL_write(ssl, b->buffer, b->len); // Timeout options?
- // int bytes_read = SSL_read(ssl, b->buffer, b->len);
- // SSL_shutdown(ssl);
- // close(sock);
- //}
- // Set when we have connection up and running from the connection callback
- int aclk_connection_initialized = 0;
- static netdata_mutex_t aclk_mutex = NETDATA_MUTEX_INITIALIZER;
- static netdata_mutex_t query_mutex = NETDATA_MUTEX_INITIALIZER;
- #define ACLK_LOCK netdata_mutex_lock(&aclk_mutex)
- #define ACLK_UNLOCK netdata_mutex_unlock(&aclk_mutex)
- #define QUERY_LOCK netdata_mutex_lock(&query_mutex)
- #define QUERY_UNLOCK netdata_mutex_unlock(&query_mutex)
- pthread_cond_t query_cond_wait = PTHREAD_COND_INITIALIZER;
- pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
- #define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait);
- #define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
- #define QUERY_THREAD_WAKEUP pthread_cond_signal(&query_cond_wait)
/*
 * One pending query. Entries are linked in aclk_queue in run_after order
 * (see aclk_query_find_position()).
 */
struct aclk_query {
    time_t created;           // when the entry was queued
    time_t run_after;         // delay run until after this time
    char *topic;              // topic to respond to
    char *data;               // internal data (NULL if request from the cloud)
    char *msg_id;             // msg_id generated by the cloud (NULL if internal)
    char *query;              // the actual query
    unsigned char deleted;    // marked deleted; freed when popped (garbage collect)
    struct aclk_query *next;
};

/*
 * The query queue. All fields are protected by query_mutex (QUERY_LOCK).
 * count includes entries marked deleted that have not been popped yet.
 * Standard types are used instead of the BSD u_char/u_int64_t typedefs, which
 * are not available when compiling in strict ISO C mode.
 */
struct aclk_query_queue {
    struct aclk_query *aclk_query_head;
    struct aclk_query *aclk_query_tail;
    uint64_t count;
} aclk_queue = { .aclk_query_head = NULL, .aclk_query_tail = NULL, .count = 0 };
- /*
- * Free a query structure when done
- */
- void aclk_query_free(struct aclk_query *this_query)
- {
- if (unlikely(!this_query))
- return;
- freez(this_query->topic);
- freez(this_query->query);
- if (this_query->data)
- freez(this_query->data);
- if (this_query->msg_id)
- freez(this_query->msg_id);
- freez(this_query);
- return;
- }
- // Returns the entry after which we need to create a new entry to run at the specified time
- // If NULL is returned we need to add to HEAD
- // Called with locked entries
- struct aclk_query *aclk_query_find_position(time_t time_to_run)
- {
- struct aclk_query *tmp_query, *last_query;
- last_query = NULL;
- tmp_query = aclk_queue.aclk_query_head;
- while (tmp_query) {
- if (tmp_query->run_after > time_to_run)
- return last_query;
- last_query = tmp_query;
- tmp_query = tmp_query->next;
- }
- return last_query;
- }
- // Need to have a lock before calling this
- struct aclk_query *aclk_query_find(char *topic, char *data, char *msg_id, char *query)
- {
- struct aclk_query *tmp_query;
- tmp_query = aclk_queue.aclk_query_head;
- while (tmp_query) {
- if (likely(!tmp_query->deleted)) {
- if (strcmp(tmp_query->topic, topic) == 0 && (strcmp(tmp_query->query, query) == 0)) {
- if ((!data || (data && strcmp(data, tmp_query->data) == 0)) &&
- (!msg_id || (msg_id && strcmp(msg_id, tmp_query->msg_id) == 0)))
- return tmp_query;
- }
- }
- tmp_query = tmp_query->next;
- }
- return NULL;
- }
- /*
- * Add a query to execute, the result will be send to the specified topic
- */
- int aclk_queue_query(char *topic, char *data, char *msg_id, char *query, int run_after, int internal)
- {
- struct aclk_query *new_query, *tmp_query;
- // Ignore all commands while we wait for the agent to initialize
- if (unlikely(waiting_init))
- return 0;
- run_after = now_realtime_sec() + run_after;
- QUERY_LOCK;
- tmp_query = aclk_query_find(topic, data, msg_id, query);
- if (unlikely(tmp_query)) {
- if (tmp_query->run_after == run_after) {
- QUERY_UNLOCK;
- QUERY_THREAD_WAKEUP;
- return 0;
- }
- tmp_query->deleted = 1;
- }
- new_query = callocz(1, sizeof(struct aclk_query));
- if (internal) {
- new_query->topic = strdupz(topic);
- new_query->query = strdupz(query);
- } else {
- new_query->topic = topic;
- new_query->query = query;
- new_query->msg_id = msg_id;
- }
- if (data)
- new_query->data = strdupz(data);
- new_query->next = NULL;
- new_query->created = now_realtime_sec();
- new_query->run_after = run_after;
- info("Added query (%s) (%s)", topic, query);
- tmp_query = aclk_query_find_position(run_after);
- if (tmp_query) {
- new_query->next = tmp_query->next;
- tmp_query->next = new_query;
- if (tmp_query == aclk_queue.aclk_query_tail)
- aclk_queue.aclk_query_tail = new_query;
- aclk_queue.count++;
- QUERY_UNLOCK;
- QUERY_THREAD_WAKEUP;
- return 0;
- }
- new_query->next = aclk_queue.aclk_query_head;
- aclk_queue.aclk_query_head = new_query;
- aclk_queue.count++;
- QUERY_UNLOCK;
- QUERY_THREAD_WAKEUP;
- return 0;
- // if (likely(aclk_queue.aclk_query_tail)) {
- // aclk_queue.aclk_query_tail->next = new_query;
- // aclk_queue.aclk_query_tail = new_query;
- // aclk_queue.count++;
- // QUERY_UNLOCK;
- // return 0;
- // }
- //
- // if (likely(!aclk_queue.aclk_query_head)) {
- // aclk_queue.aclk_query_head = new_query;
- // aclk_queue.aclk_query_tail = new_query;
- // aclk_queue.count++;
- // QUERY_UNLOCK;
- // return 0;
- // }
- // QUERY_UNLOCK;
- // return 0;
- }
- inline int aclk_submit_request(struct aclk_request *request)
- {
- return aclk_queue_query(request->topic, NULL, request->msg_id, request->url, 0, 0);
- }
- /*
- * Get the next query to process - NULL if nothing there
- * The caller needs to free memory by calling aclk_query_free()
- *
- * topic
- * query
- * The structure itself
- *
- */
- struct aclk_query *aclk_queue_pop()
- {
- struct aclk_query *this_query;
- QUERY_LOCK;
- if (likely(!aclk_queue.aclk_query_head)) {
- QUERY_UNLOCK;
- return NULL;
- }
- this_query = aclk_queue.aclk_query_head;
- if (this_query->run_after > now_realtime_sec()) {
- info("Query %s will run in %ld seconds", this_query->query, this_query->run_after - now_realtime_sec());
- QUERY_UNLOCK;
- return NULL;
- }
- aclk_queue.count--;
- aclk_queue.aclk_query_head = aclk_queue.aclk_query_head->next;
- if (likely(!aclk_queue.aclk_query_head)) {
- aclk_queue.aclk_query_tail = NULL;
- }
- QUERY_UNLOCK;
- return this_query;
- }
- // This will give the base topic that the agent will publish messages.
- // subtopics will be sent under the base topic e.g. base_topic/subtopic
- // This is called by aclk_init(), to compute the base topic once and have
- // it stored internally.
- // Need to check if additional logic should be added to make sure that there
- // is enough information to determine the base topic at init time
- // TODO: Locking may be needed, depends on the calculation of the base topic and also if we need to switch
- // that on the fly
- char *get_publish_base_topic(PUBLISH_TOPIC_ACTION action)
- {
- static char *topic = NULL;
- if (unlikely(!is_agent_claimed()))
- return NULL;
- ACLK_LOCK;
- if (unlikely(action == PUBLICH_TOPIC_FREE)) {
- if (likely(topic)) {
- freez(topic);
- topic = NULL;
- }
- ACLK_UNLOCK;
- return NULL;
- }
- if (unlikely(action == PUBLICH_TOPIC_REBUILD)) {
- ACLK_UNLOCK;
- get_publish_base_topic(PUBLICH_TOPIC_FREE);
- return get_publish_base_topic(PUBLICH_TOPIC_GET);
- }
- if (unlikely(!topic)) {
- char tmp_topic[ACLK_MAX_TOPIC + 1];
- sprintf(tmp_topic, ACLK_TOPIC_STRUCTURE, is_agent_claimed());
- topic = strdupz(tmp_topic);
- }
- ACLK_UNLOCK;
- return topic;
- }
- char *get_topic(char *sub_topic, char *final_topic, int max_size)
- {
- if (unlikely(!global_base_topic))
- global_base_topic = GET_PUBLISH_BASE_TOPIC;
- if (unlikely(!global_base_topic))
- return sub_topic;
- snprintfz(final_topic, max_size, "%s/%s", global_base_topic, sub_topic);
- return final_topic;
- }
- // Wait for ACLK connection to be established
- int aclk_wait_for_initialization()
- {
- if (unlikely(!aclk_connection_initialized)) {
- time_t now = now_realtime_sec();
- while (!aclk_connection_initialized && (now_realtime_sec() - now) < ACLK_INITIALIZATION_WAIT) {
- sleep_usec(USEC_PER_SEC * ACLK_INITIALIZATION_SLEEP_WAIT);
- _link_event_loop(0);
- }
- if (unlikely(!aclk_connection_initialized)) {
- error("ACLK connection cannot be established");
- return 1;
- }
- }
- return 0;
- }
- /*
- * This function will fetch the next pending command and process it
- *
- */
- int aclk_process_query()
- {
- struct aclk_query *this_query;
- static u_int64_t query_count = 0;
- //int rc;
- if (unlikely(cmdpause))
- return 0;
- if (!aclk_connection_initialized)
- return 0;
- this_query = aclk_queue_pop();
- if (likely(!this_query)) {
- //info("No pending queries");
- return 0;
- }
- if (unlikely(this_query->deleted)) {
- info("Garbage collect query %s:%s", this_query->topic, this_query->query);
- aclk_query_free(this_query);
- return 1;
- }
- query_count++;
- info(
- "Query #%d (%s) (%s) in queue %d seconds", (int) query_count, this_query->topic, this_query->query,
- (int) (now_realtime_sec() - this_query->created));
- if (strncmp((char *)this_query->query, "/api/v1/", 8) == 0) {
- struct web_client *w = (struct web_client *)callocz(1, sizeof(struct web_client));
- w->response.data = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- strcpy(w->origin, "*"); // Simulate web_client_create_on_fd()
- w->cookie1[0] = 0; // Simulate web_client_create_on_fd()
- w->cookie2[0] = 0; // Simulate web_client_create_on_fd()
- w->acl = 0x1f;
- char *mysep = strchr(this_query->query, '?');
- if (mysep) {
- strncpyz(w->decoded_query_string, mysep, NETDATA_WEB_REQUEST_URL_SIZE);
- *mysep = '\0';
- } else
- strncpyz(w->decoded_query_string, this_query->query, NETDATA_WEB_REQUEST_URL_SIZE);
- mysep = strrchr(this_query->query, '/');
- // TODO: ignore return code for now
- web_client_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop");
- //TODO: handle bad response perhaps in a different way. For now it does to the payload
- //if (rc == HTTP_RESP_OK || 1) {
- buffer_flush(aclk_buffer);
- aclk_create_metadata_message(aclk_buffer, mysep ? mysep + 1 : "noop", this_query->msg_id, w->response.data);
- aclk_buffer->contenttype = CT_APPLICATION_JSON;
- aclk_send_message(this_query->topic, aclk_buffer->buffer);
- //} else
- // error("Query RESP: %s", w->response.data->buffer);
- buffer_free(w->response.data);
- freez(w);
- aclk_query_free(this_query);
- return 1;
- }
- if (strcmp((char *)this_query->topic, "_chart") == 0) {
- aclk_send_single_chart(this_query->data, this_query->query);
- }
- aclk_query_free(this_query);
- return 1;
- }
- // Launch a query processing thread
- /*
- * Process all pending queries
- * Return 0 if no queries were processed, 1 otherwise
- *
- */
- int aclk_process_queries()
- {
- if (unlikely(cmdpause))
- return 0;
- // Return if no queries pending
- if (likely(!aclk_queue.count))
- return 0;
- info("Processing %d queries", (int ) aclk_queue.count);
- while (aclk_process_query()) {
- //rc = _link_event_loop(0);
- };
- return 1;
- }
- static void aclk_query_thread_cleanup(void *ptr)
- {
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
- info("cleaning up...");
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
- }
- /**
- * MAin query processing thread
- *
- */
- void *aclk_query_main_thread(void *ptr)
- {
- netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);
- while (!netdata_exit) {
- QUERY_THREAD_LOCK;
- if (unlikely(!aclk_metadata_submitted)) {
- aclk_send_metadata();
- aclk_metadata_submitted = 1;
- }
- if (unlikely(pthread_cond_wait(&query_cond_wait, &query_lock_wait)))
- sleep_usec(USEC_PER_SEC * 1);
- if (likely(aclk_connection_initialized && !netdata_exit)) {
- while (aclk_process_queries()) {
- // Sleep for a few ms and retry maybe we have something to process
- // before going to sleep
- // TODO: This needs improvement to avoid missed queries
- sleep_usec(USEC_PER_MS * 100);
- }
- }
- QUERY_THREAD_UNLOCK;
- } // forever
- info("Shutting down query processing thread");
- netdata_thread_cleanup_pop(1);
- return NULL;
- }
- // Thread cleanup
- static void aclk_main_cleanup(void *ptr)
- {
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
- info("cleaning up...");
- QUERY_THREAD_WAKEUP;
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
- }
- /**
- * Main agent cloud link thread
- *
- * This thread will simply call the main event loop that handles
- * pending requests - both inbound and outbound
- *
- * @param ptr is a pointer to the netdata_static_thread structure.
- *
- * @return It always returns NULL
- */
- void *aclk_main(void *ptr)
- {
- //netdata_thread_t *query_thread;
- struct netdata_static_thread query_thread;
- memset(&query_thread, 0, sizeof(query_thread));
- netdata_thread_cleanup_push(aclk_main_cleanup, ptr);
- if (unlikely(!aclk_buffer))
- aclk_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- assert(aclk_buffer != NULL);
- //netdata_thread_cleanup_push(aclk_query_thread_cleanup, ptr);
- //netdata_thread_create(&query_thread.thread , "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread);
- info("Waiting for netdata to be ready");
- while (!netdata_ready) {
- sleep_usec(USEC_PER_MS * 300);
- }
- info("Waiting %d seconds for the agent to initialize", ACLK_STARTUP_WAIT);
- sleep_usec(USEC_PER_SEC * ACLK_STARTUP_WAIT);
- // Ok mark we are ready to accept incoming requests
- waiting_init = 0;
- while (!netdata_exit) {
- // TODO: This may change when we have enough info from the claiming itself to avoid wasting 60 seconds
- // TODO: Handle the unclaim command as well -- we may need to shutdown the connection
- if (likely(!is_agent_claimed())) {
- sleep_usec(USEC_PER_SEC * 60);
- info("Checking agent claiming status");
- continue;
- }
- if (unlikely(!aclk_connection_initialized)) {
- static int initializing = 0;
- if (likely(initializing)) {
- _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
- continue;
- }
- initializing = 1;
- info("Initializing connection");
- //send_http_request(aclk_hostname, "443", "/auth/challenge?id=blah", aclk_buffer);
- if (unlikely(aclk_init(ACLK_INIT))) {
- // TODO: TBD how to handle. We are claimed and we cant init the connection. For now keep trying.
- sleep_usec(USEC_PER_SEC * 60);
- continue;
- } else {
- sleep_usec(USEC_PER_SEC * 1);
- }
- _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
- continue;
- }
- if (unlikely(!aclk_subscribed)) {
- aclk_subscribed = !aclk_subscribe(ACLK_COMMAND_TOPIC, 2);
- }
- if (unlikely(!query_thread.thread)) {
- query_thread.thread = mallocz(sizeof(netdata_thread_t));
- netdata_thread_create(
- query_thread.thread, "ACLKQ", NETDATA_THREAD_OPTION_DEFAULT, aclk_query_main_thread, &query_thread);
- }
- //TODO: Check if there is a return code
- _link_event_loop(ACLK_LOOP_TIMEOUT * 1000);
- } // forever
- aclk_shutdown();
- netdata_thread_cleanup_pop(1);
- return NULL;
- }
- /*
- * Send a message to the cloud, using a base topic and sib_topic
- * The final topic will be in the form <base_topic>/<sub_topic>
- * If base_topic is missing then the global_base_topic will be used (if available)
- *
- */
- int aclk_send_message(char *sub_topic, char *message)
- {
- int rc;
- static int skip_due_to_shutdown = 0;
- char topic[ACLK_MAX_TOPIC + 1];
- char *final_topic;
- if (!aclk_connection_initialized)
- return 0;
- if (unlikely(netdata_exit)) {
- if (unlikely(!aclk_connection_initialized))
- return 1;
- ++skip_due_to_shutdown;
- if (unlikely(!(skip_due_to_shutdown % 100)))
- info("%d messages not sent -- shutdown in progress", skip_due_to_shutdown);
- return 1;
- }
- if (unlikely(!message))
- return 0;
- if (unlikely(aclk_wait_for_initialization()))
- return 1;
- final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
- ACLK_LOCK;
- rc = _link_send_message(final_topic, message);
- ACLK_UNLOCK;
- // TODO: Add better handling -- error will flood the logfile here
- if (unlikely(rc))
- error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
- return rc;
- }
- /*
- * Subscribe to a topic in the cloud
- * The final subscription will be in the form
- * /agent/claim_id/<sub_topic>
- */
- int aclk_subscribe(char *sub_topic, int qos)
- {
- int rc;
- //static char *global_base_topic = NULL;
- char topic[ACLK_MAX_TOPIC + 1];
- char *final_topic;
- if (!aclk_connection_initialized)
- return 0;
- if (unlikely(netdata_exit)) {
- return 1;
- }
- if (unlikely(aclk_wait_for_initialization()))
- return 1;
- final_topic = get_topic(sub_topic, topic, ACLK_MAX_TOPIC);
- ACLK_LOCK;
- rc = _link_subscribe(final_topic, qos);
- ACLK_UNLOCK;
- // TODO: Add better handling -- error will flood the logfile here
- if (unlikely(rc))
- error("Failed to send message, error code %d (%s)", rc, _link_strerror(rc));
- return rc;
- }
/*
 * Link-up callback registered with _link_lib_init() by aclk_init().
 * Only logs the event; ptr is unused.
 */
void aclk_connect(void *ptr)
{
    UNUSED_ARG_PLACEHOLDER:
    (void) ptr;
    info("Connection detected");
}
- // This is called from a callback when the link goes down
- void aclk_disconnect(void *ptr)
- {
- (void) ptr;
- info("Disconnect detected");
- aclk_subscribed = 0;
- aclk_metadata_submitted = 0;
- }
- void aclk_shutdown()
- {
- info("Shutdown initiated");
- aclk_connection_initialized = 0;
- _link_shutdown();
- info("Shutdown complete");
- }
- int aclk_init(ACLK_INIT_ACTION action)
- {
- (void) action;
- static int init = 0;
- int rc;
- if (likely(init))
- return 0;
- aclk_send_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link send maximum", 20);
- aclk_recv_maximum = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link receive maximum", 20);
- aclk_hostname = config_get(CONFIG_SECTION_ACLK, "agent cloud link hostname", "localhost");
- aclk_port = config_get_number(CONFIG_SECTION_ACLK, "agent cloud link port", 1883);
- info("Maximum parallel outgoing messages %d", aclk_send_maximum);
- info("Maximum parallel incoming messages %d", aclk_recv_maximum);
- // This will setup the base publish topic internally
- //get_publish_base_topic(PUBLICH_TOPIC_GET);
- // initialize the low level link to the cloud
- rc = _link_lib_init(aclk_hostname, aclk_port, aclk_connect, aclk_disconnect);
- if (unlikely(rc)) {
- error("Failed to initialize the agent cloud link library");
- return 1;
- }
- global_base_topic = GET_PUBLISH_BASE_TOPIC;
- init = 1;
- return 0;
- }
- // Use this to disable encoding of quotes and newlines so that
- // MQTT subscriber can display more readable data on screen
- void aclk_create_header(BUFFER *dest, char *type, char *msg_id)
- {
- uuid_t uuid;
- char uuid_str[36 + 1];
- if (unlikely(!msg_id)) {
- uuid_generate(uuid);
- uuid_unparse(uuid, uuid_str);
- msg_id = uuid_str;
- }
- buffer_sprintf(
- dest,
- "\t{\"type\": \"%s\",\n"
- "\t\"msg-id\": \"%s\",\n"
- "\t\"version\": %s,\n"
- "\t\"payload\": ",
- type, msg_id, ACLK_VERSION);
- }
- #define EYE_FRIENDLY 1
- // encapsulate contents into metadata message as per ACLK documentation
- void aclk_create_metadata_message(BUFFER *dest, char *type, char *msg_id, BUFFER *contents)
- {
- #ifndef EYE_FRIENDLY
- char *tmp_buffer = mallocz(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
- char *src, *dst;
- #endif
- buffer_sprintf(
- dest,
- "\t{\"type\": \"%s\",\n"
- "\t\"msg-id\": \"%s\",\n"
- "\t\"payload\": %s\n\t}",
- type, msg_id ? msg_id : "", contents->buffer);
- #ifndef EYE_FRIENDLY
- //TODO: this is the initial escaping, It will expanded
- src = dest->buffer;
- dst = tmp_buffer;
- while (*src) {
- switch (*src) {
- case '0x0a':
- case '\n':
- *dst++ = '\\';
- *dst++ = 'n';
- break;
- case '\"':
- *dst++ = '\\';
- *dst++ = '\"';
- break;
- case '\'':
- *dst++ = '\\';
- *dst++ = '\"';
- break;
- default:
- *dst++ = *src;
- }
- src++;
- }
- *dst = '\0';
- buffer_flush(dest);
- buffer_sprintf(dest, "%s", tmp_buffer);
- freez(tmp_buffer);
- #endif
- return;
- }
- //TODO: this has been changed in the latest specs. We need to pack the data in one MQTT
- //message with a payload and has a list of json objects
- int aclk_send_alarm_metadata()
- {
- //TODO: improve locking on the buffer -- same lock is used for the message send
- //improve error handling
- ACLK_LOCK;
- buffer_flush(aclk_buffer);
- // Alarms configuration
- aclk_create_header(aclk_buffer, "alarms", NULL);
- health_alarms2json(localhost, aclk_buffer, 1);
- buffer_sprintf(aclk_buffer,"\n}");
- ACLK_UNLOCK;
- aclk_send_message(ACLK_ALARMS_TOPIC, aclk_buffer->buffer);
- // Alarms log
- ACLK_LOCK;
- buffer_flush(aclk_buffer);
- aclk_create_header(aclk_buffer, "alarms_log", NULL);
- health_alarm_log2json(localhost, aclk_buffer, 0);
- buffer_sprintf(aclk_buffer,"\n}");
- ACLK_UNLOCK;
- aclk_send_message(ACLK_ALARMS_TOPIC, aclk_buffer->buffer);
- return 0;
- }
- // Send info metadata message to the cloud if the link is established
- // or on request
- int aclk_send_metadata()
- {
- ACLK_LOCK;
- buffer_flush(aclk_buffer);
- aclk_create_header(aclk_buffer, "connect", NULL);
- buffer_sprintf(aclk_buffer,"{\n\t \"info\" : ");
- web_client_api_request_v1_info_fill_buffer(localhost, aclk_buffer);
- buffer_sprintf(aclk_buffer,", \n\t \"charts\" : ");
- charts2json(localhost, aclk_buffer);
- buffer_sprintf(aclk_buffer,"\n}\n}");
- aclk_buffer->contenttype = CT_APPLICATION_JSON;
- ACLK_UNLOCK;
- aclk_send_message(ACLK_METADATA_TOPIC, aclk_buffer->buffer);
- aclk_send_alarm_metadata();
- return 0;
- }
- //rrd_stats_api_v1_chart(RRDSET *st, BUFFER *buf)
- int aclk_send_single_chart(char *hostname, char *chart)
- {
- RRDHOST *target_host;
- ACLK_LOCK;
- buffer_flush(aclk_buffer);
- target_host = rrdhost_find_by_hostname(hostname, 0);
- if (!target_host)
- return 1;
- RRDSET *st = rrdset_find(target_host, chart);
- if (!st)
- st = rrdset_find_byname(target_host, chart);
- if (!st) {
- info("FAILED to find chart %s", chart);
- return 1;
- }
- aclk_buffer->contenttype = CT_APPLICATION_JSON;
- buffer_flush(aclk_buffer);
- aclk_create_header(aclk_buffer, "chart", NULL);
- rrdset2json(st, aclk_buffer, NULL, NULL);
- buffer_sprintf(aclk_buffer,"\n}\n}");
- ACLK_UNLOCK;
- aclk_send_message(ACLK_METADATA_TOPIC, aclk_buffer->buffer);
- return 0;
- }
- int aclk_update_chart(RRDHOST *host, char *chart_name)
- {
- (void) host;
- (void) chart_name;
- #ifndef ENABLE_ACLK
- return 0;
- #else
- if (host != localhost)
- return 0;
- aclk_queue_query("_chart", host->hostname, NULL, chart_name, 2, 1);
- return 0;
- #endif
- }
- int aclk_update_alarm(RRDHOST *host, char *alarm_name)
- {
- if (host != localhost)
- return 0;
- aclk_queue_query("_alarm", host->hostname, NULL, alarm_name, 2, 1);
- return 0;
- }
- //TODO: add and check the incoming type e.g http
- int aclk_handle_cloud_request(char *payload)
- {
- struct aclk_request cloud_to_agent = { .msg_id = NULL, .topic = NULL, .url = NULL, .version = 1};
- int rc = json_parse(payload, &cloud_to_agent, cloud_to_agent_parse);
- if (unlikely(JSON_OK != rc)) {
- error("Malformed json request (%s)", payload);
- return 1;
- }
- if (unlikely(!cloud_to_agent.url || !cloud_to_agent.topic)) {
- return 1;
- }
- aclk_submit_request(&cloud_to_agent);
- return 0;
- }
|