|
@@ -22,6 +22,7 @@ static netdata_mutex_t queue_mutex = NETDATA_MUTEX_INITIALIZER;
|
|
|
|
|
|
struct aclk_query {
|
|
struct aclk_query {
|
|
usec_t created;
|
|
usec_t created;
|
|
|
|
+ struct timeval tv_in;
|
|
usec_t created_boot_time;
|
|
usec_t created_boot_time;
|
|
time_t run_after; // Delay run until after this time
|
|
time_t run_after; // Delay run until after this time
|
|
ACLK_CMD cmd; // What command is this
|
|
ACLK_CMD cmd; // What command is this
|
|
@@ -30,6 +31,7 @@ struct aclk_query {
|
|
char *msg_id; // msg_id generated by the cloud (NULL if internal)
|
|
char *msg_id; // msg_id generated by the cloud (NULL if internal)
|
|
char *query; // The actual query
|
|
char *query; // The actual query
|
|
u_char deleted; // Mark deleted for garbage collect
|
|
u_char deleted; // Mark deleted for garbage collect
|
|
|
|
+ int idx; // index of query thread
|
|
struct aclk_query *next;
|
|
struct aclk_query *next;
|
|
};
|
|
};
|
|
|
|
|
|
@@ -237,7 +239,8 @@ int aclk_queue_query(char *topic, void *data, char *msg_id, char *query, int run
|
|
|
|
|
|
new_query->data = data;
|
|
new_query->data = data;
|
|
new_query->next = NULL;
|
|
new_query->next = NULL;
|
|
- new_query->created = now_realtime_usec();
|
|
|
|
|
|
+ now_realtime_timeval(&new_query->tv_in);
|
|
|
|
+ new_query->created = (new_query->tv_in.tv_sec * USEC_PER_SEC) + new_query->tv_in.tv_usec;
|
|
new_query->created_boot_time = now_boottime_usec();
|
|
new_query->created_boot_time = now_boottime_usec();
|
|
new_query->run_after = run_after;
|
|
new_query->run_after = run_after;
|
|
|
|
|
|
@@ -325,6 +328,7 @@ static char *aclk_encode_response(char *src, size_t content_size, int keep_newli
|
|
#pragma region ACLK_QUERY
|
|
#pragma region ACLK_QUERY
|
|
#endif
|
|
#endif
|
|
|
|
|
|
|
|
+
|
|
static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created)
|
|
static usec_t aclk_web_api_request_v1(RRDHOST *host, struct web_client *w, char *url, usec_t q_created)
|
|
{
|
|
{
|
|
usec_t t = now_boottime_usec();
|
|
usec_t t = now_boottime_usec();
|
|
@@ -360,8 +364,11 @@ static int aclk_execute_query(struct aclk_query *this_query)
|
|
mysep = strrchr(this_query->query, '/');
|
|
mysep = strrchr(this_query->query, '/');
|
|
|
|
|
|
// TODO: handle bad response perhaps in a different way. For now it does to the payload
|
|
// TODO: handle bad response perhaps in a different way. For now it does to the payload
|
|
- aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
|
|
|
|
|
|
+ w->tv_in = this_query->tv_in;
|
|
now_realtime_timeval(&w->tv_ready);
|
|
now_realtime_timeval(&w->tv_ready);
|
|
|
|
+ aclk_web_api_request_v1(localhost, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
|
|
|
|
+ size_t size = w->response.data->len;
|
|
|
|
+ size_t sent = size;
|
|
w->response.data->date = w->tv_ready.tv_sec;
|
|
w->response.data->date = w->tv_ready.tv_sec;
|
|
web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready
|
|
web_client_build_http_header(w); // TODO: this function should offset from date, not tv_ready
|
|
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
|
|
BUFFER *local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
|
|
@@ -383,6 +390,24 @@ static int aclk_execute_query(struct aclk_query *this_query)
|
|
|
|
|
|
aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);
|
|
aclk_send_message(this_query->topic, local_buffer->buffer, this_query->msg_id);
|
|
|
|
|
|
|
|
+ struct timeval tv;
|
|
|
|
+ now_realtime_timeval(&tv);
|
|
|
|
+
|
|
|
|
+ log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
|
|
|
|
+ w->id
|
|
|
|
+ , gettid()
|
|
|
|
+ , this_query->idx
|
|
|
|
+ , "DATA"
|
|
|
|
+ , sent
|
|
|
|
+ , size
|
|
|
|
+ , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0)
|
|
|
|
+ , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0
|
|
|
|
+ , dt_usec(&tv, &w->tv_ready) / 1000.0
|
|
|
|
+ , dt_usec(&tv, &w->tv_in) / 1000.0
|
|
|
|
+ , w->response.code
|
|
|
|
+ , strip_control_characters(this_query->query)
|
|
|
|
+ );
|
|
|
|
+
|
|
buffer_free(w->response.data);
|
|
buffer_free(w->response.data);
|
|
buffer_free(w->response.header);
|
|
buffer_free(w->response.header);
|
|
buffer_free(w->response.header_output);
|
|
buffer_free(w->response.header_output);
|
|
@@ -427,7 +452,11 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
|
|
mysep = strrchr(this_query->query, '/');
|
|
mysep = strrchr(this_query->query, '/');
|
|
|
|
|
|
// execute the query
|
|
// execute the query
|
|
|
|
+ w->tv_in = this_query->tv_in;
|
|
|
|
+ now_realtime_timeval(&w->tv_ready);
|
|
t = aclk_web_api_request_v1(cloud_req->host, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
|
|
t = aclk_web_api_request_v1(cloud_req->host, w, mysep ? mysep + 1 : "noop", this_query->created_boot_time);
|
|
|
|
+ size_t size = (w->mode == WEB_CLIENT_MODE_FILECOPY)?w->response.rlen:w->response.data->len;
|
|
|
|
+ size_t sent = size;
|
|
|
|
|
|
#ifdef NETDATA_WITH_ZLIB
|
|
#ifdef NETDATA_WITH_ZLIB
|
|
// check if gzip encoding can and should be used
|
|
// check if gzip encoding can and should be used
|
|
@@ -476,7 +505,6 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
|
|
}
|
|
}
|
|
#endif
|
|
#endif
|
|
|
|
|
|
- now_realtime_timeval(&w->tv_ready);
|
|
|
|
w->response.data->date = w->tv_ready.tv_sec;
|
|
w->response.data->date = w->tv_ready.tv_sec;
|
|
web_client_build_http_header(w);
|
|
web_client_build_http_header(w);
|
|
local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
|
|
local_buffer = buffer_create(NETDATA_WEB_RESPONSE_INITIAL_SIZE);
|
|
@@ -493,6 +521,7 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
|
|
buffer_need_bytes(local_buffer, w->response.data->len);
|
|
buffer_need_bytes(local_buffer, w->response.data->len);
|
|
memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len);
|
|
memcpy(&local_buffer->buffer[local_buffer->len], w->response.data->buffer, w->response.data->len);
|
|
local_buffer->len += w->response.data->len;
|
|
local_buffer->len += w->response.data->len;
|
|
|
|
+ sent = sent - size + w->response.data->len;
|
|
} else {
|
|
} else {
|
|
#endif
|
|
#endif
|
|
buffer_strcat(local_buffer, w->response.data->buffer);
|
|
buffer_strcat(local_buffer, w->response.data->buffer);
|
|
@@ -503,6 +532,23 @@ static int aclk_execute_query_v2(struct aclk_query *this_query)
|
|
|
|
|
|
aclk_send_message_bin(this_query->topic, local_buffer->buffer, local_buffer->len, this_query->msg_id);
|
|
aclk_send_message_bin(this_query->topic, local_buffer->buffer, local_buffer->len, this_query->msg_id);
|
|
|
|
|
|
|
|
+ struct timeval tv;
|
|
|
|
+ now_realtime_timeval(&tv);
|
|
|
|
+
|
|
|
|
+ log_access("%llu: %d '[ACLK]:%d' '%s' (sent/all = %zu/%zu bytes %0.0f%%, prep/sent/total = %0.2f/%0.2f/%0.2f ms) %d '%s'",
|
|
|
|
+ w->id
|
|
|
|
+ , gettid()
|
|
|
|
+ , this_query->idx
|
|
|
|
+ , "DATA"
|
|
|
|
+ , sent
|
|
|
|
+ , size
|
|
|
|
+ , size > sent ? -((size > 0) ? (((size - sent) / (double) size) * 100.0) : 0.0) : ((size > 0) ? (((sent - size ) / (double) size) * 100.0) : 0.0)
|
|
|
|
+ , dt_usec(&w->tv_ready, &w->tv_in) / 1000.0
|
|
|
|
+ , dt_usec(&tv, &w->tv_ready) / 1000.0
|
|
|
|
+ , dt_usec(&tv, &w->tv_in) / 1000.0
|
|
|
|
+ , w->response.code
|
|
|
|
+ , strip_control_characters(this_query->query)
|
|
|
|
+ );
|
|
cleanup:
|
|
cleanup:
|
|
#ifdef NETDATA_WITH_ZLIB
|
|
#ifdef NETDATA_WITH_ZLIB
|
|
if(w->response.zinitialized)
|
|
if(w->response.zinitialized)
|
|
@@ -551,6 +597,7 @@ static int aclk_process_query(struct aclk_query_thread *t_info)
|
|
query_count++;
|
|
query_count++;
|
|
|
|
|
|
host = (RRDHOST*)this_query->data;
|
|
host = (RRDHOST*)this_query->data;
|
|
|
|
+ this_query->idx = t_info->idx;
|
|
|
|
|
|
debug(
|
|
debug(
|
|
D_ACLK, "Query #%ld (%s) size=%zu in queue %llu ms", query_count, this_query->topic,
|
|
D_ACLK, "Query #%ld (%s) size=%zu in queue %llu ms", query_count, this_query->topic,
|