Browse Source

Add Cancel Pending Request Message (#14953)

Timotej S 1 year ago
parent
commit
2a491f7932

+ 3 - 0
CMakeLists.txt

@@ -936,6 +936,8 @@ set(ACLK_FILES
         aclk/schema-wrappers/schema_wrappers.h
         aclk/schema-wrappers/schema_wrapper_utils.cc
         aclk/schema-wrappers/schema_wrapper_utils.h
+        aclk/schema-wrappers/agent_cmds.cc \
+        aclk/schema-wrappers/agent_cmds.h \
         aclk/helpers/mqtt_wss_pal.h
         aclk/helpers/ringbuffer_pal.h
         )
@@ -1259,6 +1261,7 @@ set(ACLK_PROTO_DEFS
         aclk/aclk-schemas/proto/nodeinstance/info/v1/info.proto
         aclk/aclk-schemas/proto/context/v1/context.proto
         aclk/aclk-schemas/proto/context/v1/stream.proto
+        aclk/aclk-schemas/proto/agent/v1/cmds.proto
         )
 PROTOBUF_ACLK_GENERATE_CPP(ACLK_PROTO_BUILT_SRCS ACLK_PROTO_BUILT_HDRS ${ACLK_PROTO_DEFS})
 

+ 9 - 0
Makefile.am

@@ -726,6 +726,8 @@ ACLK_FILES = \
     aclk/schema-wrappers/context_stream.h \
     aclk/schema-wrappers/context.cc \
     aclk/schema-wrappers/context.h \
+    aclk/schema-wrappers/agent_cmds.cc \
+    aclk/schema-wrappers/agent_cmds.h \
     $(NULL)
 
 noinst_LIBRARIES += libmqttwebsockets.a
@@ -768,6 +770,7 @@ ACLK_PROTO_DEFINITIONS = \
     aclk/aclk-schemas/proto/nodeinstance/info/v1/info.proto \
     aclk/aclk-schemas/proto/context/v1/context.proto \
     aclk/aclk-schemas/proto/context/v1/stream.proto \
+    aclk/aclk-schemas/proto/agent/v1/cmds.proto \
     $(NULL)
 
 dist_noinst_DATA += $(ACLK_PROTO_DEFINITIONS)
@@ -792,6 +795,8 @@ ACLK_PROTO_BUILT_FILES = aclk/aclk-schemas/proto/agent/v1/connection.pb.cc \
     aclk/aclk-schemas/proto/context/v1/context.pb.h \
     aclk/aclk-schemas/proto/context/v1/stream.pb.cc \
     aclk/aclk-schemas/proto/context/v1/stream.pb.h \
+    aclk/aclk-schemas/proto/agent/v1/cmds.pb.cc \
+    aclk/aclk-schemas/proto/agent/v1/cmds.pb.h \
     $(NULL)
 
 BUILT_SOURCES += $(ACLK_PROTO_BUILT_FILES)
@@ -838,6 +843,10 @@ aclk/aclk-schemas/proto/context/v1/stream.pb.cc \
 aclk/aclk-schemas/proto/context/v1/stream.pb.h: aclk/aclk-schemas/proto/context/v1/stream.proto
 	$(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
 
+aclk/aclk-schemas/proto/agent/v1/cmds.pb.cc \
+aclk/aclk-schemas/proto/agent/v1/cmds.pb.h: aclk/aclk-schemas/proto/agent/v1/cmds.proto
+	$(PROTOC) -I=aclk/aclk-schemas --cpp_out=$(builddir)/aclk/aclk-schemas $^
+
 endif #ENABLE_ACLK
 
 ACLK_ALWAYS_BUILD_FILES = \

+ 2 - 0
aclk/aclk_capas.c

@@ -15,6 +15,7 @@ const struct capability *aclk_get_agent_capas()
         { .name = "funcs",       .version = 1, .enabled = 1 },
         { .name = "http_api_v2", .version = 1, .enabled = 1 },
         { .name = "health",      .version = 1, .enabled = 0 },
+        { .name = "req_cancel",  .version = 1, .enabled = 1 },
         { .name = NULL,          .version = 0, .enabled = 0 }
     };
     agent_capabilities[2].version = ml_capable() ? 1 : 0;
@@ -40,6 +41,7 @@ struct capability *aclk_get_node_instance_capas(RRDHOST *host)
         { .name = "funcs",       .version = 0,                     .enabled = 0 },
         { .name = "http_api_v2", .version = 2,                     .enabled = 1 },
         { .name = "health",      .version = 1,                     .enabled = host->health.health_enabled },
+        { .name = "req_cancel",  .version = 1,                     .enabled = 1 },
         { .name = NULL,          .version = 0,                     .enabled = 0 }
     };
 

+ 81 - 0
aclk/aclk_query.c

@@ -13,6 +13,82 @@ pthread_mutex_t query_lock_wait = PTHREAD_MUTEX_INITIALIZER;
 #define QUERY_THREAD_LOCK pthread_mutex_lock(&query_lock_wait)
 #define QUERY_THREAD_UNLOCK pthread_mutex_unlock(&query_lock_wait)
 
+struct pending_req_list {
+    const char *msg_id;
+    uint32_t hash;
+
+    int canceled;
+
+    struct pending_req_list *next;
+};
+
+static struct pending_req_list *pending_req_list_head = NULL;
+static pthread_mutex_t pending_req_list_lock = PTHREAD_MUTEX_INITIALIZER;
+
+static struct pending_req_list *pending_req_list_add(const char *msg_id)
+{
+    struct pending_req_list *new = callocz(1, sizeof(struct pending_req_list));
+    new->msg_id = msg_id;
+    new->hash = simple_hash(msg_id);
+
+    pthread_mutex_lock(&pending_req_list_lock);
+    new->next = pending_req_list_head;
+    pending_req_list_head = new;
+    pthread_mutex_unlock(&pending_req_list_lock);
+    return new;
+}
+
+void pending_req_list_rm(const char *msg_id)
+{
+    uint32_t hash = simple_hash(msg_id);
+    struct pending_req_list *prev = NULL;
+
+    pthread_mutex_lock(&pending_req_list_lock);
+    struct pending_req_list *curr = pending_req_list_head;
+
+    while (curr) {
+        if (curr->hash == hash && strcmp(curr->msg_id, msg_id) == 0) {
+            if (prev)
+                prev->next = curr->next;
+            else
+                pending_req_list_head = curr->next;
+
+            freez(curr);
+            break;
+        }
+
+        prev = curr;
+        curr = curr->next;
+    }
+    pthread_mutex_unlock(&pending_req_list_lock);
+}
+
+int mark_pending_req_cancelled(const char *msg_id)
+{
+    uint32_t hash = simple_hash(msg_id);
+
+    pthread_mutex_lock(&pending_req_list_lock);
+    struct pending_req_list *curr = pending_req_list_head;
+
+    while (curr) {
+        if (curr->hash == hash && strcmp(curr->msg_id, msg_id) == 0) {
+            curr->canceled = 1;
+            pthread_mutex_unlock(&pending_req_list_lock);
+            return 0;
+        }
+
+        curr = curr->next;
+    }
+    pthread_mutex_unlock(&pending_req_list_lock);
+    return 1;
+}
+
+static bool aclk_web_client_interrupt_cb(struct web_client *w __maybe_unused, void *data)
+{
+    struct pending_req_list *req = (struct pending_req_list *)data;
+    return req->canceled;
+}
+
 static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query) {
     int retval = 0;
     BUFFER *local_buffer = NULL;
@@ -30,6 +106,9 @@ static int http_api_v2(struct aclk_query_thread *query_thr, aclk_query_t query)
     w->mode = WEB_CLIENT_MODE_GET;
     w->timings.tv_in = query->created_tv;
 
+    w->interrupt.callback = aclk_web_client_interrupt_cb;
+    w->interrupt.callback_data = pending_req_list_add(query->msg_id);
+
     usec_t t;
     web_client_timeout_checkpoint_set(w, query->timeout);
     if(web_client_timeout_checkpoint_and_check(w, &t)) {
@@ -168,6 +247,8 @@ cleanup:
 
     web_client_release_to_cache(w);
 
+    pending_req_list_rm(query->msg_id);
+
 #ifdef NETDATA_WITH_ZLIB
     buffer_free(z_buffer);
 #endif

+ 2 - 0
aclk/aclk_query.h

@@ -33,4 +33,6 @@ void aclk_query_threads_cleanup(struct aclk_query_threads *query_threads);
 
 const char *aclk_query_get_name(aclk_query_type_t qt, int unknown_ok);
 
+int mark_pending_req_cancelled(const char *msg_id);
+
 #endif //NETDATA_AGENT_CLOUD_LINK_H

+ 19 - 0
aclk/aclk_rx_msgs.c

@@ -6,6 +6,7 @@
 #include "aclk_query_queue.h"
 #include "aclk.h"
 #include "aclk_capas.h"
+#include "aclk_query.h"
 
 #include "schema-wrappers/proto_2_json.h"
 
@@ -446,6 +447,23 @@ int stop_streaming_contexts(const char *msg, size_t msg_len)
     return 0;
 }
 
+int cancel_pending_req(const char *msg, size_t msg_len)
+{
+    struct aclk_cancel_pending_req cmd;
+    if(parse_cancel_pending_req(msg, msg_len, &cmd)) {
+        error_report("Error parsing CancelPendingReq");
+        return 1;
+    }
+
+    log_access("ACLK CancelPendingRequest REQ: %s, cloud trace-id: %s", cmd.request_id, cmd.trace_id);
+
+    if (mark_pending_req_cancelled(cmd.request_id))
+        error_report("CancelPending Request for %s failed. No such pending request.", cmd.request_id);
+
+    free_cancel_pending_req(&cmd);
+    return 0;
+}
+
 typedef struct {
     const char *name;
     simple_hash_t name_hash;
@@ -466,6 +484,7 @@ new_cloud_rx_msg_t rx_msgs[] = {
     { .name = "DisconnectReq",             .name_hash = 0, .fnc = handle_disconnect_req        },
     { .name = "ContextsCheckpoint",        .name_hash = 0, .fnc = contexts_checkpoint          },
     { .name = "StopStreamingContexts",     .name_hash = 0, .fnc = stop_streaming_contexts      },
+    { .name = "CancelPendingRequest",      .name_hash = 0, .fnc = cancel_pending_req           },
     { .name = NULL,                        .name_hash = 0, .fnc = NULL                         },
 };
 

+ 38 - 0
aclk/schema-wrappers/agent_cmds.cc

@@ -0,0 +1,38 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "proto/agent/v1/cmds.pb.h"
+
+#include "agent_cmds.h"
+
+#include "schema_wrapper_utils.h"
+
+using namespace agent::v1;
+
+int parse_cancel_pending_req(const char *msg, size_t msg_len, struct aclk_cancel_pending_req *req)
+{
+    CancelPendingRequest msg_parsed;
+
+    if (!msg_parsed.ParseFromArray(msg, msg_len)) {
+        error_report("Failed to parse CancelPendingRequest message");
+        return 1;
+    }
+
+    if (msg_parsed.request_id().c_str() == NULL) {
+        error_report("CancelPendingRequest message missing request_id");
+        return 1;
+    }
+    req->request_id = strdupz(msg_parsed.request_id().c_str());
+
+    if (msg_parsed.trace_id().c_str())
+            req->trace_id = strdupz(msg_parsed.trace_id().c_str());
+
+    set_timeval_from_google_timestamp(msg_parsed.timestamp(), &req->timestamp);
+
+    return 0;
+}
+
+void free_cancel_pending_req(struct aclk_cancel_pending_req *req)
+{
+    freez(req->request_id);
+    freez(req->trace_id);
+}

+ 27 - 0
aclk/schema-wrappers/agent_cmds.h

@@ -0,0 +1,27 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H
+#define ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H
+
+#include "libnetdata/libnetdata.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct aclk_cancel_pending_req {
+    char *request_id;
+
+    struct timeval timestamp;
+
+    char *trace_id;
+};
+
+int parse_cancel_pending_req(const char *msg, size_t msg_len, struct aclk_cancel_pending_req *req);
+void free_cancel_pending_req(struct aclk_cancel_pending_req *req);
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* ACLK_SCHEMA_WRAPPERS_AGENT_CMDS_H */

+ 3 - 0
aclk/schema-wrappers/proto_2_json.cc

@@ -11,6 +11,7 @@
 #include "proto/nodeinstance/info/v1/info.pb.h"
 #include "proto/context/v1/stream.pb.h"
 #include "proto/context/v1/context.pb.h"
+#include "proto/agent/v1/cmds.pb.h"
 
 #include "libnetdata/libnetdata.h"
 
@@ -63,6 +64,8 @@ static google::protobuf::Message *msg_name_to_protomsg(const char *msgname)
         return new context::v1::ContextsCheckpoint;
     if (!strcmp(msgname, "StopStreamingContexts"))
         return new context::v1::StopStreamingContexts;
+    if (!strcmp(msgname, "CancelPendingRequest"))
+        return new agent::v1::CancelPendingRequest;
 
     return NULL;
 }

+ 1 - 0
aclk/schema-wrappers/schema_wrappers.h

@@ -14,5 +14,6 @@
 #include "capability.h"
 #include "context_stream.h"
 #include "context.h"
+#include "agent_cmds.h"
 
 #endif /* SCHEMA_WRAPPERS_H */