sqlite_aclk_node.c 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "sqlite_functions.h"
  3. #include "sqlite_aclk_node.h"
  4. #include "../../aclk/aclk_contexts_api.h"
  5. #include "../../aclk/aclk_capas.h"
  6. #ifdef ENABLE_ACLK
  7. DICTIONARY *collectors_from_charts(RRDHOST *host, DICTIONARY *dict) {
  8. RRDSET *st;
  9. char name[500];
  10. rrdset_foreach_read(st, host)
  11. {
  12. if (rrdset_is_available_for_viewers(st)) {
  13. struct collector_info col = {.plugin = rrdset_plugin_name(st), .module = rrdset_module_name(st)};
  14. snprintfz(name, sizeof(name) - 1, "%s:%s", col.plugin, col.module);
  15. dictionary_set(dict, name, &col, sizeof(struct collector_info));
  16. }
  17. }
  18. rrdset_foreach_done(st);
  19. return dict;
  20. }
  21. static void build_node_collectors(RRDHOST *host)
  22. {
  23. struct aclk_sync_cfg_t *wc = host->aclk_config;
  24. struct update_node_collectors upd_node_collectors;
  25. DICTIONARY *dict = dictionary_create(DICT_OPTION_SINGLE_THREADED);
  26. upd_node_collectors.node_id = wc->node_id;
  27. upd_node_collectors.claim_id = get_agent_claimid();
  28. upd_node_collectors.node_collectors = collectors_from_charts(host, dict);
  29. aclk_update_node_collectors(&upd_node_collectors);
  30. dictionary_destroy(dict);
  31. freez(upd_node_collectors.claim_id);
  32. nd_log(NDLS_ACCESS, NDLP_DEBUG, "ACLK RES [%s (%s)]: NODE COLLECTORS SENT", wc->node_id, rrdhost_hostname(host));
  33. }
  34. static void build_node_info(RRDHOST *host)
  35. {
  36. struct update_node_info node_info;
  37. struct aclk_sync_cfg_t *wc = host->aclk_config;
  38. rrd_rdlock();
  39. node_info.node_id = wc->node_id;
  40. node_info.claim_id = get_agent_claimid();
  41. node_info.machine_guid = host->machine_guid;
  42. node_info.child = (host != localhost);
  43. node_info.ml_info.ml_capable = ml_capable();
  44. node_info.ml_info.ml_enabled = ml_enabled(host);
  45. node_info.node_instance_capabilities = aclk_get_node_instance_capas(host);
  46. now_realtime_timeval(&node_info.updated_at);
  47. char *host_version = NULL;
  48. if (host != localhost) {
  49. netdata_mutex_lock(&host->receiver_lock);
  50. host_version = strdupz(
  51. host->receiver && host->receiver->program_version ? host->receiver->program_version :
  52. rrdhost_program_version(host));
  53. netdata_mutex_unlock(&host->receiver_lock);
  54. }
  55. node_info.data.name = rrdhost_hostname(host);
  56. node_info.data.os = rrdhost_os(host);
  57. node_info.data.os_name = host->system_info->host_os_name;
  58. node_info.data.os_version = host->system_info->host_os_version;
  59. node_info.data.kernel_name = host->system_info->kernel_name;
  60. node_info.data.kernel_version = host->system_info->kernel_version;
  61. node_info.data.architecture = host->system_info->architecture;
  62. node_info.data.cpus = host->system_info->host_cores ? str2uint32_t(host->system_info->host_cores, NULL) : 0;
  63. node_info.data.cpu_frequency = host->system_info->host_cpu_freq ? host->system_info->host_cpu_freq : "0";
  64. node_info.data.memory = host->system_info->host_ram_total ? host->system_info->host_ram_total : "0";
  65. node_info.data.disk_space = host->system_info->host_disk_space ? host->system_info->host_disk_space : "0";
  66. node_info.data.version = host_version ? host_version : VERSION;
  67. node_info.data.release_channel = get_release_channel();
  68. node_info.data.timezone = rrdhost_abbrev_timezone(host);
  69. node_info.data.virtualization_type = host->system_info->virtualization ? host->system_info->virtualization : "unknown";
  70. node_info.data.container_type = host->system_info->container ? host->system_info->container : "unknown";
  71. node_info.data.custom_info = config_get(CONFIG_SECTION_WEB, "custom dashboard_info.js", "");
  72. node_info.data.machine_guid = host->machine_guid;
  73. struct capability node_caps[] = {
  74. {.name = "ml", .version = host->system_info->ml_capable, .enabled = host->system_info->ml_enabled},
  75. {.name = "mc",
  76. .version = host->system_info->mc_version ? host->system_info->mc_version : 0,
  77. .enabled = host->system_info->mc_version ? 1 : 0},
  78. {.name = NULL, .version = 0, .enabled = 0}};
  79. node_info.node_capabilities = node_caps;
  80. node_info.data.ml_info.ml_capable = host->system_info->ml_capable;
  81. node_info.data.ml_info.ml_enabled = host->system_info->ml_enabled;
  82. node_info.data.host_labels_ptr = host->rrdlabels;
  83. aclk_update_node_info(&node_info);
  84. nd_log(
  85. NDLS_ACCESS,
  86. NDLP_DEBUG,
  87. "ACLK RES [%s (%s)]: NODE INFO SENT for guid [%s] (%s)",
  88. wc->node_id,
  89. rrdhost_hostname(host),
  90. host->machine_guid,
  91. host == localhost ? "parent" : "child");
  92. rrd_unlock();
  93. freez(node_info.claim_id);
  94. freez(node_info.node_instance_capabilities);
  95. freez(host_version);
  96. wc->node_collectors_send = now_realtime_sec();
  97. }
  98. static bool host_is_replicating(RRDHOST *host)
  99. {
  100. bool replicating = false;
  101. RRDSET *st;
  102. rrdset_foreach_reentrant(st, host) {
  103. if (rrdset_is_replicating(st)) {
  104. replicating = true;
  105. break;
  106. }
  107. }
  108. rrdset_foreach_done(st);
  109. return replicating;
  110. }
  111. void aclk_check_node_info_and_collectors(void)
  112. {
  113. RRDHOST *host;
  114. if (unlikely(!aclk_connected))
  115. return;
  116. size_t context_loading = 0;
  117. size_t replicating = 0;
  118. size_t context_pp = 0;
  119. time_t now = now_realtime_sec();
  120. dfe_start_reentrant(rrdhost_root_index, host)
  121. {
  122. struct aclk_sync_cfg_t *wc = host->aclk_config;
  123. if (unlikely(!wc))
  124. continue;
  125. if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) {
  126. internal_error(true, "ACLK SYNC: Context still pending for %s", rrdhost_hostname(host));
  127. context_loading++;
  128. continue;
  129. }
  130. if (unlikely(host_is_replicating(host))) {
  131. internal_error(true, "ACLK SYNC: Host %s is still replicating", rrdhost_hostname(host));
  132. replicating++;
  133. continue;
  134. }
  135. bool pp_queue_empty = !(host->rrdctx.pp_queue && dictionary_entries(host->rrdctx.pp_queue));
  136. if (!pp_queue_empty && (wc->node_info_send_time || wc->node_collectors_send))
  137. context_pp++;
  138. if (pp_queue_empty && wc->node_info_send_time && wc->node_info_send_time + 30 < now) {
  139. wc->node_info_send_time = 0;
  140. build_node_info(host);
  141. internal_error(true, "ACLK SYNC: Sending node info for %s", rrdhost_hostname(host));
  142. }
  143. if (pp_queue_empty && wc->node_collectors_send && wc->node_collectors_send + 30 < now) {
  144. build_node_collectors(host);
  145. internal_error(true, "ACLK SYNC: Sending collectors for %s", rrdhost_hostname(host));
  146. wc->node_collectors_send = 0;
  147. }
  148. }
  149. dfe_done(host);
  150. if (context_loading || replicating || context_pp) {
  151. nd_log_limit_static_thread_var(erl, 10, 100 * USEC_PER_MS);
  152. nd_log_limit(
  153. &erl,
  154. NDLS_DAEMON,
  155. NDLP_INFO,
  156. "%zu nodes loading contexts, %zu replicating data, %zu pending context post processing",
  157. context_loading,
  158. replicating,
  159. context_pp);
  160. }
  161. }
  162. #endif