sqlite_metadata.c 49 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "sqlite_metadata.h"
  // ---------------------------------------------------------------------------
  // SQL statements
  // ---------------------------------------------------------------------------

  // node_instance: insert a host's claim id, or overwrite it when the host row exists
  #define SQL_STORE_CLAIM_ID                                                         \
      "INSERT INTO node_instance "                                                   \
      "(host_id, claim_id, date_created) VALUES (@host_id, @claim_id, unixepoch()) " \
      "ON CONFLICT(host_id) DO UPDATE SET claim_id = excluded.claim_id;"

  // host_label: remove every label of one host
  #define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid;"

  // Prefixes of multi-row INSERT statements; one or more value tuples
  // (STORE_HOST_OR_CHART_LABEL_VALUE) are appended at runtime
  #define STORE_HOST_LABEL \
      "INSERT OR REPLACE INTO host_label (host_id, source_type, label_key, label_value, date_created) VALUES "

  #define STORE_CHART_LABEL \
      "INSERT OR REPLACE INTO chart_label (chart_id, source_type, label_key, label_value, date_created) VALUES "

  // printf-style template of one label tuple: uuid string, source type, key, value
  #define STORE_HOST_OR_CHART_LABEL_VALUE "(u2h('%s'), %d,'%s','%s', unixepoch())"

  // dimension: remove one dimension by its uuid
  #define DELETE_DIMENSION_UUID "DELETE FROM dimension WHERE dim_id = @uuid;"

  // host: insert or replace the full host row
  #define SQL_STORE_HOST_INFO                                                                                             \
      "INSERT OR REPLACE INTO host "                                                                                      \
      "(host_id, hostname, registry_hostname, update_every, os, timezone,"                                                \
      "tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, program_version,"                              \
      "entries, health_enabled) "                                                                                         \
      "values (@host_id, @hostname, @registry_hostname, @update_every, @os, @timezone, @tags, @hops, @memory_mode, "      \
      "@abbrev_timezone, @utc_offset, @program_name, @program_version, "                                                  \
      "@entries, @health_enabled);"

  // chart: insert or replace the full chart row (16 positional parameters)
  #define SQL_STORE_CHART                                                                                                 \
      "insert or replace into chart (chart_id, host_id, type, id, "                                                       \
      "name, family, context, title, unit, plugin, module, priority, update_every , chart_type , memory_mode , "          \
      "history_entries) values (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16);"

  // dimension: insert or replace the full dimension row
  #define SQL_STORE_DIMENSION                                                                                             \
      "INSERT OR REPLACE INTO dimension (dim_id, chart_id, id, name, multiplier, divisor , algorithm, options) "          \
      "VALUES (@dim_id, @chart_id, @id, @name, @multiplier, @divisor, @algorithm, @options);"

  // dimension: list dimensions stored after a given rowid
  #define SELECT_DIMENSION_LIST "SELECT dim_id, rowid FROM dimension WHERE rowid > @row_id"

  // host_info: prefix of a multi-row INSERT and the printf-style template of one tuple
  #define STORE_HOST_INFO "INSERT OR REPLACE INTO host_info (host_id, system_key, system_value, date_created) VALUES "
  #define STORE_HOST_INFO_VALUES "(u2h('%s'), '%s','%s', unixepoch())"

  // Re-home charts of other zero-hop hosts to the given host_id, then drop the leftovers
  #define MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID \
      "UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0);"
  #define DELETE_NON_EXISTING_LOCALHOST "DELETE FROM host WHERE hops = 0 AND host_id <> @host_id;"
  #define DELETE_MISSING_NODE_INSTANCES "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);"

  // ---------------------------------------------------------------------------
  // Queue sizing and scheduling (times are in seconds)
  // ---------------------------------------------------------------------------

  #define METADATA_CMD_Q_MAX_SIZE (1024)          // Capacity of the command queue; blocking producers wait for room
  #define METADATA_MAINTENANCE_FIRST_CHECK (1800) // Delay of the first maintenance run after agent startup
  #define METADATA_MAINTENANCE_RETRY (60)         // Retry delay when maintenance is already running or did actual work
  #define METADATA_MAINTENANCE_INTERVAL (3600)    // Delay between maintenance runs after the latest successful one
  #define METADATA_HOST_CHECK_FIRST_CHECK (5)     // Delay of the first check for pending metadata
  #define METADATA_HOST_CHECK_INTERVAL (30)       // Delay between checks for pending metadata
  #define METADATA_HOST_CHECK_IMMEDIATE (5)       // Delay of an immediate re-run when more metadata is left to write
  #define MAX_METADATA_CLEANUP (500)              // Maximum metadata write operations (e.g. deletes) before retrying
  #define METADATA_MAX_BATCH_SIZE (512)           // Maximum commands to execute before running the event loop
  // Commands understood by the metadata event loop.
  // Values are assigned sequentially starting from zero.
  enum metadata_opcode {
      METADATA_DATABASE_NOOP = 0, // also returned by the dequeue when the queue is empty
      METADATA_DATABASE_TIMER,
      METADATA_DEL_DIMENSION,
      METADATA_STORE_CLAIM_ID,
      METADATA_ADD_HOST_INFO,
      METADATA_SCAN_HOSTS,        // queued by the timer callback
      METADATA_MAINTENANCE,       // queued by the timer callback
      METADATA_SYNC_SHUTDOWN,     // never queued: the enqueue functions raise the shutdown flag instead
      METADATA_UNITTEST,

      // Keep this entry last: it is the number of opcodes defined above
      // and is needed to check for worker utilization
      METADATA_MAX_ENUMERATIONS_DEFINED
  };
  56. #define MAX_PARAM_LIST (2)
  57. struct metadata_cmd {
  58. enum metadata_opcode opcode;
  59. struct completion *completion;
  60. const void *param[MAX_PARAM_LIST];
  61. };
  62. struct metadata_database_cmdqueue {
  63. unsigned head, tail;
  64. struct metadata_cmd cmd_array[METADATA_CMD_Q_MAX_SIZE];
  65. };
  // State bits kept in metadata_wc.flags
  typedef enum {
      METADATA_FLAG_CLEANUP        = (1 << 0), // Cleanup is running
      METADATA_FLAG_SCANNING_HOSTS = (1 << 1), // Scanning of hosts in worker thread
      METADATA_FLAG_SHUTDOWN       = (1 << 2), // Shutting down
  } METADATA_FLAG;

  // Any of the background jobs (cleanup or host scan) is in progress
  #define METADATA_WORKER_BUSY (METADATA_FLAG_CLEANUP | METADATA_FLAG_SCANNING_HOSTS)
  72. struct metadata_wc {
  73. uv_thread_t thread;
  74. uv_loop_t *loop;
  75. uv_async_t async;
  76. uv_timer_t timer_req;
  77. time_t check_metadata_after;
  78. time_t check_hosts_after;
  79. volatile unsigned queue_size;
  80. METADATA_FLAG flags;
  81. uint64_t row_id;
  82. struct completion init_complete;
  83. /* FIFO command queue */
  84. uv_mutex_t cmd_mutex;
  85. uv_cond_t cmd_cond;
  86. struct metadata_database_cmdqueue cmd_queue;
  87. };
  // Atomic (sequentially consistent) helpers for the `flags` member of the object
  // pointed to by target_flags.
  //   metadata_flag_check: the bits of `flag` that are currently set (zero when none)
  //   metadata_flag_set:   set the bits of `flag`, evaluates to the new value
  //   metadata_flag_clear: clear the bits of `flag`, evaluates to the new value
  #define metadata_flag_check(target_flags, flag) \
      (__atomic_load_n(&((target_flags)->flags), __ATOMIC_SEQ_CST) & (flag))

  #define metadata_flag_set(target_flags, flag) \
      __atomic_or_fetch(&((target_flags)->flags), (flag), __ATOMIC_SEQ_CST)

  #define metadata_flag_clear(target_flags, flag) \
      __atomic_and_fetch(&((target_flags)->flags), ~(flag), __ATOMIC_SEQ_CST)
  //
  // For unittest
  //
  // Bookkeeping shared with the unit test threads
  struct thread_unittest {
      int join;
      unsigned added;      // counter of added items
      unsigned processed;  // counter of processed items
      unsigned *done;
  };
  100. // Metadata functions
  101. struct query_build {
  102. BUFFER *sql;
  103. int count;
  104. char uuid_str[UUID_STR_LEN];
  105. };
  106. static int host_label_store_to_sql_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
  107. struct query_build *lb = data;
  108. if (unlikely(!lb->count))
  109. buffer_sprintf(lb->sql, STORE_HOST_LABEL);
  110. else
  111. buffer_strcat(lb->sql, ", ");
  112. buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, (int)ls & ~(RRDLABEL_FLAG_INTERNAL), name, value);
  113. lb->count++;
  114. return 1;
  115. }
  116. static int chart_label_store_to_sql_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
  117. struct query_build *lb = data;
  118. if (unlikely(!lb->count))
  119. buffer_sprintf(lb->sql, STORE_CHART_LABEL);
  120. else
  121. buffer_strcat(lb->sql, ", ");
  122. buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, ls, name, value);
  123. lb->count++;
  124. return 1;
  125. }
  126. static void check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer)
  127. {
  128. size_t old_version = st->rrdlabels_last_saved_version;
  129. size_t new_version = dictionary_version(st->rrdlabels);
  130. if(new_version != old_version) {
  131. buffer_flush(work_buffer);
  132. struct query_build tmp = {.sql = work_buffer, .count = 0};
  133. uuid_unparse_lower(st->chart_uuid, tmp.uuid_str);
  134. rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp);
  135. st->rrdlabels_last_saved_version = new_version;
  136. db_execute(buffer_tostring(work_buffer));
  137. }
  138. }
  139. // Migrate all hosts with hops zero to this host_uuid
  140. void migrate_localhost(uuid_t *host_uuid)
  141. {
  142. int rc;
  143. rc = exec_statement_with_uuid(MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID, host_uuid);
  144. if (!rc)
  145. rc = exec_statement_with_uuid(DELETE_NON_EXISTING_LOCALHOST, host_uuid);
  146. if (!rc)
  147. db_execute(DELETE_MISSING_NODE_INSTANCES);
  148. }
  149. static void store_claim_id(uuid_t *host_id, uuid_t *claim_id)
  150. {
  151. sqlite3_stmt *res = NULL;
  152. int rc;
  153. if (unlikely(!db_meta)) {
  154. if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
  155. error_report("Database has not been initialized");
  156. return;
  157. }
  158. rc = sqlite3_prepare_v2(db_meta, SQL_STORE_CLAIM_ID, -1, &res, 0);
  159. if (unlikely(rc != SQLITE_OK)) {
  160. error_report("Failed to prepare statement store chart labels");
  161. return;
  162. }
  163. rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC);
  164. if (unlikely(rc != SQLITE_OK)) {
  165. error_report("Failed to bind host_id parameter to store node instance information");
  166. goto failed;
  167. }
  168. if (claim_id)
  169. rc = sqlite3_bind_blob(res, 2, claim_id, sizeof(*claim_id), SQLITE_STATIC);
  170. else
  171. rc = sqlite3_bind_null(res, 2);
  172. if (unlikely(rc != SQLITE_OK)) {
  173. error_report("Failed to bind claim_id parameter to store node instance information");
  174. goto failed;
  175. }
  176. rc = execute_insert(res);
  177. if (unlikely(rc != SQLITE_DONE))
  178. error_report("Failed to store node instance information, rc = %d", rc);
  179. failed:
  180. if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
  181. error_report("Failed to finalize the prepared statement when storing node instance information");
  182. }
  183. static void delete_dimension_uuid(uuid_t *dimension_uuid)
  184. {
  185. static __thread sqlite3_stmt *res = NULL;
  186. int rc;
  187. if (unlikely(!res)) {
  188. rc = prepare_statement(db_meta, DELETE_DIMENSION_UUID, &res);
  189. if (rc != SQLITE_OK) {
  190. error_report("Failed to prepare statement to delete a dimension uuid");
  191. return;
  192. }
  193. }
  194. rc = sqlite3_bind_blob(res, 1, dimension_uuid, sizeof(*dimension_uuid), SQLITE_STATIC);
  195. if (unlikely(rc != SQLITE_OK))
  196. goto skip_execution;
  197. rc = sqlite3_step_monitored(res);
  198. if (unlikely(rc != SQLITE_DONE))
  199. error_report("Failed to delete dimension uuid, rc = %d", rc);
  200. skip_execution:
  201. rc = sqlite3_reset(res);
  202. if (unlikely(rc != SQLITE_OK))
  203. error_report("Failed to reset statement when deleting dimension UUID, rc = %d", rc);
  204. }
  205. //
  206. // Store host and host system info information in the database
  207. static int sql_store_host_info(RRDHOST *host)
  208. {
  209. static __thread sqlite3_stmt *res = NULL;
  210. int rc, param = 0;
  211. if (unlikely(!db_meta)) {
  212. if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
  213. return 0;
  214. error_report("Database has not been initialized");
  215. return 1;
  216. }
  217. if (unlikely((!res))) {
  218. rc = prepare_statement(db_meta, SQL_STORE_HOST_INFO, &res);
  219. if (unlikely(rc != SQLITE_OK)) {
  220. error_report("Failed to prepare statement to store host, rc = %d", rc);
  221. return 1;
  222. }
  223. }
  224. rc = sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
  225. if (unlikely(rc != SQLITE_OK))
  226. goto bind_fail;
  227. rc = bind_text_null(res, ++param, rrdhost_hostname(host), 0);
  228. if (unlikely(rc != SQLITE_OK))
  229. goto bind_fail;
  230. rc = bind_text_null(res, ++param, rrdhost_registry_hostname(host), 1);
  231. if (unlikely(rc != SQLITE_OK))
  232. goto bind_fail;
  233. rc = sqlite3_bind_int(res, ++param, host->rrd_update_every);
  234. if (unlikely(rc != SQLITE_OK))
  235. goto bind_fail;
  236. rc = bind_text_null(res, ++param, rrdhost_os(host), 1);
  237. if (unlikely(rc != SQLITE_OK))
  238. goto bind_fail;
  239. rc = bind_text_null(res, ++param, rrdhost_timezone(host), 1);
  240. if (unlikely(rc != SQLITE_OK))
  241. goto bind_fail;
  242. rc = bind_text_null(res, ++param, rrdhost_tags(host), 1);
  243. if (unlikely(rc != SQLITE_OK))
  244. goto bind_fail;
  245. rc = sqlite3_bind_int(res, ++param, host->system_info ? host->system_info->hops : 0);
  246. if (unlikely(rc != SQLITE_OK))
  247. goto bind_fail;
  248. rc = sqlite3_bind_int(res, ++param, host->rrd_memory_mode);
  249. if (unlikely(rc != SQLITE_OK))
  250. goto bind_fail;
  251. rc = bind_text_null(res, ++param, rrdhost_abbrev_timezone(host), 1);
  252. if (unlikely(rc != SQLITE_OK))
  253. goto bind_fail;
  254. rc = sqlite3_bind_int(res, ++param, host->utc_offset);
  255. if (unlikely(rc != SQLITE_OK))
  256. goto bind_fail;
  257. rc = bind_text_null(res, ++param, rrdhost_program_name(host), 1);
  258. if (unlikely(rc != SQLITE_OK))
  259. goto bind_fail;
  260. rc = bind_text_null(res, ++param, rrdhost_program_version(host), 1);
  261. if (unlikely(rc != SQLITE_OK))
  262. goto bind_fail;
  263. rc = sqlite3_bind_int64(res, ++param, host->rrd_history_entries);
  264. if (unlikely(rc != SQLITE_OK))
  265. goto bind_fail;
  266. rc = sqlite3_bind_int(res, ++param, (int ) host->health.health_enabled);
  267. if (unlikely(rc != SQLITE_OK))
  268. goto bind_fail;
  269. int store_rc = sqlite3_step_monitored(res);
  270. if (unlikely(store_rc != SQLITE_DONE))
  271. error_report("Failed to store host %s, rc = %d", rrdhost_hostname(host), rc);
  272. rc = sqlite3_reset(res);
  273. if (unlikely(rc != SQLITE_OK))
  274. error_report("Failed to reset statement to store host %s, rc = %d", rrdhost_hostname(host), rc);
  275. return !(store_rc == SQLITE_DONE);
  276. bind_fail:
  277. error_report("Failed to bind %d parameter to store host %s, rc = %d", param, rrdhost_hostname(host), rc);
  278. rc = sqlite3_reset(res);
  279. if (unlikely(rc != SQLITE_OK))
  280. error_report("Failed to reset statement to store host %s, rc = %d", rrdhost_hostname(host), rc);
  281. return 1;
  282. }
  283. static void sql_store_host_system_info_key_value(const char *name, const char *value, void *data)
  284. {
  285. struct query_build *lb = data;
  286. if (unlikely(!value))
  287. return;
  288. if (unlikely(!lb->count))
  289. buffer_sprintf(
  290. lb->sql, STORE_HOST_INFO);
  291. else
  292. buffer_strcat(lb->sql, ", ");
  293. buffer_sprintf(lb->sql, STORE_HOST_INFO_VALUES, lb->uuid_str, name, value);
  294. lb->count++;
  295. }
  296. static BUFFER *sql_store_host_system_info(RRDHOST *host)
  297. {
  298. struct rrdhost_system_info *system_info = host->system_info;
  299. if (unlikely(!system_info))
  300. return NULL;
  301. BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  302. struct query_build key_data = {.sql = work_buffer, .count = 0};
  303. uuid_unparse_lower(host->host_uuid, key_data.uuid_str);
  304. sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_NAME", system_info->container_os_name, &key_data);
  305. sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_ID", system_info->container_os_id, &key_data);
  306. sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_ID_LIKE", system_info->container_os_id_like, &key_data);
  307. sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_VERSION", system_info->container_os_version, &key_data);
  308. sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_VERSION_ID", system_info->container_os_version_id, &key_data);
  309. sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_DETECTION", system_info->host_os_detection, &key_data);
  310. sql_store_host_system_info_key_value("NETDATA_HOST_OS_NAME", system_info->host_os_name, &key_data);
  311. sql_store_host_system_info_key_value("NETDATA_HOST_OS_ID", system_info->host_os_id, &key_data);
  312. sql_store_host_system_info_key_value("NETDATA_HOST_OS_ID_LIKE", system_info->host_os_id_like, &key_data);
  313. sql_store_host_system_info_key_value("NETDATA_HOST_OS_VERSION", system_info->host_os_version, &key_data);
  314. sql_store_host_system_info_key_value("NETDATA_HOST_OS_VERSION_ID", system_info->host_os_version_id, &key_data);
  315. sql_store_host_system_info_key_value("NETDATA_HOST_OS_DETECTION", system_info->host_os_detection, &key_data);
  316. sql_store_host_system_info_key_value("NETDATA_SYSTEM_KERNEL_NAME", system_info->kernel_name, &key_data);
  317. sql_store_host_system_info_key_value("NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT", system_info->host_cores, &key_data);
  318. sql_store_host_system_info_key_value("NETDATA_SYSTEM_CPU_FREQ", system_info->host_cpu_freq, &key_data);
  319. sql_store_host_system_info_key_value("NETDATA_SYSTEM_TOTAL_RAM", system_info->host_ram_total, &key_data);
  320. sql_store_host_system_info_key_value("NETDATA_SYSTEM_TOTAL_DISK_SIZE", system_info->host_disk_space, &key_data);
  321. sql_store_host_system_info_key_value("NETDATA_SYSTEM_KERNEL_VERSION", system_info->kernel_version, &key_data);
  322. sql_store_host_system_info_key_value("NETDATA_SYSTEM_ARCHITECTURE", system_info->architecture, &key_data);
  323. sql_store_host_system_info_key_value("NETDATA_SYSTEM_VIRTUALIZATION", system_info->virtualization, &key_data);
  324. sql_store_host_system_info_key_value("NETDATA_SYSTEM_VIRT_DETECTION", system_info->virt_detection, &key_data);
  325. sql_store_host_system_info_key_value("NETDATA_SYSTEM_CONTAINER", system_info->container, &key_data);
  326. sql_store_host_system_info_key_value("NETDATA_SYSTEM_CONTAINER_DETECTION", system_info->container_detection, &key_data);
  327. sql_store_host_system_info_key_value("NETDATA_HOST_IS_K8S_NODE", system_info->is_k8s_node, &key_data);
  328. return work_buffer;
  329. }
  330. /*
  331. * Store a chart in the database
  332. */
  333. static int sql_store_chart(
  334. uuid_t *chart_uuid, uuid_t *host_uuid, const char *type, const char *id, const char *name, const char *family,
  335. const char *context, const char *title, const char *units, const char *plugin, const char *module, long priority,
  336. int update_every, int chart_type, int memory_mode, long history_entries)
  337. {
  338. static __thread sqlite3_stmt *res = NULL;
  339. int rc, param = 0;
  340. if (unlikely(!db_meta)) {
  341. if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
  342. return 0;
  343. error_report("Database has not been initialized");
  344. return 1;
  345. }
  346. if (unlikely(!res)) {
  347. rc = prepare_statement(db_meta, SQL_STORE_CHART, &res);
  348. if (unlikely(rc != SQLITE_OK)) {
  349. error_report("Failed to prepare statement to store chart, rc = %d", rc);
  350. return 1;
  351. }
  352. }
  353. param++;
  354. rc = sqlite3_bind_blob(res, 1, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC);
  355. if (unlikely(rc != SQLITE_OK))
  356. goto bind_fail;
  357. param++;
  358. rc = sqlite3_bind_blob(res, 2, host_uuid, sizeof(*host_uuid), SQLITE_STATIC);
  359. if (unlikely(rc != SQLITE_OK))
  360. goto bind_fail;
  361. param++;
  362. rc = sqlite3_bind_text(res, 3, type, -1, SQLITE_STATIC);
  363. if (unlikely(rc != SQLITE_OK))
  364. goto bind_fail;
  365. param++;
  366. rc = sqlite3_bind_text(res, 4, id, -1, SQLITE_STATIC);
  367. if (unlikely(rc != SQLITE_OK))
  368. goto bind_fail;
  369. param++;
  370. if (name && *name)
  371. rc = sqlite3_bind_text(res, 5, name, -1, SQLITE_STATIC);
  372. else
  373. rc = sqlite3_bind_null(res, 5);
  374. if (unlikely(rc != SQLITE_OK))
  375. goto bind_fail;
  376. param++;
  377. rc = sqlite3_bind_text(res, 6, family, -1, SQLITE_STATIC);
  378. if (unlikely(rc != SQLITE_OK))
  379. goto bind_fail;
  380. param++;
  381. rc = sqlite3_bind_text(res, 7, context, -1, SQLITE_STATIC);
  382. if (unlikely(rc != SQLITE_OK))
  383. goto bind_fail;
  384. param++;
  385. rc = sqlite3_bind_text(res, 8, title, -1, SQLITE_STATIC);
  386. if (unlikely(rc != SQLITE_OK))
  387. goto bind_fail;
  388. param++;
  389. rc = sqlite3_bind_text(res, 9, units, -1, SQLITE_STATIC);
  390. if (unlikely(rc != SQLITE_OK))
  391. goto bind_fail;
  392. param++;
  393. rc = sqlite3_bind_text(res, 10, plugin, -1, SQLITE_STATIC);
  394. if (unlikely(rc != SQLITE_OK))
  395. goto bind_fail;
  396. param++;
  397. rc = sqlite3_bind_text(res, 11, module, -1, SQLITE_STATIC);
  398. if (unlikely(rc != SQLITE_OK))
  399. goto bind_fail;
  400. param++;
  401. rc = sqlite3_bind_int(res, 12, (int) priority);
  402. if (unlikely(rc != SQLITE_OK))
  403. goto bind_fail;
  404. param++;
  405. rc = sqlite3_bind_int(res, 13, update_every);
  406. if (unlikely(rc != SQLITE_OK))
  407. goto bind_fail;
  408. param++;
  409. rc = sqlite3_bind_int(res, 14, chart_type);
  410. if (unlikely(rc != SQLITE_OK))
  411. goto bind_fail;
  412. param++;
  413. rc = sqlite3_bind_int(res, 15, memory_mode);
  414. if (unlikely(rc != SQLITE_OK))
  415. goto bind_fail;
  416. param++;
  417. rc = sqlite3_bind_int(res, 16, (int) history_entries);
  418. if (unlikely(rc != SQLITE_OK))
  419. goto bind_fail;
  420. rc = execute_insert(res);
  421. if (unlikely(rc != SQLITE_DONE))
  422. error_report("Failed to store chart, rc = %d", rc);
  423. rc = sqlite3_reset(res);
  424. if (unlikely(rc != SQLITE_OK))
  425. error_report("Failed to reset statement in chart store function, rc = %d", rc);
  426. return 0;
  427. bind_fail:
  428. error_report("Failed to bind parameter %d to store chart, rc = %d", param, rc);
  429. rc = sqlite3_reset(res);
  430. if (unlikely(rc != SQLITE_OK))
  431. error_report("Failed to reset statement in chart store function, rc = %d", rc);
  432. return 1;
  433. }
  434. /*
  435. * Store a dimension
  436. */
  437. static int sql_store_dimension(
  438. uuid_t *dim_uuid, uuid_t *chart_uuid, const char *id, const char *name, collected_number multiplier,
  439. collected_number divisor, int algorithm, bool hidden)
  440. {
  441. static __thread sqlite3_stmt *res = NULL;
  442. int rc, param = 0;
  443. if (unlikely(!db_meta)) {
  444. if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
  445. return 0;
  446. error_report("Database has not been initialized");
  447. return 1;
  448. }
  449. if (unlikely(!res)) {
  450. rc = prepare_statement(db_meta, SQL_STORE_DIMENSION, &res);
  451. if (unlikely(rc != SQLITE_OK)) {
  452. error_report("Failed to prepare statement to store dimension, rc = %d", rc);
  453. return 1;
  454. }
  455. }
  456. rc = sqlite3_bind_blob(res, ++param, dim_uuid, sizeof(*dim_uuid), SQLITE_STATIC);
  457. if (unlikely(rc != SQLITE_OK))
  458. goto bind_fail;
  459. rc = sqlite3_bind_blob(res, ++param, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC);
  460. if (unlikely(rc != SQLITE_OK))
  461. goto bind_fail;
  462. rc = sqlite3_bind_text(res, ++param, id, -1, SQLITE_STATIC);
  463. if (unlikely(rc != SQLITE_OK))
  464. goto bind_fail;
  465. rc = sqlite3_bind_text(res, ++param, name, -1, SQLITE_STATIC);
  466. if (unlikely(rc != SQLITE_OK))
  467. goto bind_fail;
  468. rc = sqlite3_bind_int(res, ++param, (int) multiplier);
  469. if (unlikely(rc != SQLITE_OK))
  470. goto bind_fail;
  471. rc = sqlite3_bind_int(res, ++param, (int ) divisor);
  472. if (unlikely(rc != SQLITE_OK))
  473. goto bind_fail;
  474. rc = sqlite3_bind_int(res, ++param, algorithm);
  475. if (unlikely(rc != SQLITE_OK))
  476. goto bind_fail;
  477. if (hidden)
  478. rc = sqlite3_bind_text(res, ++param, "hidden", -1, SQLITE_STATIC);
  479. else
  480. rc = sqlite3_bind_null(res, ++param);
  481. if (unlikely(rc != SQLITE_OK))
  482. goto bind_fail;
  483. rc = execute_insert(res);
  484. if (unlikely(rc != SQLITE_DONE))
  485. error_report("Failed to store dimension, rc = %d", rc);
  486. rc = sqlite3_reset(res);
  487. if (unlikely(rc != SQLITE_OK))
  488. error_report("Failed to reset statement in store dimension, rc = %d", rc);
  489. return 0;
  490. bind_fail:
  491. error_report("Failed to bind parameter %d to store dimension, rc = %d", param, rc);
  492. rc = sqlite3_reset(res);
  493. if (unlikely(rc != SQLITE_OK))
  494. error_report("Failed to reset statement in store dimension, rc = %d", rc);
  495. return 1;
  496. }
  497. static bool dimension_can_be_deleted(uuid_t *dim_uuid __maybe_unused)
  498. {
  499. #ifdef ENABLE_DBENGINE
  500. if(dbengine_enabled) {
  501. bool no_retention = true;
  502. for (size_t tier = 0; tier < storage_tiers; tier++) {
  503. if (!multidb_ctx[tier])
  504. continue;
  505. time_t first_time_t = 0, last_time_t = 0;
  506. if (rrdeng_metric_retention_by_uuid((void *) multidb_ctx[tier], dim_uuid, &first_time_t, &last_time_t)) {
  507. if (first_time_t > 0) {
  508. no_retention = false;
  509. break;
  510. }
  511. }
  512. }
  513. return no_retention;
  514. }
  515. else
  516. return false;
  517. #else
  518. return false;
  519. #endif
  520. }
  521. static void check_dimension_metadata(struct metadata_wc *wc)
  522. {
  523. int rc;
  524. sqlite3_stmt *res = NULL;
  525. rc = sqlite3_prepare_v2(db_meta, SELECT_DIMENSION_LIST, -1, &res, 0);
  526. if (unlikely(rc != SQLITE_OK)) {
  527. error_report("Failed to prepare statement to fetch host dimensions");
  528. return;
  529. }
  530. rc = sqlite3_bind_int64(res, 1, (sqlite3_int64) wc->row_id);
  531. if (unlikely(rc != SQLITE_OK)) {
  532. error_report("Failed to row parameter");
  533. goto skip_run;
  534. }
  535. uint32_t total_checked = 0;
  536. uint32_t total_deleted= 0;
  537. uint64_t last_row_id = wc->row_id;
  538. info("METADATA: Checking dimensions starting after row %"PRIu64, wc->row_id);
  539. while (sqlite3_step_monitored(res) == SQLITE_ROW && total_deleted < MAX_METADATA_CLEANUP) {
  540. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
  541. break;
  542. last_row_id = sqlite3_column_int64(res, 1);
  543. rc = dimension_can_be_deleted((uuid_t *)sqlite3_column_blob(res, 0));
  544. if (rc == true) {
  545. delete_dimension_uuid((uuid_t *)sqlite3_column_blob(res, 0));
  546. total_deleted++;
  547. }
  548. total_checked++;
  549. }
  550. wc->row_id = last_row_id;
  551. time_t now = now_realtime_sec();
  552. if (total_deleted > 0) {
  553. wc->check_metadata_after = now + METADATA_MAINTENANCE_RETRY;
  554. } else
  555. wc->row_id = 0;
  556. info("METADATA: Checked %u, deleted %u -- will resume after row %"PRIu64" in %lld seconds", total_checked, total_deleted, wc->row_id,
  557. (long long)(wc->check_metadata_after - now));
  558. skip_run:
  559. rc = sqlite3_finalize(res);
  560. if (unlikely(rc != SQLITE_OK))
  561. error_report("Failed to finalize the prepared statement when reading dimensions");
  562. }
  563. static void cleanup_health_log(void)
  564. {
  565. RRDHOST *host;
  566. dfe_start_reentrant(rrdhost_root_index, host) {
  567. if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))
  568. continue;
  569. sql_health_alarm_log_cleanup(host);
  570. }
  571. dfe_done(host);
  572. }
  573. //
  574. // EVENT LOOP STARTS HERE
  575. //
  576. static uv_mutex_t metadata_async_lock;
  577. static void metadata_init_cmd_queue(struct metadata_wc *wc)
  578. {
  579. wc->cmd_queue.head = wc->cmd_queue.tail = 0;
  580. wc->queue_size = 0;
  581. fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
  582. fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
  583. }
  584. int metadata_enq_cmd_noblock(struct metadata_wc *wc, struct metadata_cmd *cmd)
  585. {
  586. unsigned queue_size;
  587. /* wait for free space in queue */
  588. uv_mutex_lock(&wc->cmd_mutex);
  589. if (cmd->opcode == METADATA_SYNC_SHUTDOWN) {
  590. metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN);
  591. uv_mutex_unlock(&wc->cmd_mutex);
  592. return 0;
  593. }
  594. if (unlikely((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE ||
  595. metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  596. uv_mutex_unlock(&wc->cmd_mutex);
  597. return 1;
  598. }
  599. fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE);
  600. /* enqueue command */
  601. wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
  602. wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ?
  603. wc->cmd_queue.tail + 1 : 0;
  604. wc->queue_size = queue_size + 1;
  605. uv_mutex_unlock(&wc->cmd_mutex);
  606. return 0;
  607. }
  608. static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd)
  609. {
  610. unsigned queue_size;
  611. /* wait for free space in queue */
  612. uv_mutex_lock(&wc->cmd_mutex);
  613. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  614. uv_mutex_unlock(&wc->cmd_mutex);
  615. (void) uv_async_send(&wc->async);
  616. return;
  617. }
  618. if (cmd->opcode == METADATA_SYNC_SHUTDOWN) {
  619. metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN);
  620. uv_mutex_unlock(&wc->cmd_mutex);
  621. (void) uv_async_send(&wc->async);
  622. return;
  623. }
  624. while ((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE) {
  625. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  626. uv_mutex_unlock(&wc->cmd_mutex);
  627. return;
  628. }
  629. uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
  630. }
  631. fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE);
  632. /* enqueue command */
  633. wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
  634. wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ?
  635. wc->cmd_queue.tail + 1 : 0;
  636. wc->queue_size = queue_size + 1;
  637. uv_mutex_unlock(&wc->cmd_mutex);
  638. /* wake up event loop */
  639. (void) uv_async_send(&wc->async);
  640. }
  641. static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc)
  642. {
  643. struct metadata_cmd ret;
  644. unsigned queue_size;
  645. uv_mutex_lock(&wc->cmd_mutex);
  646. queue_size = wc->queue_size;
  647. if (queue_size == 0) {
  648. memset(&ret, 0, sizeof(ret));
  649. ret.opcode = METADATA_DATABASE_NOOP;
  650. ret.completion = NULL;
  651. } else {
  652. /* dequeue command */
  653. ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head];
  654. if (queue_size == 1) {
  655. wc->cmd_queue.head = wc->cmd_queue.tail = 0;
  656. } else {
  657. wc->cmd_queue.head = wc->cmd_queue.head != METADATA_CMD_Q_MAX_SIZE - 1 ?
  658. wc->cmd_queue.head + 1 : 0;
  659. }
  660. wc->queue_size = queue_size - 1;
  661. /* wake up producers */
  662. uv_cond_signal(&wc->cmd_cond);
  663. }
  664. uv_mutex_unlock(&wc->cmd_mutex);
  665. return ret;
  666. }
  667. static void async_cb(uv_async_t *handle)
  668. {
  669. uv_stop(handle->loop);
  670. uv_update_time(handle->loop);
  671. }
  672. #define TIMER_INITIAL_PERIOD_MS (1000)
  673. #define TIMER_REPEAT_PERIOD_MS (1000)
  674. static void timer_cb(uv_timer_t* handle)
  675. {
  676. uv_stop(handle->loop);
  677. uv_update_time(handle->loop);
  678. struct metadata_wc *wc = handle->data;
  679. struct metadata_cmd cmd;
  680. memset(&cmd, 0, sizeof(cmd));
  681. time_t now = now_realtime_sec();
  682. if (wc->check_metadata_after && wc->check_metadata_after < now) {
  683. cmd.opcode = METADATA_MAINTENANCE;
  684. if (!metadata_enq_cmd_noblock(wc, &cmd))
  685. wc->check_metadata_after = now + METADATA_MAINTENANCE_INTERVAL;
  686. }
  687. if (wc->check_hosts_after && wc->check_hosts_after < now) {
  688. cmd.opcode = METADATA_SCAN_HOSTS;
  689. if (!metadata_enq_cmd_noblock(wc, &cmd))
  690. wc->check_hosts_after = now + METADATA_HOST_CHECK_INTERVAL;
  691. }
  692. }
  693. static void after_metadata_cleanup(uv_work_t *req, int status)
  694. {
  695. UNUSED(status);
  696. struct metadata_wc *wc = req->data;
  697. metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
  698. }
  699. static void start_metadata_cleanup(uv_work_t *req)
  700. {
  701. register_libuv_worker_jobs();
  702. worker_is_busy(UV_EVENT_METADATA_CLEANUP);
  703. struct metadata_wc *wc = req->data;
  704. check_dimension_metadata(wc);
  705. cleanup_health_log();
  706. worker_is_idle();
  707. }
  708. struct scan_metadata_payload {
  709. uv_work_t request;
  710. struct metadata_wc *wc;
  711. struct completion *completion;
  712. uint32_t max_count;
  713. };
  714. // Callback after scan of hosts is done
  715. static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused)
  716. {
  717. struct scan_metadata_payload *data = req->data;
  718. struct metadata_wc *wc = data->wc;
  719. metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
  720. internal_error(true, "METADATA: scanning hosts complete");
  721. if (unlikely(data->completion)) {
  722. completion_mark_complete(data->completion);
  723. internal_error(true, "METADATA: Sending completion done");
  724. }
  725. freez(data);
  726. }
  727. static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, size_t *query_counter) {
  728. RRDSET *st;
  729. int rc;
  730. bool more_to_do = false;
  731. uint32_t scan_count = 1;
  732. BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  733. rrdset_foreach_reentrant(st, host) {
  734. if (scan_count == max_count) {
  735. more_to_do = true;
  736. break;
  737. }
  738. if(rrdset_flag_check(st, RRDSET_FLAG_METADATA_UPDATE)) {
  739. (*query_counter)++;
  740. rrdset_flag_clear(st, RRDSET_FLAG_METADATA_UPDATE);
  741. scan_count++;
  742. check_and_update_chart_labels(st, work_buffer);
  743. rc = sql_store_chart(
  744. &st->chart_uuid,
  745. &st->rrdhost->host_uuid,
  746. string2str(st->parts.type),
  747. string2str(st->parts.id),
  748. string2str(st->parts.name),
  749. rrdset_family(st),
  750. rrdset_context(st),
  751. rrdset_title(st),
  752. rrdset_units(st),
  753. rrdset_plugin_name(st),
  754. rrdset_module_name(st),
  755. st->priority,
  756. st->update_every,
  757. st->chart_type,
  758. st->rrd_memory_mode,
  759. st->entries);
  760. if (unlikely(rc))
  761. internal_error(true, "METADATA: Failed to store chart metadata %s", string2str(st->id));
  762. }
  763. RRDDIM *rd;
  764. rrddim_foreach_read(rd, st) {
  765. if(rrddim_flag_check(rd, RRDDIM_FLAG_METADATA_UPDATE)) {
  766. (*query_counter)++;
  767. rrddim_flag_clear(rd, RRDDIM_FLAG_METADATA_UPDATE);
  768. if (rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN))
  769. rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN);
  770. else
  771. rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN);
  772. rc = sql_store_dimension(
  773. &rd->metric_uuid,
  774. &rd->rrdset->chart_uuid,
  775. string2str(rd->id),
  776. string2str(rd->name),
  777. rd->multiplier,
  778. rd->divisor,
  779. rd->algorithm,
  780. rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN));
  781. if (unlikely(rc))
  782. error_report("METADATA: Failed to store dimension %s", string2str(rd->id));
  783. }
  784. }
  785. rrddim_foreach_done(rd);
  786. }
  787. rrdset_foreach_done(st);
  788. buffer_free(work_buffer);
  789. return more_to_do;
  790. }
  791. // Worker thread to scan hosts for pending metadata to store
  792. static void start_metadata_hosts(uv_work_t *req __maybe_unused)
  793. {
  794. register_libuv_worker_jobs();
  795. RRDHOST *host;
  796. struct scan_metadata_payload *data = req->data;
  797. struct metadata_wc *wc = data->wc;
  798. usec_t all_started_ut = now_monotonic_usec(); (void)all_started_ut;
  799. internal_error(true, "METADATA: checking all hosts...");
  800. bool run_again = false;
  801. worker_is_busy(UV_EVENT_METADATA_STORE);
  802. if (!data->max_count)
  803. db_execute("BEGIN TRANSACTION;");
  804. dfe_start_reentrant(rrdhost_root_index, host) {
  805. if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE))
  806. continue;
  807. size_t query_counter = 0; (void)query_counter;
  808. usec_t started_ut = now_monotonic_usec(); (void)started_ut;
  809. rrdhost_flag_clear(host,RRDHOST_FLAG_METADATA_UPDATE);
  810. if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_LABELS))) {
  811. rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_LABELS);
  812. int rc = exec_statement_with_uuid(SQL_DELETE_HOST_LABELS, &host->host_uuid);
  813. if (likely(rc == SQLITE_OK)) {
  814. BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  815. struct query_build tmp = {.sql = work_buffer, .count = 0};
  816. uuid_unparse_lower(host->host_uuid, tmp.uuid_str);
  817. rrdlabels_walkthrough_read(host->rrdlabels, host_label_store_to_sql_callback, &tmp);
  818. db_execute(buffer_tostring(work_buffer));
  819. buffer_free(work_buffer);
  820. query_counter++;
  821. }
  822. }
  823. if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_CLAIMID))) {
  824. rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_CLAIMID);
  825. uuid_t uuid;
  826. if (likely(host->aclk_state.claimed_id && !uuid_parse(host->aclk_state.claimed_id, uuid)))
  827. store_claim_id(&host->host_uuid, &uuid);
  828. else
  829. store_claim_id(&host->host_uuid, NULL);
  830. query_counter++;
  831. }
  832. if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_INFO))) {
  833. rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_INFO);
  834. BUFFER *work_buffer = sql_store_host_system_info(host);
  835. if(work_buffer) {
  836. db_execute(buffer_tostring(work_buffer));
  837. buffer_free(work_buffer);
  838. query_counter++;
  839. }
  840. int rc = sql_store_host_info(host);
  841. if (unlikely(rc))
  842. error_report("METADATA: 'host:%s': failed to store host info", string2str(host->hostname));
  843. else
  844. query_counter++;
  845. }
  846. if (data->max_count)
  847. db_execute("BEGIN TRANSACTION;");
  848. if (unlikely(metadata_scan_host(host, data->max_count, &query_counter))) {
  849. run_again = true;
  850. rrdhost_flag_set(host,RRDHOST_FLAG_METADATA_UPDATE);
  851. internal_error(true,"METADATA: 'host:%s': scheduling another run, more charts to store", rrdhost_hostname(host));
  852. }
  853. if (data->max_count)
  854. db_execute("COMMIT TRANSACTION;");
  855. usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
  856. internal_error(true, "METADATA: 'host:%s': saved metadata with %zu SQL statements, in %0.2f ms",
  857. rrdhost_hostname(host), query_counter,
  858. (double)(ended_ut - started_ut) / USEC_PER_MS);
  859. }
  860. dfe_done(host);
  861. if (!data->max_count)
  862. db_execute("COMMIT TRANSACTION;");
  863. usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut;
  864. internal_error(true, "METADATA: checking all hosts completed in %0.2f ms",
  865. (double)(all_ended_ut - all_started_ut) / USEC_PER_MS);
  866. if (unlikely(run_again))
  867. wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_IMMEDIATE;
  868. else
  869. wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_INTERVAL;
  870. worker_is_idle();
  871. }
  872. static void metadata_event_loop(void *arg)
  873. {
  874. service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
  875. worker_register("METASYNC");
  876. worker_register_job_name(METADATA_DATABASE_NOOP, "noop");
  877. worker_register_job_name(METADATA_DATABASE_TIMER, "timer");
  878. worker_register_job_name(METADATA_DEL_DIMENSION, "delete dimension");
  879. worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id");
  880. worker_register_job_name(METADATA_ADD_HOST_INFO, "add host info");
  881. worker_register_job_name(METADATA_MAINTENANCE, "maintenance");
  882. int ret;
  883. uv_loop_t *loop;
  884. unsigned cmd_batch_size;
  885. struct metadata_wc *wc = arg;
  886. enum metadata_opcode opcode;
  887. uv_work_t metadata_cleanup_worker;
  888. uv_thread_set_name_np(wc->thread, "METASYNC");
  889. loop = wc->loop = mallocz(sizeof(uv_loop_t));
  890. ret = uv_loop_init(loop);
  891. if (ret) {
  892. error("uv_loop_init(): %s", uv_strerror(ret));
  893. goto error_after_loop_init;
  894. }
  895. loop->data = wc;
  896. ret = uv_async_init(wc->loop, &wc->async, async_cb);
  897. if (ret) {
  898. error("uv_async_init(): %s", uv_strerror(ret));
  899. goto error_after_async_init;
  900. }
  901. wc->async.data = wc;
  902. ret = uv_timer_init(loop, &wc->timer_req);
  903. if (ret) {
  904. error("uv_timer_init(): %s", uv_strerror(ret));
  905. goto error_after_timer_init;
  906. }
  907. wc->timer_req.data = wc;
  908. fatal_assert(0 == uv_timer_start(&wc->timer_req, timer_cb, TIMER_INITIAL_PERIOD_MS, TIMER_REPEAT_PERIOD_MS));
  909. info("Starting metadata sync thread with %d entries command queue", METADATA_CMD_Q_MAX_SIZE);
  910. struct metadata_cmd cmd;
  911. memset(&cmd, 0, sizeof(cmd));
  912. metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
  913. metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
  914. wc->check_metadata_after = now_realtime_sec() + METADATA_MAINTENANCE_FIRST_CHECK;
  915. wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK;
  916. int shutdown = 0;
  917. wc->row_id = 0;
  918. completion_mark_complete(&wc->init_complete);
  919. while (shutdown == 0 || (wc->flags & METADATA_WORKER_BUSY)) {
  920. uuid_t *uuid;
  921. RRDHOST *host = NULL;
  922. int rc;
  923. worker_is_idle();
  924. uv_run(loop, UV_RUN_DEFAULT);
  925. /* wait for commands */
  926. cmd_batch_size = 0;
  927. do {
  928. if (unlikely(cmd_batch_size >= METADATA_MAX_BATCH_SIZE))
  929. break;
  930. cmd = metadata_deq_cmd(wc);
  931. opcode = cmd.opcode;
  932. if (unlikely(opcode == METADATA_DATABASE_NOOP && metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  933. shutdown = 1;
  934. continue;
  935. }
  936. ++cmd_batch_size;
  937. if (likely(opcode != METADATA_DATABASE_NOOP))
  938. worker_is_busy(opcode);
  939. switch (opcode) {
  940. case METADATA_DATABASE_NOOP:
  941. case METADATA_DATABASE_TIMER:
  942. break;
  943. case METADATA_DEL_DIMENSION:
  944. uuid = (uuid_t *) cmd.param[0];
  945. if (likely(dimension_can_be_deleted(uuid)))
  946. delete_dimension_uuid(uuid);
  947. freez(uuid);
  948. break;
  949. case METADATA_STORE_CLAIM_ID:
  950. store_claim_id((uuid_t *) cmd.param[0], (uuid_t *) cmd.param[1]);
  951. freez((void *) cmd.param[0]);
  952. freez((void *) cmd.param[1]);
  953. break;
  954. case METADATA_ADD_HOST_INFO:
  955. host = (RRDHOST *) cmd.param[0];
  956. rc = sql_store_host_info(host);
  957. if (unlikely(rc))
  958. error_report("Failed to store host info in the database for %s", string2str(host->hostname));
  959. break;
  960. case METADATA_SCAN_HOSTS:
  961. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SCANNING_HOSTS)))
  962. break;
  963. if (unittest_running)
  964. break;
  965. struct scan_metadata_payload *data = mallocz(sizeof(*data));
  966. data->request.data = data;
  967. data->wc = wc;
  968. data->completion = cmd.completion; // Completion by the worker
  969. if (unlikely(cmd.completion)) {
  970. data->max_count = 0; // 0 will process all pending updates
  971. cmd.completion = NULL; // Do not complete after launching worker (worker will do)
  972. }
  973. else
  974. data->max_count = 5000;
  975. metadata_flag_set(wc, METADATA_FLAG_SCANNING_HOSTS);
  976. if (unlikely(
  977. uv_queue_work(loop,&data->request,
  978. start_metadata_hosts,
  979. after_metadata_hosts))) {
  980. // Failed to launch worker -- let the event loop handle completion
  981. cmd.completion = data->completion;
  982. freez(data);
  983. metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
  984. }
  985. break;
  986. case METADATA_MAINTENANCE:
  987. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_CLEANUP)))
  988. break;
  989. metadata_cleanup_worker.data = wc;
  990. metadata_flag_set(wc, METADATA_FLAG_CLEANUP);
  991. if (unlikely(
  992. uv_queue_work(loop, &metadata_cleanup_worker, start_metadata_cleanup, after_metadata_cleanup))) {
  993. metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
  994. }
  995. break;
  996. case METADATA_UNITTEST:;
  997. struct thread_unittest *tu = (struct thread_unittest *) cmd.param[0];
  998. sleep_usec(1000); // processing takes 1ms
  999. __atomic_fetch_add(&tu->processed, 1, __ATOMIC_SEQ_CST);
  1000. break;
  1001. default:
  1002. break;
  1003. }
  1004. if (cmd.completion)
  1005. completion_mark_complete(cmd.completion);
  1006. } while (opcode != METADATA_DATABASE_NOOP);
  1007. }
  1008. if (!uv_timer_stop(&wc->timer_req))
  1009. uv_close((uv_handle_t *)&wc->timer_req, NULL);
  1010. /*
  1011. * uv_async_send after uv_close does not seem to crash in linux at the moment,
  1012. * it is however undocumented behaviour we need to be aware if this becomes
  1013. * an issue in the future.
  1014. */
  1015. uv_close((uv_handle_t *)&wc->async, NULL);
  1016. uv_run(loop, UV_RUN_DEFAULT);
  1017. uv_cond_destroy(&wc->cmd_cond);
  1018. int rc;
  1019. do {
  1020. rc = uv_loop_close(loop);
  1021. } while (rc != UV_EBUSY);
  1022. freez(loop);
  1023. worker_unregister();
  1024. info("METADATA: Shutting down event loop");
  1025. completion_mark_complete(&wc->init_complete);
  1026. return;
  1027. error_after_timer_init:
  1028. uv_close((uv_handle_t *)&wc->async, NULL);
  1029. error_after_async_init:
  1030. fatal_assert(0 == uv_loop_close(loop));
  1031. error_after_loop_init:
  1032. freez(loop);
  1033. worker_unregister();
  1034. }
  1035. struct metadata_wc metasync_worker = {.loop = NULL};
  1036. void metadata_sync_shutdown(void)
  1037. {
  1038. completion_init(&metasync_worker.init_complete);
  1039. struct metadata_cmd cmd;
  1040. memset(&cmd, 0, sizeof(cmd));
  1041. info("METADATA: Sending a shutdown command");
  1042. cmd.opcode = METADATA_SYNC_SHUTDOWN;
  1043. metadata_enq_cmd(&metasync_worker, &cmd);
  1044. /* wait for metadata thread to shut down */
  1045. info("METADATA: Waiting for shutdown ACK");
  1046. completion_wait_for(&metasync_worker.init_complete);
  1047. completion_destroy(&metasync_worker.init_complete);
  1048. info("METADATA: Shutdown complete");
  1049. }
  1050. void metadata_sync_shutdown_prepare(void)
  1051. {
  1052. struct metadata_cmd cmd;
  1053. memset(&cmd, 0, sizeof(cmd));
  1054. struct completion compl;
  1055. completion_init(&compl);
  1056. info("METADATA: Sending a scan host command");
  1057. uint32_t max_wait_iterations = 2000;
  1058. while (unlikely(metadata_flag_check(&metasync_worker, METADATA_FLAG_SCANNING_HOSTS)) && max_wait_iterations--) {
  1059. if (max_wait_iterations == 1999)
  1060. info("METADATA: Current worker is running; waiting to finish");
  1061. sleep_usec(1000);
  1062. }
  1063. cmd.opcode = METADATA_SCAN_HOSTS;
  1064. cmd.completion = &compl;
  1065. metadata_enq_cmd(&metasync_worker, &cmd);
  1066. info("METADATA: Waiting for host scan completion");
  1067. completion_wait_for(&compl);
  1068. completion_destroy(&compl);
  1069. info("METADATA: Host scan complete; can continue with shutdown");
  1070. }
  1071. // -------------------------------------------------------------
  1072. // Init function called on agent startup
  1073. void metadata_sync_init(void)
  1074. {
  1075. struct metadata_wc *wc = &metasync_worker;
  1076. fatal_assert(0 == uv_mutex_init(&metadata_async_lock));
  1077. memset(wc, 0, sizeof(*wc));
  1078. metadata_init_cmd_queue(wc);
  1079. completion_init(&wc->init_complete);
  1080. fatal_assert(0 == uv_thread_create(&(wc->thread), metadata_event_loop, wc));
  1081. completion_wait_for(&wc->init_complete);
  1082. completion_destroy(&wc->init_complete);
  1083. info("SQLite metadata sync initialization complete");
  1084. }
  1085. // Helpers
  1086. static inline void queue_metadata_cmd(enum metadata_opcode opcode, const void *param0, const void *param1)
  1087. {
  1088. struct metadata_cmd cmd;
  1089. cmd.opcode = opcode;
  1090. cmd.param[0] = param0;
  1091. cmd.param[1] = param1;
  1092. cmd.completion = NULL;
  1093. metadata_enq_cmd(&metasync_worker, &cmd);
  1094. }
  1095. // Public
  1096. void metaqueue_delete_dimension_uuid(uuid_t *uuid)
  1097. {
  1098. if (unlikely(!metasync_worker.loop))
  1099. return;
  1100. uuid_t *use_uuid = mallocz(sizeof(*uuid));
  1101. uuid_copy(*use_uuid, *uuid);
  1102. queue_metadata_cmd(METADATA_DEL_DIMENSION, use_uuid, NULL);
  1103. }
  1104. void metaqueue_store_claim_id(uuid_t *host_uuid, uuid_t *claim_uuid)
  1105. {
  1106. if (unlikely(!host_uuid))
  1107. return;
  1108. uuid_t *local_host_uuid = mallocz(sizeof(*host_uuid));
  1109. uuid_t *local_claim_uuid = NULL;
  1110. uuid_copy(*local_host_uuid, *host_uuid);
  1111. if (likely(claim_uuid)) {
  1112. local_claim_uuid = mallocz(sizeof(*claim_uuid));
  1113. uuid_copy(*local_claim_uuid, *claim_uuid);
  1114. }
  1115. queue_metadata_cmd(METADATA_STORE_CLAIM_ID, local_host_uuid, local_claim_uuid);
  1116. }
  1117. void metaqueue_host_update_info(RRDHOST *host)
  1118. {
  1119. if (unlikely(!metasync_worker.loop))
  1120. return;
  1121. queue_metadata_cmd(METADATA_ADD_HOST_INFO, host, NULL);
  1122. }
  1123. //
  1124. // unitests
  1125. //
  1126. static void *unittest_queue_metadata(void *arg) {
  1127. struct thread_unittest *tu = arg;
  1128. struct metadata_cmd cmd;
  1129. cmd.opcode = METADATA_UNITTEST;
  1130. cmd.param[0] = tu;
  1131. cmd.param[1] = NULL;
  1132. cmd.completion = NULL;
  1133. metadata_enq_cmd(&metasync_worker, &cmd);
  1134. do {
  1135. __atomic_fetch_add(&tu->added, 1, __ATOMIC_SEQ_CST);
  1136. metadata_enq_cmd(&metasync_worker, &cmd);
  1137. sleep_usec(10000);
  1138. } while (!__atomic_load_n(&tu->join, __ATOMIC_RELAXED));
  1139. return arg;
  1140. }
  1141. static void *metadata_unittest_threads(void)
  1142. {
  1143. unsigned done;
  1144. struct thread_unittest tu = {
  1145. .join = 0,
  1146. .added = 0,
  1147. .processed = 0,
  1148. .done = &done,
  1149. };
  1150. // Queue messages / Time it
  1151. time_t seconds_to_run = 5;
  1152. int threads_to_create = 4;
  1153. fprintf(
  1154. stderr,
  1155. "\nChecking metadata queue using %d threads for %lld seconds...\n",
  1156. threads_to_create,
  1157. (long long)seconds_to_run);
  1158. netdata_thread_t threads[threads_to_create];
  1159. tu.join = 0;
  1160. for (int i = 0; i < threads_to_create; i++) {
  1161. char buf[100 + 1];
  1162. snprintf(buf, 100, "META[%d]", i);
  1163. netdata_thread_create(
  1164. &threads[i],
  1165. buf,
  1166. NETDATA_THREAD_OPTION_DONT_LOG | NETDATA_THREAD_OPTION_JOINABLE,
  1167. unittest_queue_metadata,
  1168. &tu);
  1169. }
  1170. uv_async_send(&metasync_worker.async);
  1171. sleep_usec(seconds_to_run * USEC_PER_SEC);
  1172. __atomic_store_n(&tu.join, 1, __ATOMIC_RELAXED);
  1173. for (int i = 0; i < threads_to_create; i++) {
  1174. void *retval;
  1175. netdata_thread_join(threads[i], &retval);
  1176. }
  1177. sleep_usec(5 * USEC_PER_SEC);
  1178. fprintf(stderr, "Added %u elements, processed %u\n", tu.added, tu.processed);
  1179. return 0;
  1180. }
  1181. int metadata_unittest(void)
  1182. {
  1183. metadata_sync_init();
  1184. // Queue items for a specific period of time
  1185. metadata_unittest_threads();
  1186. fprintf(stderr, "Items still in queue %u\n", metasync_worker.queue_size);
  1187. metadata_sync_shutdown();
  1188. return 0;
  1189. }