sqlite_metadata.c 57 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638
  1. // SPDX-License-Identifier: GPL-3.0-or-later
#include "sqlite_metadata.h"

#include <string.h>
  3. // SQL statements
  4. #define SQL_STORE_CLAIM_ID "INSERT INTO node_instance " \
  5. "(host_id, claim_id, date_created) VALUES (@host_id, @claim_id, unixepoch()) " \
  6. "ON CONFLICT(host_id) DO UPDATE SET claim_id = excluded.claim_id;"
  7. #define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid;"
  8. #define STORE_HOST_LABEL \
  9. "INSERT OR REPLACE INTO host_label (host_id, source_type, label_key, label_value, date_created) VALUES "
  10. #define STORE_CHART_LABEL \
  11. "INSERT OR REPLACE INTO chart_label (chart_id, source_type, label_key, label_value, date_created) VALUES "
  12. #define STORE_HOST_OR_CHART_LABEL_VALUE "(u2h('%s'), %d,'%s','%s', unixepoch())"
  13. #define DELETE_DIMENSION_UUID "DELETE FROM dimension WHERE dim_id = @uuid;"
  14. #define SQL_STORE_HOST_INFO "INSERT OR REPLACE INTO host " \
  15. "(host_id, hostname, registry_hostname, update_every, os, timezone," \
  16. "tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, program_version," \
  17. "entries, health_enabled) " \
  18. "values (@host_id, @hostname, @registry_hostname, @update_every, @os, @timezone, @tags, @hops, @memory_mode, " \
  19. "@abbrev_timezone, @utc_offset, @program_name, @program_version, " \
  20. "@entries, @health_enabled);"
  21. #define SQL_STORE_CHART "insert or replace into chart (chart_id, host_id, type, id, " \
  22. "name, family, context, title, unit, plugin, module, priority, update_every , chart_type , memory_mode , " \
  23. "history_entries) values (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16);"
  24. #define SQL_STORE_DIMENSION "INSERT OR REPLACE INTO dimension (dim_id, chart_id, id, name, multiplier, divisor , algorithm, options) " \
  25. "VALUES (@dim_id, @chart_id, @id, @name, @multiplier, @divisor, @algorithm, @options);"
  26. #define SELECT_DIMENSION_LIST "SELECT dim_id, rowid FROM dimension WHERE rowid > @row_id"
  27. #define SQL_STORE_HOST_SYSTEM_INFO_VALUES "INSERT OR REPLACE INTO host_info (host_id, system_key, system_value, date_created) VALUES " \
  28. "(@uuid, @name, @value, unixepoch())"
  29. #define MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID \
  30. "UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0);"
  31. #define DELETE_NON_EXISTING_LOCALHOST "DELETE FROM host WHERE hops = 0 AND host_id <> @host_id;"
  32. #define DELETE_MISSING_NODE_INSTANCES "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);"
  33. #define METADATA_CMD_Q_MAX_SIZE (1024) // Max queue size; callers will block until there is room
  34. #define METADATA_MAINTENANCE_FIRST_CHECK (1800) // Maintenance first run after agent startup in seconds
  35. #define METADATA_MAINTENANCE_RETRY (60) // Retry run if already running or last run did actual work
  36. #define METADATA_MAINTENANCE_INTERVAL (3600) // Repeat maintenance after latest successful
  37. #define METADATA_HOST_CHECK_FIRST_CHECK (5) // First check for pending metadata
  38. #define METADATA_HOST_CHECK_INTERVAL (30) // Repeat check for pending metadata
  39. #define METADATA_HOST_CHECK_IMMEDIATE (5) // Repeat immediate run because we have more metadata to write
  40. #define MAX_METADATA_CLEANUP (500) // Maximum metadata write operations (e.g deletes before retrying)
  41. #define METADATA_MAX_BATCH_SIZE (512) // Maximum commands to execute before running the event loop
  42. enum metadata_opcode {
  43. METADATA_DATABASE_NOOP = 0,
  44. METADATA_DATABASE_TIMER,
  45. METADATA_DEL_DIMENSION,
  46. METADATA_STORE_CLAIM_ID,
  47. METADATA_ADD_HOST_INFO,
  48. METADATA_SCAN_HOSTS,
  49. METADATA_LOAD_HOST_CONTEXT,
  50. METADATA_MAINTENANCE,
  51. METADATA_SYNC_SHUTDOWN,
  52. METADATA_UNITTEST,
  53. METADATA_ML_LOAD_MODELS,
  54. // leave this last
  55. // we need it to check for worker utilization
  56. METADATA_MAX_ENUMERATIONS_DEFINED
  57. };
  58. #define MAX_PARAM_LIST (2)
  59. struct metadata_cmd {
  60. enum metadata_opcode opcode;
  61. struct completion *completion;
  62. const void *param[MAX_PARAM_LIST];
  63. };
  64. struct metadata_database_cmdqueue {
  65. unsigned head, tail;
  66. struct metadata_cmd cmd_array[METADATA_CMD_Q_MAX_SIZE];
  67. };
  68. typedef enum {
  69. METADATA_FLAG_CLEANUP = (1 << 0), // Cleanup is running
  70. METADATA_FLAG_SCANNING_HOSTS = (1 << 1), // Scanning of hosts in worker thread
  71. METADATA_FLAG_SHUTDOWN = (1 << 2), // Shutting down
  72. } METADATA_FLAG;
  73. #define METADATA_WORKER_BUSY (METADATA_FLAG_CLEANUP | METADATA_FLAG_SCANNING_HOSTS)
  74. struct metadata_wc {
  75. uv_thread_t thread;
  76. uv_loop_t *loop;
  77. uv_async_t async;
  78. uv_timer_t timer_req;
  79. time_t check_metadata_after;
  80. time_t check_hosts_after;
  81. volatile unsigned queue_size;
  82. METADATA_FLAG flags;
  83. uint64_t row_id;
  84. struct completion init_complete;
  85. /* FIFO command queue */
  86. uv_mutex_t cmd_mutex;
  87. uv_cond_t cmd_cond;
  88. struct metadata_database_cmdqueue cmd_queue;
  89. };
  90. #define metadata_flag_check(target_flags, flag) (__atomic_load_n(&((target_flags)->flags), __ATOMIC_SEQ_CST) & (flag))
  91. #define metadata_flag_set(target_flags, flag) __atomic_or_fetch(&((target_flags)->flags), (flag), __ATOMIC_SEQ_CST)
  92. #define metadata_flag_clear(target_flags, flag) __atomic_and_fetch(&((target_flags)->flags), ~(flag), __ATOMIC_SEQ_CST)
  93. //
  94. // For unittest
  95. //
  96. struct thread_unittest {
  97. int join;
  98. unsigned added;
  99. unsigned processed;
  100. unsigned *done;
  101. };
  102. // Metadata functions
  103. struct query_build {
  104. BUFFER *sql;
  105. int count;
  106. char uuid_str[UUID_STR_LEN];
  107. };
  108. static int host_label_store_to_sql_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
  109. struct query_build *lb = data;
  110. if (unlikely(!lb->count))
  111. buffer_sprintf(lb->sql, STORE_HOST_LABEL);
  112. else
  113. buffer_strcat(lb->sql, ", ");
  114. buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, (int)ls & ~(RRDLABEL_FLAG_INTERNAL), name, value);
  115. lb->count++;
  116. return 1;
  117. }
  118. static int chart_label_store_to_sql_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
  119. struct query_build *lb = data;
  120. if (unlikely(!lb->count))
  121. buffer_sprintf(lb->sql, STORE_CHART_LABEL);
  122. else
  123. buffer_strcat(lb->sql, ", ");
  124. buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, ls, name, value);
  125. lb->count++;
  126. return 1;
  127. }
  128. #define SQL_DELETE_CHART_LABEL "DELETE FROM chart_label WHERE chart_id = @chart_id;"
  129. #define SQL_DELETE_CHART_LABEL_HISTORY "DELETE FROM chart_label WHERE date_created < %ld AND chart_id = @chart_id;"
  130. static void clean_old_chart_labels(RRDSET *st)
  131. {
  132. char sql[512];
  133. time_t first_time_s = rrdset_first_entry_s(st);
  134. if (unlikely(!first_time_s))
  135. snprintfz(sql, 511,SQL_DELETE_CHART_LABEL);
  136. else
  137. snprintfz(sql, 511,SQL_DELETE_CHART_LABEL_HISTORY, first_time_s);
  138. int rc = exec_statement_with_uuid(sql, &st->chart_uuid);
  139. if (unlikely(rc))
  140. error_report("METADATA: 'host:%s' Failed to clean old labels for chart %s", rrdhost_hostname(st->rrdhost), rrdset_name(st));
  141. }
  142. static int check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer, size_t *query_counter)
  143. {
  144. size_t old_version = st->rrdlabels_last_saved_version;
  145. size_t new_version = dictionary_version(st->rrdlabels);
  146. if (new_version == old_version)
  147. return 0;
  148. struct query_build tmp = {.sql = work_buffer, .count = 0};
  149. uuid_unparse_lower(st->chart_uuid, tmp.uuid_str);
  150. rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp);
  151. int rc = db_execute(db_meta, buffer_tostring(work_buffer));
  152. if (likely(!rc)) {
  153. st->rrdlabels_last_saved_version = new_version;
  154. (*query_counter)++;
  155. }
  156. clean_old_chart_labels(st);
  157. return rc;
  158. }
  159. // Migrate all hosts with hops zero to this host_uuid
  160. void migrate_localhost(uuid_t *host_uuid)
  161. {
  162. int rc;
  163. rc = exec_statement_with_uuid(MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID, host_uuid);
  164. if (!rc)
  165. rc = exec_statement_with_uuid(DELETE_NON_EXISTING_LOCALHOST, host_uuid);
  166. if (!rc) {
  167. if (unlikely(db_execute(db_meta, DELETE_MISSING_NODE_INSTANCES)))
  168. error_report("Failed to remove deleted hosts from node instances");
  169. }
  170. }
  171. static int store_claim_id(uuid_t *host_id, uuid_t *claim_id)
  172. {
  173. sqlite3_stmt *res = NULL;
  174. int rc;
  175. if (unlikely(!db_meta)) {
  176. if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
  177. error_report("Database has not been initialized");
  178. return 1;
  179. }
  180. rc = sqlite3_prepare_v2(db_meta, SQL_STORE_CLAIM_ID, -1, &res, 0);
  181. if (unlikely(rc != SQLITE_OK)) {
  182. error_report("Failed to prepare statement to store host claim id");
  183. return 1;
  184. }
  185. rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC);
  186. if (unlikely(rc != SQLITE_OK)) {
  187. error_report("Failed to bind host_id parameter to store claim id");
  188. goto failed;
  189. }
  190. if (claim_id)
  191. rc = sqlite3_bind_blob(res, 2, claim_id, sizeof(*claim_id), SQLITE_STATIC);
  192. else
  193. rc = sqlite3_bind_null(res, 2);
  194. if (unlikely(rc != SQLITE_OK)) {
  195. error_report("Failed to bind claim_id parameter to host claim id");
  196. goto failed;
  197. }
  198. rc = execute_insert(res);
  199. if (unlikely(rc != SQLITE_DONE))
  200. error_report("Failed to store host claim id rc = %d", rc);
  201. failed:
  202. if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
  203. error_report("Failed to finalize the prepared statement when storing a host claim id");
  204. return rc != SQLITE_DONE;
  205. }
  206. static void delete_dimension_uuid(uuid_t *dimension_uuid)
  207. {
  208. static __thread sqlite3_stmt *res = NULL;
  209. int rc;
  210. if (unlikely(!res)) {
  211. rc = prepare_statement(db_meta, DELETE_DIMENSION_UUID, &res);
  212. if (rc != SQLITE_OK) {
  213. error_report("Failed to prepare statement to delete a dimension uuid");
  214. return;
  215. }
  216. }
  217. rc = sqlite3_bind_blob(res, 1, dimension_uuid, sizeof(*dimension_uuid), SQLITE_STATIC);
  218. if (unlikely(rc != SQLITE_OK))
  219. goto skip_execution;
  220. rc = sqlite3_step_monitored(res);
  221. if (unlikely(rc != SQLITE_DONE))
  222. error_report("Failed to delete dimension uuid, rc = %d", rc);
  223. skip_execution:
  224. rc = sqlite3_reset(res);
  225. if (unlikely(rc != SQLITE_OK))
  226. error_report("Failed to reset statement when deleting dimension UUID, rc = %d", rc);
  227. }
  228. //
  229. // Store host and host system info information in the database
  230. static int store_host_metadata(RRDHOST *host)
  231. {
  232. static __thread sqlite3_stmt *res = NULL;
  233. int rc, param = 0;
  234. if (unlikely(!db_meta)) {
  235. if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
  236. return 0;
  237. error_report("Database has not been initialized");
  238. return 1;
  239. }
  240. if (unlikely((!res))) {
  241. rc = prepare_statement(db_meta, SQL_STORE_HOST_INFO, &res);
  242. if (unlikely(rc != SQLITE_OK)) {
  243. error_report("Failed to prepare statement to store host, rc = %d", rc);
  244. return 1;
  245. }
  246. }
  247. rc = sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
  248. if (unlikely(rc != SQLITE_OK))
  249. goto bind_fail;
  250. rc = bind_text_null(res, ++param, rrdhost_hostname(host), 0);
  251. if (unlikely(rc != SQLITE_OK))
  252. goto bind_fail;
  253. rc = bind_text_null(res, ++param, rrdhost_registry_hostname(host), 1);
  254. if (unlikely(rc != SQLITE_OK))
  255. goto bind_fail;
  256. rc = sqlite3_bind_int(res, ++param, host->rrd_update_every);
  257. if (unlikely(rc != SQLITE_OK))
  258. goto bind_fail;
  259. rc = bind_text_null(res, ++param, rrdhost_os(host), 1);
  260. if (unlikely(rc != SQLITE_OK))
  261. goto bind_fail;
  262. rc = bind_text_null(res, ++param, rrdhost_timezone(host), 1);
  263. if (unlikely(rc != SQLITE_OK))
  264. goto bind_fail;
  265. rc = bind_text_null(res, ++param, rrdhost_tags(host), 1);
  266. if (unlikely(rc != SQLITE_OK))
  267. goto bind_fail;
  268. rc = sqlite3_bind_int(res, ++param, host->system_info ? host->system_info->hops : 0);
  269. if (unlikely(rc != SQLITE_OK))
  270. goto bind_fail;
  271. rc = sqlite3_bind_int(res, ++param, host->rrd_memory_mode);
  272. if (unlikely(rc != SQLITE_OK))
  273. goto bind_fail;
  274. rc = bind_text_null(res, ++param, rrdhost_abbrev_timezone(host), 1);
  275. if (unlikely(rc != SQLITE_OK))
  276. goto bind_fail;
  277. rc = sqlite3_bind_int(res, ++param, host->utc_offset);
  278. if (unlikely(rc != SQLITE_OK))
  279. goto bind_fail;
  280. rc = bind_text_null(res, ++param, rrdhost_program_name(host), 1);
  281. if (unlikely(rc != SQLITE_OK))
  282. goto bind_fail;
  283. rc = bind_text_null(res, ++param, rrdhost_program_version(host), 1);
  284. if (unlikely(rc != SQLITE_OK))
  285. goto bind_fail;
  286. rc = sqlite3_bind_int64(res, ++param, host->rrd_history_entries);
  287. if (unlikely(rc != SQLITE_OK))
  288. goto bind_fail;
  289. rc = sqlite3_bind_int(res, ++param, (int ) host->health.health_enabled);
  290. if (unlikely(rc != SQLITE_OK))
  291. goto bind_fail;
  292. int store_rc = sqlite3_step_monitored(res);
  293. if (unlikely(store_rc != SQLITE_DONE))
  294. error_report("Failed to store host %s, rc = %d", rrdhost_hostname(host), rc);
  295. rc = sqlite3_reset(res);
  296. if (unlikely(rc != SQLITE_OK))
  297. error_report("Failed to reset statement to store host %s, rc = %d", rrdhost_hostname(host), rc);
  298. return store_rc != SQLITE_DONE;
  299. bind_fail:
  300. error_report("Failed to bind %d parameter to store host %s, rc = %d", param, rrdhost_hostname(host), rc);
  301. rc = sqlite3_reset(res);
  302. if (unlikely(rc != SQLITE_OK))
  303. error_report("Failed to reset statement to store host %s, rc = %d", rrdhost_hostname(host), rc);
  304. return 1;
  305. }
  306. static int add_host_sysinfo_key_value(const char *name, const char *value, uuid_t *uuid)
  307. {
  308. static __thread sqlite3_stmt *res = NULL;
  309. int rc, param = 0;
  310. if (unlikely(!db_meta)) {
  311. if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
  312. return 0;
  313. error_report("Database has not been initialized");
  314. return 0;
  315. }
  316. if (unlikely((!res))) {
  317. rc = prepare_statement(db_meta, SQL_STORE_HOST_SYSTEM_INFO_VALUES, &res);
  318. if (unlikely(rc != SQLITE_OK)) {
  319. error_report("Failed to prepare statement to store host info values, rc = %d", rc);
  320. return 0;
  321. }
  322. }
  323. rc = sqlite3_bind_blob(res, ++param, uuid, sizeof(*uuid), SQLITE_STATIC);
  324. if (unlikely(rc != SQLITE_OK))
  325. goto bind_fail;
  326. rc = bind_text_null(res, ++param, name, 0);
  327. if (unlikely(rc != SQLITE_OK))
  328. goto bind_fail;
  329. rc = bind_text_null(res, ++param, value ? value : "unknown", 0);
  330. if (unlikely(rc != SQLITE_OK))
  331. goto bind_fail;
  332. int store_rc = sqlite3_step_monitored(res);
  333. if (unlikely(store_rc != SQLITE_DONE))
  334. error_report("Failed to store host info value %s, rc = %d", name, rc);
  335. rc = sqlite3_reset(res);
  336. if (unlikely(rc != SQLITE_OK))
  337. error_report("Failed to reset statement to store host info value %s, rc = %d", name, rc);
  338. return store_rc == SQLITE_DONE;
  339. bind_fail:
  340. error_report("Failed to bind %d parameter to store host info values %s, rc = %d", param, name, rc);
  341. rc = sqlite3_reset(res);
  342. if (unlikely(rc != SQLITE_OK))
  343. error_report("Failed to reset statement to store host info values %s, rc = %d", name, rc);
  344. return 0;
  345. }
  346. static bool store_host_systeminfo(RRDHOST *host)
  347. {
  348. struct rrdhost_system_info *system_info = host->system_info;
  349. if (unlikely(!system_info))
  350. return false;
  351. int ret = 0;
  352. ret += add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_NAME", system_info->container_os_name, &host->host_uuid);
  353. ret += add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_ID", system_info->container_os_id, &host->host_uuid);
  354. ret += add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_ID_LIKE", system_info->container_os_id_like, &host->host_uuid);
  355. ret += add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_VERSION", system_info->container_os_version, &host->host_uuid);
  356. ret += add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_VERSION_ID", system_info->container_os_version_id, &host->host_uuid);
  357. ret += add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_DETECTION", system_info->host_os_detection, &host->host_uuid);
  358. ret += add_host_sysinfo_key_value("NETDATA_HOST_OS_NAME", system_info->host_os_name, &host->host_uuid);
  359. ret += add_host_sysinfo_key_value("NETDATA_HOST_OS_ID", system_info->host_os_id, &host->host_uuid);
  360. ret += add_host_sysinfo_key_value("NETDATA_HOST_OS_ID_LIKE", system_info->host_os_id_like, &host->host_uuid);
  361. ret += add_host_sysinfo_key_value("NETDATA_HOST_OS_VERSION", system_info->host_os_version, &host->host_uuid);
  362. ret += add_host_sysinfo_key_value("NETDATA_HOST_OS_VERSION_ID", system_info->host_os_version_id, &host->host_uuid);
  363. ret += add_host_sysinfo_key_value("NETDATA_HOST_OS_DETECTION", system_info->host_os_detection, &host->host_uuid);
  364. ret += add_host_sysinfo_key_value("NETDATA_SYSTEM_KERNEL_NAME", system_info->kernel_name, &host->host_uuid);
  365. ret += add_host_sysinfo_key_value("NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT", system_info->host_cores, &host->host_uuid);
  366. ret += add_host_sysinfo_key_value("NETDATA_SYSTEM_CPU_FREQ", system_info->host_cpu_freq, &host->host_uuid);
  367. ret += add_host_sysinfo_key_value("NETDATA_SYSTEM_TOTAL_RAM", system_info->host_ram_total, &host->host_uuid);
  368. ret += add_host_sysinfo_key_value("NETDATA_SYSTEM_TOTAL_DISK_SIZE", system_info->host_disk_space, &host->host_uuid);
  369. ret += add_host_sysinfo_key_value("NETDATA_SYSTEM_KERNEL_VERSION", system_info->kernel_version, &host->host_uuid);
  370. ret += add_host_sysinfo_key_value("NETDATA_SYSTEM_ARCHITECTURE", system_info->architecture, &host->host_uuid);
  371. ret += add_host_sysinfo_key_value("NETDATA_SYSTEM_VIRTUALIZATION", system_info->virtualization, &host->host_uuid);
  372. ret += add_host_sysinfo_key_value("NETDATA_SYSTEM_VIRT_DETECTION", system_info->virt_detection, &host->host_uuid);
  373. ret += add_host_sysinfo_key_value("NETDATA_SYSTEM_CONTAINER", system_info->container, &host->host_uuid);
  374. ret += add_host_sysinfo_key_value("NETDATA_SYSTEM_CONTAINER_DETECTION", system_info->container_detection, &host->host_uuid);
  375. ret += add_host_sysinfo_key_value("NETDATA_HOST_IS_K8S_NODE", system_info->is_k8s_node, &host->host_uuid);
  376. return !(24 == ret);
  377. }
  378. /*
  379. * Store a chart in the database
  380. */
  381. static int store_chart_metadata(RRDSET *st)
  382. {
  383. static __thread sqlite3_stmt *res = NULL;
  384. int rc, param = 0, store_rc = 0;
  385. if (unlikely(!db_meta)) {
  386. if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
  387. return 0;
  388. error_report("Database has not been initialized");
  389. return 1;
  390. }
  391. if (unlikely(!res)) {
  392. rc = prepare_statement(db_meta, SQL_STORE_CHART, &res);
  393. if (unlikely(rc != SQLITE_OK)) {
  394. error_report("Failed to prepare statement to store chart, rc = %d", rc);
  395. return 1;
  396. }
  397. }
  398. rc = sqlite3_bind_blob(res, ++param, &st->chart_uuid, sizeof(st->chart_uuid), SQLITE_STATIC);
  399. if (unlikely(rc != SQLITE_OK))
  400. goto bind_fail;
  401. rc = sqlite3_bind_blob(res, ++param, &st->rrdhost->host_uuid, sizeof(st->rrdhost->host_uuid), SQLITE_STATIC);
  402. if (unlikely(rc != SQLITE_OK))
  403. goto bind_fail;
  404. rc = sqlite3_bind_text(res, ++param, string2str(st->parts.type), -1, SQLITE_STATIC);
  405. if (unlikely(rc != SQLITE_OK))
  406. goto bind_fail;
  407. rc = sqlite3_bind_text(res, ++param, string2str(st->parts.id), -1, SQLITE_STATIC);
  408. if (unlikely(rc != SQLITE_OK))
  409. goto bind_fail;
  410. const char *name = string2str(st->parts.name);
  411. if (name && *name)
  412. rc = sqlite3_bind_text(res, ++param, name, -1, SQLITE_STATIC);
  413. else
  414. rc = sqlite3_bind_null(res, ++param);
  415. if (unlikely(rc != SQLITE_OK))
  416. goto bind_fail;
  417. rc = sqlite3_bind_text(res, ++param, rrdset_family(st), -1, SQLITE_STATIC);
  418. if (unlikely(rc != SQLITE_OK))
  419. goto bind_fail;
  420. rc = sqlite3_bind_text(res, ++param, rrdset_context(st), -1, SQLITE_STATIC);
  421. if (unlikely(rc != SQLITE_OK))
  422. goto bind_fail;
  423. rc = sqlite3_bind_text(res, ++param, rrdset_title(st), -1, SQLITE_STATIC);
  424. if (unlikely(rc != SQLITE_OK))
  425. goto bind_fail;
  426. rc = sqlite3_bind_text(res, ++param, rrdset_units(st), -1, SQLITE_STATIC);
  427. if (unlikely(rc != SQLITE_OK))
  428. goto bind_fail;
  429. rc = sqlite3_bind_text(res, ++param, rrdset_plugin_name(st), -1, SQLITE_STATIC);
  430. if (unlikely(rc != SQLITE_OK))
  431. goto bind_fail;
  432. rc = sqlite3_bind_text(res, ++param, rrdset_module_name(st), -1, SQLITE_STATIC);
  433. if (unlikely(rc != SQLITE_OK))
  434. goto bind_fail;
  435. rc = sqlite3_bind_int(res, ++param, (int) st->priority);
  436. if (unlikely(rc != SQLITE_OK))
  437. goto bind_fail;
  438. rc = sqlite3_bind_int(res, ++param, st->update_every);
  439. if (unlikely(rc != SQLITE_OK))
  440. goto bind_fail;
  441. rc = sqlite3_bind_int(res, ++param, st->chart_type);
  442. if (unlikely(rc != SQLITE_OK))
  443. goto bind_fail;
  444. rc = sqlite3_bind_int(res, ++param, st->rrd_memory_mode);
  445. if (unlikely(rc != SQLITE_OK))
  446. goto bind_fail;
  447. rc = sqlite3_bind_int(res, ++param, (int) st->db.entries);
  448. if (unlikely(rc != SQLITE_OK))
  449. goto bind_fail;
  450. store_rc = execute_insert(res);
  451. if (unlikely(store_rc != SQLITE_DONE))
  452. error_report("Failed to store chart, rc = %d", store_rc);
  453. rc = sqlite3_reset(res);
  454. if (unlikely(rc != SQLITE_OK))
  455. error_report("Failed to reset statement in chart store function, rc = %d", rc);
  456. return store_rc != SQLITE_DONE;
  457. bind_fail:
  458. error_report("Failed to bind parameter %d to store chart, rc = %d", param, rc);
  459. rc = sqlite3_reset(res);
  460. if (unlikely(rc != SQLITE_OK))
  461. error_report("Failed to reset statement in chart store function, rc = %d", rc);
  462. return 1;
  463. }
  464. /*
  465. * Store a dimension
  466. */
  467. static int store_dimension_metadata(RRDDIM *rd)
  468. {
  469. static __thread sqlite3_stmt *res = NULL;
  470. int rc, param = 0;
  471. if (unlikely(!db_meta)) {
  472. if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
  473. return 0;
  474. error_report("Database has not been initialized");
  475. return 1;
  476. }
  477. if (unlikely(!res)) {
  478. rc = prepare_statement(db_meta, SQL_STORE_DIMENSION, &res);
  479. if (unlikely(rc != SQLITE_OK)) {
  480. error_report("Failed to prepare statement to store dimension, rc = %d", rc);
  481. return 1;
  482. }
  483. }
  484. rc = sqlite3_bind_blob(res, ++param, &rd->metric_uuid, sizeof(rd->metric_uuid), SQLITE_STATIC);
  485. if (unlikely(rc != SQLITE_OK))
  486. goto bind_fail;
  487. rc = sqlite3_bind_blob(res, ++param, &rd->rrdset->chart_uuid, sizeof(rd->rrdset->chart_uuid), SQLITE_STATIC);
  488. if (unlikely(rc != SQLITE_OK))
  489. goto bind_fail;
  490. rc = sqlite3_bind_text(res, ++param, string2str(rd->id), -1, SQLITE_STATIC);
  491. if (unlikely(rc != SQLITE_OK))
  492. goto bind_fail;
  493. rc = sqlite3_bind_text(res, ++param, string2str(rd->name), -1, SQLITE_STATIC);
  494. if (unlikely(rc != SQLITE_OK))
  495. goto bind_fail;
  496. rc = sqlite3_bind_int(res, ++param, (int) rd->multiplier);
  497. if (unlikely(rc != SQLITE_OK))
  498. goto bind_fail;
  499. rc = sqlite3_bind_int(res, ++param, (int ) rd->divisor);
  500. if (unlikely(rc != SQLITE_OK))
  501. goto bind_fail;
  502. rc = sqlite3_bind_int(res, ++param, rd->algorithm);
  503. if (unlikely(rc != SQLITE_OK))
  504. goto bind_fail;
  505. if (rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN))
  506. rc = sqlite3_bind_text(res, ++param, "hidden", -1, SQLITE_STATIC);
  507. else
  508. rc = sqlite3_bind_null(res, ++param);
  509. if (unlikely(rc != SQLITE_OK))
  510. goto bind_fail;
  511. rc = execute_insert(res);
  512. if (unlikely(rc != SQLITE_DONE))
  513. error_report("Failed to store dimension, rc = %d", rc);
  514. rc = sqlite3_reset(res);
  515. if (unlikely(rc != SQLITE_OK))
  516. error_report("Failed to reset statement in store dimension, rc = %d", rc);
  517. return 0;
  518. bind_fail:
  519. error_report("Failed to bind parameter %d to store dimension, rc = %d", param, rc);
  520. rc = sqlite3_reset(res);
  521. if (unlikely(rc != SQLITE_OK))
  522. error_report("Failed to reset statement in store dimension, rc = %d", rc);
  523. return 1;
  524. }
  525. static bool dimension_can_be_deleted(uuid_t *dim_uuid __maybe_unused)
  526. {
  527. #ifdef ENABLE_DBENGINE
  528. if(dbengine_enabled) {
  529. bool no_retention = true;
  530. for (size_t tier = 0; tier < storage_tiers; tier++) {
  531. if (!multidb_ctx[tier])
  532. continue;
  533. time_t first_time_t = 0, last_time_t = 0;
  534. if (rrdeng_metric_retention_by_uuid((void *) multidb_ctx[tier], dim_uuid, &first_time_t, &last_time_t)) {
  535. if (first_time_t > 0) {
  536. no_retention = false;
  537. break;
  538. }
  539. }
  540. }
  541. return no_retention;
  542. }
  543. else
  544. return false;
  545. #else
  546. return false;
  547. #endif
  548. }
  549. static void check_dimension_metadata(struct metadata_wc *wc)
  550. {
  551. int rc;
  552. sqlite3_stmt *res = NULL;
  553. rc = sqlite3_prepare_v2(db_meta, SELECT_DIMENSION_LIST, -1, &res, 0);
  554. if (unlikely(rc != SQLITE_OK)) {
  555. error_report("Failed to prepare statement to fetch host dimensions");
  556. return;
  557. }
  558. rc = sqlite3_bind_int64(res, 1, (sqlite3_int64) wc->row_id);
  559. if (unlikely(rc != SQLITE_OK)) {
  560. error_report("Failed to row parameter");
  561. goto skip_run;
  562. }
  563. uint32_t total_checked = 0;
  564. uint32_t total_deleted= 0;
  565. uint64_t last_row_id = wc->row_id;
  566. netdata_log_info("METADATA: Checking dimensions starting after row %"PRIu64, wc->row_id);
  567. while (sqlite3_step_monitored(res) == SQLITE_ROW && total_deleted < MAX_METADATA_CLEANUP) {
  568. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
  569. break;
  570. last_row_id = sqlite3_column_int64(res, 1);
  571. rc = dimension_can_be_deleted((uuid_t *)sqlite3_column_blob(res, 0));
  572. if (rc == true) {
  573. delete_dimension_uuid((uuid_t *)sqlite3_column_blob(res, 0));
  574. total_deleted++;
  575. }
  576. total_checked++;
  577. }
  578. wc->row_id = last_row_id;
  579. time_t now = now_realtime_sec();
  580. if (total_deleted > 0) {
  581. wc->check_metadata_after = now + METADATA_MAINTENANCE_RETRY;
  582. } else
  583. wc->row_id = 0;
  584. netdata_log_info("METADATA: Checked %u, deleted %u -- will resume after row %"PRIu64" in %lld seconds", total_checked, total_deleted, wc->row_id,
  585. (long long)(wc->check_metadata_after - now));
  586. skip_run:
  587. rc = sqlite3_finalize(res);
  588. if (unlikely(rc != SQLITE_OK))
  589. error_report("Failed to finalize the prepared statement when reading dimensions");
  590. }
  591. static void cleanup_health_log(void)
  592. {
  593. RRDHOST *host;
  594. dfe_start_reentrant(rrdhost_root_index, host) {
  595. if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))
  596. continue;
  597. sql_health_alarm_log_cleanup(host);
  598. }
  599. dfe_done(host);
  600. }
  601. //
  602. // EVENT LOOP STARTS HERE
  603. //
  604. static void metadata_init_cmd_queue(struct metadata_wc *wc)
  605. {
  606. wc->cmd_queue.head = wc->cmd_queue.tail = 0;
  607. wc->queue_size = 0;
  608. fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
  609. fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
  610. }
  611. int metadata_enq_cmd_noblock(struct metadata_wc *wc, struct metadata_cmd *cmd)
  612. {
  613. unsigned queue_size;
  614. /* wait for free space in queue */
  615. uv_mutex_lock(&wc->cmd_mutex);
  616. if (cmd->opcode == METADATA_SYNC_SHUTDOWN) {
  617. metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN);
  618. uv_mutex_unlock(&wc->cmd_mutex);
  619. return 0;
  620. }
  621. if (unlikely((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE ||
  622. metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  623. uv_mutex_unlock(&wc->cmd_mutex);
  624. return 1;
  625. }
  626. fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE);
  627. /* enqueue command */
  628. wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
  629. wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ?
  630. wc->cmd_queue.tail + 1 : 0;
  631. wc->queue_size = queue_size + 1;
  632. uv_mutex_unlock(&wc->cmd_mutex);
  633. return 0;
  634. }
  635. static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd)
  636. {
  637. unsigned queue_size;
  638. /* wait for free space in queue */
  639. uv_mutex_lock(&wc->cmd_mutex);
  640. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  641. uv_mutex_unlock(&wc->cmd_mutex);
  642. (void) uv_async_send(&wc->async);
  643. return;
  644. }
  645. if (cmd->opcode == METADATA_SYNC_SHUTDOWN) {
  646. metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN);
  647. uv_mutex_unlock(&wc->cmd_mutex);
  648. (void) uv_async_send(&wc->async);
  649. return;
  650. }
  651. while ((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE) {
  652. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  653. uv_mutex_unlock(&wc->cmd_mutex);
  654. return;
  655. }
  656. uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
  657. }
  658. fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE);
  659. /* enqueue command */
  660. wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
  661. wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ?
  662. wc->cmd_queue.tail + 1 : 0;
  663. wc->queue_size = queue_size + 1;
  664. uv_mutex_unlock(&wc->cmd_mutex);
  665. /* wake up event loop */
  666. (void) uv_async_send(&wc->async);
  667. }
  668. static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc)
  669. {
  670. struct metadata_cmd ret;
  671. unsigned queue_size;
  672. uv_mutex_lock(&wc->cmd_mutex);
  673. queue_size = wc->queue_size;
  674. if (queue_size == 0) {
  675. memset(&ret, 0, sizeof(ret));
  676. ret.opcode = METADATA_DATABASE_NOOP;
  677. ret.completion = NULL;
  678. } else {
  679. /* dequeue command */
  680. ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head];
  681. if (queue_size == 1) {
  682. wc->cmd_queue.head = wc->cmd_queue.tail = 0;
  683. } else {
  684. wc->cmd_queue.head = wc->cmd_queue.head != METADATA_CMD_Q_MAX_SIZE - 1 ?
  685. wc->cmd_queue.head + 1 : 0;
  686. }
  687. wc->queue_size = queue_size - 1;
  688. /* wake up producers */
  689. uv_cond_signal(&wc->cmd_cond);
  690. }
  691. uv_mutex_unlock(&wc->cmd_mutex);
  692. return ret;
  693. }
  694. static void async_cb(uv_async_t *handle)
  695. {
  696. uv_stop(handle->loop);
  697. uv_update_time(handle->loop);
  698. }
  699. #define TIMER_INITIAL_PERIOD_MS (1000)
  700. #define TIMER_REPEAT_PERIOD_MS (1000)
  701. static void timer_cb(uv_timer_t* handle)
  702. {
  703. uv_stop(handle->loop);
  704. uv_update_time(handle->loop);
  705. struct metadata_wc *wc = handle->data;
  706. struct metadata_cmd cmd;
  707. memset(&cmd, 0, sizeof(cmd));
  708. time_t now = now_realtime_sec();
  709. if (wc->check_metadata_after && wc->check_metadata_after < now) {
  710. cmd.opcode = METADATA_MAINTENANCE;
  711. if (!metadata_enq_cmd_noblock(wc, &cmd))
  712. wc->check_metadata_after = now + METADATA_MAINTENANCE_INTERVAL;
  713. }
  714. if (wc->check_hosts_after && wc->check_hosts_after < now) {
  715. cmd.opcode = METADATA_SCAN_HOSTS;
  716. if (!metadata_enq_cmd_noblock(wc, &cmd))
  717. wc->check_hosts_after = now + METADATA_HOST_CHECK_INTERVAL;
  718. }
  719. }
  720. static void after_metadata_cleanup(uv_work_t *req, int status)
  721. {
  722. UNUSED(status);
  723. struct metadata_wc *wc = req->data;
  724. metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
  725. }
  726. static void start_metadata_cleanup(uv_work_t *req)
  727. {
  728. register_libuv_worker_jobs();
  729. worker_is_busy(UV_EVENT_METADATA_CLEANUP);
  730. struct metadata_wc *wc = req->data;
  731. check_dimension_metadata(wc);
  732. cleanup_health_log();
  733. (void) sqlite3_wal_checkpoint(db_meta, NULL);
  734. worker_is_idle();
  735. }
  736. struct scan_metadata_payload {
  737. uv_work_t request;
  738. struct metadata_wc *wc;
  739. struct completion *completion;
  740. BUFFER *work_buffer;
  741. uint32_t max_count;
  742. };
  743. struct host_context_load_thread {
  744. uv_thread_t thread;
  745. RRDHOST *host;
  746. bool busy;
  747. bool finished;
  748. };
  749. static void restore_host_context(void *arg)
  750. {
  751. struct host_context_load_thread *hclt = arg;
  752. RRDHOST *host = hclt->host;
  753. usec_t started_ut = now_monotonic_usec(); (void)started_ut;
  754. rrdhost_load_rrdcontext_data(host);
  755. usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
  756. rrdhost_flag_clear(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD | RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS);
  757. #ifdef ENABLE_ACLK
  758. aclk_queue_node_info(host, false);
  759. #endif
  760. internal_error(true, "METADATA: 'host:%s' context load in %0.2f ms", rrdhost_hostname(host),
  761. (double)(ended_ut - started_ut) / USEC_PER_MS);
  762. __atomic_store_n(&hclt->finished, true, __ATOMIC_RELEASE);
  763. }
  764. // Callback after scan of hosts is done
  765. static void after_start_host_load_context(uv_work_t *req, int status __maybe_unused)
  766. {
  767. struct scan_metadata_payload *data = req->data;
  768. freez(data);
  769. }
  770. #define MAX_FIND_THREAD_RETRIES (10)
  771. static void cleanup_finished_threads(struct host_context_load_thread *hclt, size_t max_thread_slots, bool wait)
  772. {
  773. for (size_t index = 0; index < max_thread_slots; index++) {
  774. if (__atomic_load_n(&(hclt[index].finished), __ATOMIC_RELAXED)
  775. || (wait && __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE))) {
  776. int rc = uv_thread_join(&(hclt[index].thread));
  777. if (rc)
  778. netdata_log_error("Failed to join thread, rc = %d",rc);
  779. __atomic_store_n(&(hclt[index].busy), false, __ATOMIC_RELEASE);
  780. __atomic_store_n(&(hclt[index].finished), false, __ATOMIC_RELEASE);
  781. }
  782. }
  783. }
  784. static size_t find_available_thread_slot(struct host_context_load_thread *hclt, size_t max_thread_slots, size_t *found_index)
  785. {
  786. size_t retries = MAX_FIND_THREAD_RETRIES;
  787. while (retries--) {
  788. size_t index = 0;
  789. while (index < max_thread_slots) {
  790. if (false == __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE)) {
  791. *found_index = index;
  792. return true;
  793. }
  794. index++;
  795. }
  796. sleep_usec(10 * USEC_PER_MS);
  797. }
  798. return false;
  799. }
  800. static void start_all_host_load_context(uv_work_t *req __maybe_unused)
  801. {
  802. register_libuv_worker_jobs();
  803. struct scan_metadata_payload *data = req->data;
  804. UNUSED(data);
  805. worker_is_busy(UV_EVENT_HOST_CONTEXT_LOAD);
  806. usec_t started_ut = now_monotonic_usec(); (void)started_ut;
  807. RRDHOST *host;
  808. size_t max_threads = MIN(get_netdata_cpus() / 2, 6);
  809. struct host_context_load_thread *hclt = callocz(max_threads, sizeof(*hclt));
  810. size_t thread_index;
  811. dfe_start_reentrant(rrdhost_root_index, host) {
  812. if (rrdhost_flag_check(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS) ||
  813. !rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))
  814. continue;
  815. rrdhost_flag_set(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS);
  816. internal_error(true, "METADATA: 'host:%s' loading context", rrdhost_hostname(host));
  817. cleanup_finished_threads(hclt, max_threads, false);
  818. bool found_slot = find_available_thread_slot(hclt, max_threads, &thread_index);
  819. if (unlikely(!found_slot)) {
  820. struct host_context_load_thread hclt_sync = {.host = host};
  821. restore_host_context(&hclt_sync);
  822. }
  823. else {
  824. __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED);
  825. hclt[thread_index].host = host;
  826. assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index]));
  827. }
  828. }
  829. dfe_done(host);
  830. cleanup_finished_threads(hclt, max_threads, true);
  831. freez(hclt);
  832. usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
  833. internal_error(true, "METADATA: 'host:ALL' contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS);
  834. worker_is_idle();
  835. }
  836. // Callback after scan of hosts is done
  837. static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused)
  838. {
  839. struct scan_metadata_payload *data = req->data;
  840. struct metadata_wc *wc = data->wc;
  841. metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
  842. internal_error(true, "METADATA: scanning hosts complete");
  843. if (unlikely(data->completion)) {
  844. completion_mark_complete(data->completion);
  845. internal_error(true, "METADATA: Sending completion done");
  846. }
  847. freez(data);
  848. }
  849. static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_transaction, BUFFER *work_buffer, size_t *query_counter) {
  850. RRDSET *st;
  851. int rc;
  852. bool more_to_do = false;
  853. uint32_t scan_count = 1;
  854. if (use_transaction)
  855. (void)db_execute(db_meta, "BEGIN TRANSACTION;");
  856. rrdset_foreach_reentrant(st, host) {
  857. if (scan_count == max_count) {
  858. more_to_do = true;
  859. break;
  860. }
  861. if(rrdset_flag_check(st, RRDSET_FLAG_METADATA_UPDATE)) {
  862. (*query_counter)++;
  863. rrdset_flag_clear(st, RRDSET_FLAG_METADATA_UPDATE);
  864. scan_count++;
  865. buffer_flush(work_buffer);
  866. rc = check_and_update_chart_labels(st, work_buffer, query_counter);
  867. if (unlikely(rc))
  868. error_report("METADATA: 'host:%s': Failed to update labels for chart %s", rrdhost_hostname(host), rrdset_name(st));
  869. else
  870. (*query_counter)++;
  871. rc = store_chart_metadata(st);
  872. if (unlikely(rc))
  873. error_report("METADATA: 'host:%s': Failed to store metadata for chart %s", rrdhost_hostname(host), rrdset_name(st));
  874. }
  875. RRDDIM *rd;
  876. rrddim_foreach_read(rd, st) {
  877. if(rrddim_flag_check(rd, RRDDIM_FLAG_METADATA_UPDATE)) {
  878. (*query_counter)++;
  879. rrddim_flag_clear(rd, RRDDIM_FLAG_METADATA_UPDATE);
  880. if (rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN))
  881. rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN);
  882. else
  883. rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN);
  884. rc = store_dimension_metadata(rd);
  885. if (unlikely(rc))
  886. error_report("METADATA: 'host:%s': Failed to dimension metadata for chart %s. dimension %s",
  887. rrdhost_hostname(host), rrdset_name(st),
  888. rrddim_name(rd));
  889. }
  890. }
  891. rrddim_foreach_done(rd);
  892. }
  893. rrdset_foreach_done(st);
  894. if (use_transaction)
  895. (void)db_execute(db_meta, "COMMIT TRANSACTION;");
  896. return more_to_do;
  897. }
  898. static void store_host_and_system_info(RRDHOST *host, size_t *query_counter)
  899. {
  900. if (unlikely(store_host_systeminfo(host))) {
  901. error_report("METADATA: 'host:%s': Failed to store host updated system information in the database", rrdhost_hostname(host));
  902. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE);
  903. }
  904. else {
  905. if (likely(query_counter))
  906. (*query_counter)++;
  907. }
  908. if (unlikely(store_host_metadata(host))) {
  909. error_report("METADATA: 'host:%s': Failed to store host info in the database", rrdhost_hostname(host));
  910. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE);
  911. }
  912. else {
  913. if (likely(query_counter))
  914. (*query_counter)++;
  915. }
  916. }
  917. // Worker thread to scan hosts for pending metadata to store
  918. static void start_metadata_hosts(uv_work_t *req __maybe_unused)
  919. {
  920. register_libuv_worker_jobs();
  921. RRDHOST *host;
  922. int transaction_started = 0;
  923. struct scan_metadata_payload *data = req->data;
  924. struct metadata_wc *wc = data->wc;
  925. BUFFER *work_buffer = data->work_buffer;
  926. usec_t all_started_ut = now_monotonic_usec(); (void)all_started_ut;
  927. internal_error(true, "METADATA: checking all hosts...");
  928. usec_t started_ut = now_monotonic_usec(); (void)started_ut;
  929. bool run_again = false;
  930. worker_is_busy(UV_EVENT_METADATA_STORE);
  931. if (!data->max_count)
  932. transaction_started = !db_execute(db_meta, "BEGIN TRANSACTION;");
  933. dfe_start_reentrant(rrdhost_root_index, host) {
  934. if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE))
  935. continue;
  936. size_t query_counter = 0; (void)query_counter;
  937. rrdhost_flag_clear(host,RRDHOST_FLAG_METADATA_UPDATE);
  938. if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_LABELS))) {
  939. rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_LABELS);
  940. int rc = exec_statement_with_uuid(SQL_DELETE_HOST_LABELS, &host->host_uuid);
  941. if (likely(!rc)) {
  942. query_counter++;
  943. buffer_flush(work_buffer);
  944. struct query_build tmp = {.sql = work_buffer, .count = 0};
  945. uuid_unparse_lower(host->host_uuid, tmp.uuid_str);
  946. rrdlabels_walkthrough_read(host->rrdlabels, host_label_store_to_sql_callback, &tmp);
  947. rc = db_execute(db_meta, buffer_tostring(work_buffer));
  948. if (unlikely(rc)) {
  949. error_report("METADATA: 'host:%s': failed to update metadata host labels", rrdhost_hostname(host));
  950. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
  951. }
  952. else
  953. query_counter++;
  954. } else {
  955. error_report("METADATA: 'host:%s': failed to delete old host labels", rrdhost_hostname(host));
  956. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
  957. }
  958. }
  959. if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_CLAIMID))) {
  960. rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_CLAIMID);
  961. uuid_t uuid;
  962. int rc;
  963. if (likely(host->aclk_state.claimed_id && !uuid_parse(host->aclk_state.claimed_id, uuid)))
  964. rc = store_claim_id(&host->host_uuid, &uuid);
  965. else
  966. rc = store_claim_id(&host->host_uuid, NULL);
  967. if (unlikely(rc))
  968. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID | RRDHOST_FLAG_METADATA_UPDATE);
  969. else
  970. query_counter++;
  971. }
  972. if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_INFO))) {
  973. rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_INFO);
  974. store_host_and_system_info(host, &query_counter);
  975. }
  976. // For clarity
  977. bool use_transaction = data->max_count;
  978. if (unlikely(metadata_scan_host(host, data->max_count, use_transaction, work_buffer, &query_counter))) {
  979. run_again = true;
  980. rrdhost_flag_set(host,RRDHOST_FLAG_METADATA_UPDATE);
  981. internal_error(true,"METADATA: 'host:%s': scheduling another run, more charts to store", rrdhost_hostname(host));
  982. }
  983. usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
  984. internal_error(true, "METADATA: 'host:%s': saved metadata with %zu SQL statements, in %0.2f ms",
  985. rrdhost_hostname(host), query_counter,
  986. (double)(ended_ut - started_ut) / USEC_PER_MS);
  987. }
  988. dfe_done(host);
  989. if (!data->max_count && transaction_started)
  990. transaction_started = db_execute(db_meta, "COMMIT TRANSACTION;");
  991. usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut;
  992. internal_error(true, "METADATA: checking all hosts completed in %0.2f ms",
  993. (double)(all_ended_ut - all_started_ut) / USEC_PER_MS);
  994. if (unlikely(run_again))
  995. wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_IMMEDIATE;
  996. else
  997. wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_INTERVAL;
  998. worker_is_idle();
  999. }
  1000. static void metadata_event_loop(void *arg)
  1001. {
  1002. worker_register("METASYNC");
  1003. worker_register_job_name(METADATA_DATABASE_NOOP, "noop");
  1004. worker_register_job_name(METADATA_DATABASE_TIMER, "timer");
  1005. worker_register_job_name(METADATA_DEL_DIMENSION, "delete dimension");
  1006. worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id");
  1007. worker_register_job_name(METADATA_ADD_HOST_INFO, "add host info");
  1008. worker_register_job_name(METADATA_MAINTENANCE, "maintenance");
  1009. worker_register_job_name(METADATA_ML_LOAD_MODELS, "ml load models");
  1010. int ret;
  1011. uv_loop_t *loop;
  1012. unsigned cmd_batch_size;
  1013. struct metadata_wc *wc = arg;
  1014. enum metadata_opcode opcode;
  1015. uv_work_t metadata_cleanup_worker;
  1016. uv_thread_set_name_np(wc->thread, "METASYNC");
  1017. // service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
  1018. loop = wc->loop = mallocz(sizeof(uv_loop_t));
  1019. ret = uv_loop_init(loop);
  1020. if (ret) {
  1021. netdata_log_error("uv_loop_init(): %s", uv_strerror(ret));
  1022. goto error_after_loop_init;
  1023. }
  1024. loop->data = wc;
  1025. ret = uv_async_init(wc->loop, &wc->async, async_cb);
  1026. if (ret) {
  1027. netdata_log_error("uv_async_init(): %s", uv_strerror(ret));
  1028. goto error_after_async_init;
  1029. }
  1030. wc->async.data = wc;
  1031. ret = uv_timer_init(loop, &wc->timer_req);
  1032. if (ret) {
  1033. netdata_log_error("uv_timer_init(): %s", uv_strerror(ret));
  1034. goto error_after_timer_init;
  1035. }
  1036. wc->timer_req.data = wc;
  1037. fatal_assert(0 == uv_timer_start(&wc->timer_req, timer_cb, TIMER_INITIAL_PERIOD_MS, TIMER_REPEAT_PERIOD_MS));
  1038. netdata_log_info("Starting metadata sync thread with %d entries command queue", METADATA_CMD_Q_MAX_SIZE);
  1039. struct metadata_cmd cmd;
  1040. memset(&cmd, 0, sizeof(cmd));
  1041. metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
  1042. metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
  1043. wc->check_metadata_after = now_realtime_sec() + METADATA_MAINTENANCE_FIRST_CHECK;
  1044. wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK;
  1045. int shutdown = 0;
  1046. wc->row_id = 0;
  1047. completion_mark_complete(&wc->init_complete);
  1048. BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  1049. struct scan_metadata_payload *data;
  1050. while (shutdown == 0 || (wc->flags & METADATA_WORKER_BUSY)) {
  1051. uuid_t *uuid;
  1052. RRDHOST *host = NULL;
  1053. worker_is_idle();
  1054. uv_run(loop, UV_RUN_DEFAULT);
  1055. /* wait for commands */
  1056. cmd_batch_size = 0;
  1057. do {
  1058. if (unlikely(cmd_batch_size >= METADATA_MAX_BATCH_SIZE))
  1059. break;
  1060. cmd = metadata_deq_cmd(wc);
  1061. opcode = cmd.opcode;
  1062. if (unlikely(opcode == METADATA_DATABASE_NOOP && metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  1063. shutdown = 1;
  1064. continue;
  1065. }
  1066. ++cmd_batch_size;
  1067. if (likely(opcode != METADATA_DATABASE_NOOP))
  1068. worker_is_busy(opcode);
  1069. switch (opcode) {
  1070. case METADATA_DATABASE_NOOP:
  1071. case METADATA_DATABASE_TIMER:
  1072. break;
  1073. case METADATA_ML_LOAD_MODELS: {
  1074. RRDDIM *rd = (RRDDIM *) cmd.param[0];
  1075. ml_dimension_load_models(rd);
  1076. break;
  1077. }
  1078. case METADATA_DEL_DIMENSION:
  1079. uuid = (uuid_t *) cmd.param[0];
  1080. if (likely(dimension_can_be_deleted(uuid)))
  1081. delete_dimension_uuid(uuid);
  1082. freez(uuid);
  1083. break;
  1084. case METADATA_STORE_CLAIM_ID:
  1085. store_claim_id((uuid_t *) cmd.param[0], (uuid_t *) cmd.param[1]);
  1086. freez((void *) cmd.param[0]);
  1087. freez((void *) cmd.param[1]);
  1088. break;
  1089. case METADATA_ADD_HOST_INFO:
  1090. host = (RRDHOST *) cmd.param[0];
  1091. store_host_and_system_info(host, NULL);
  1092. break;
  1093. case METADATA_SCAN_HOSTS:
  1094. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SCANNING_HOSTS)))
  1095. break;
  1096. if (unittest_running)
  1097. break;
  1098. data = mallocz(sizeof(*data));
  1099. data->request.data = data;
  1100. data->wc = wc;
  1101. data->completion = cmd.completion; // Completion by the worker
  1102. data->work_buffer = work_buffer;
  1103. if (unlikely(cmd.completion)) {
  1104. data->max_count = 0; // 0 will process all pending updates
  1105. cmd.completion = NULL; // Do not complete after launching worker (worker will do)
  1106. }
  1107. else
  1108. data->max_count = 5000;
  1109. metadata_flag_set(wc, METADATA_FLAG_SCANNING_HOSTS);
  1110. if (unlikely(
  1111. uv_queue_work(loop,&data->request,
  1112. start_metadata_hosts,
  1113. after_metadata_hosts))) {
  1114. // Failed to launch worker -- let the event loop handle completion
  1115. cmd.completion = data->completion;
  1116. freez(data);
  1117. metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
  1118. }
  1119. break;
  1120. case METADATA_LOAD_HOST_CONTEXT:;
  1121. if (unittest_running)
  1122. break;
  1123. data = callocz(1,sizeof(*data));
  1124. data->request.data = data;
  1125. data->wc = wc;
  1126. if (unlikely(
  1127. uv_queue_work(loop,&data->request, start_all_host_load_context,
  1128. after_start_host_load_context))) {
  1129. freez(data);
  1130. }
  1131. break;
  1132. case METADATA_MAINTENANCE:
  1133. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_CLEANUP)))
  1134. break;
  1135. metadata_cleanup_worker.data = wc;
  1136. metadata_flag_set(wc, METADATA_FLAG_CLEANUP);
  1137. if (unlikely(
  1138. uv_queue_work(loop, &metadata_cleanup_worker, start_metadata_cleanup, after_metadata_cleanup))) {
  1139. metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
  1140. }
  1141. break;
  1142. case METADATA_UNITTEST:;
  1143. struct thread_unittest *tu = (struct thread_unittest *) cmd.param[0];
  1144. sleep_usec(1000); // processing takes 1ms
  1145. __atomic_fetch_add(&tu->processed, 1, __ATOMIC_SEQ_CST);
  1146. break;
  1147. default:
  1148. break;
  1149. }
  1150. if (cmd.completion)
  1151. completion_mark_complete(cmd.completion);
  1152. } while (opcode != METADATA_DATABASE_NOOP);
  1153. }
  1154. if (!uv_timer_stop(&wc->timer_req))
  1155. uv_close((uv_handle_t *)&wc->timer_req, NULL);
  1156. uv_close((uv_handle_t *)&wc->async, NULL);
  1157. uv_cond_destroy(&wc->cmd_cond);
  1158. int rc;
  1159. do {
  1160. rc = uv_loop_close(loop);
  1161. } while (rc != UV_EBUSY);
  1162. buffer_free(work_buffer);
  1163. freez(loop);
  1164. worker_unregister();
  1165. netdata_log_info("METADATA: Shutting down event loop");
  1166. completion_mark_complete(&wc->init_complete);
  1167. return;
  1168. error_after_timer_init:
  1169. uv_close((uv_handle_t *)&wc->async, NULL);
  1170. error_after_async_init:
  1171. fatal_assert(0 == uv_loop_close(loop));
  1172. error_after_loop_init:
  1173. freez(loop);
  1174. worker_unregister();
  1175. }
  1176. struct metadata_wc metasync_worker = {.loop = NULL};
  1177. void metadata_sync_shutdown(void)
  1178. {
  1179. completion_init(&metasync_worker.init_complete);
  1180. struct metadata_cmd cmd;
  1181. memset(&cmd, 0, sizeof(cmd));
  1182. netdata_log_info("METADATA: Sending a shutdown command");
  1183. cmd.opcode = METADATA_SYNC_SHUTDOWN;
  1184. metadata_enq_cmd(&metasync_worker, &cmd);
  1185. /* wait for metadata thread to shut down */
  1186. netdata_log_info("METADATA: Waiting for shutdown ACK");
  1187. completion_wait_for(&metasync_worker.init_complete);
  1188. completion_destroy(&metasync_worker.init_complete);
  1189. netdata_log_info("METADATA: Shutdown complete");
  1190. }
  1191. void metadata_sync_shutdown_prepare(void)
  1192. {
  1193. if (unlikely(!metasync_worker.loop))
  1194. return;
  1195. struct metadata_cmd cmd;
  1196. memset(&cmd, 0, sizeof(cmd));
  1197. struct completion compl;
  1198. completion_init(&compl);
  1199. netdata_log_info("METADATA: Sending a scan host command");
  1200. uint32_t max_wait_iterations = 2000;
  1201. while (unlikely(metadata_flag_check(&metasync_worker, METADATA_FLAG_SCANNING_HOSTS)) && max_wait_iterations--) {
  1202. if (max_wait_iterations == 1999)
  1203. netdata_log_info("METADATA: Current worker is running; waiting to finish");
  1204. sleep_usec(1000);
  1205. }
  1206. cmd.opcode = METADATA_SCAN_HOSTS;
  1207. cmd.completion = &compl;
  1208. metadata_enq_cmd(&metasync_worker, &cmd);
  1209. netdata_log_info("METADATA: Waiting for host scan completion");
  1210. completion_wait_for(&compl);
  1211. completion_destroy(&compl);
  1212. netdata_log_info("METADATA: Host scan complete; can continue with shutdown");
  1213. }
  1214. // -------------------------------------------------------------
  1215. // Init function called on agent startup
  1216. void metadata_sync_init(void)
  1217. {
  1218. struct metadata_wc *wc = &metasync_worker;
  1219. memset(wc, 0, sizeof(*wc));
  1220. metadata_init_cmd_queue(wc);
  1221. completion_init(&wc->init_complete);
  1222. fatal_assert(0 == uv_thread_create(&(wc->thread), metadata_event_loop, wc));
  1223. completion_wait_for(&wc->init_complete);
  1224. completion_destroy(&wc->init_complete);
  1225. netdata_log_info("SQLite metadata sync initialization complete");
  1226. }
  1227. // Helpers
  1228. static inline void queue_metadata_cmd(enum metadata_opcode opcode, const void *param0, const void *param1)
  1229. {
  1230. struct metadata_cmd cmd;
  1231. cmd.opcode = opcode;
  1232. cmd.param[0] = param0;
  1233. cmd.param[1] = param1;
  1234. cmd.completion = NULL;
  1235. metadata_enq_cmd(&metasync_worker, &cmd);
  1236. }
  1237. // Public
  1238. void metaqueue_delete_dimension_uuid(uuid_t *uuid)
  1239. {
  1240. if (unlikely(!metasync_worker.loop))
  1241. return;
  1242. uuid_t *use_uuid = mallocz(sizeof(*uuid));
  1243. uuid_copy(*use_uuid, *uuid);
  1244. queue_metadata_cmd(METADATA_DEL_DIMENSION, use_uuid, NULL);
  1245. }
  1246. void metaqueue_store_claim_id(uuid_t *host_uuid, uuid_t *claim_uuid)
  1247. {
  1248. if (unlikely(!host_uuid))
  1249. return;
  1250. uuid_t *local_host_uuid = mallocz(sizeof(*host_uuid));
  1251. uuid_t *local_claim_uuid = NULL;
  1252. uuid_copy(*local_host_uuid, *host_uuid);
  1253. if (likely(claim_uuid)) {
  1254. local_claim_uuid = mallocz(sizeof(*claim_uuid));
  1255. uuid_copy(*local_claim_uuid, *claim_uuid);
  1256. }
  1257. queue_metadata_cmd(METADATA_STORE_CLAIM_ID, local_host_uuid, local_claim_uuid);
  1258. }
  1259. void metaqueue_host_update_info(RRDHOST *host)
  1260. {
  1261. if (unlikely(!metasync_worker.loop))
  1262. return;
  1263. queue_metadata_cmd(METADATA_ADD_HOST_INFO, host, NULL);
  1264. }
  1265. void metaqueue_ml_load_models(RRDDIM *rd)
  1266. {
  1267. if (unlikely(!metasync_worker.loop))
  1268. return;
  1269. queue_metadata_cmd(METADATA_ML_LOAD_MODELS, rd, NULL);
  1270. }
  1271. void metadata_queue_load_host_context(RRDHOST *host)
  1272. {
  1273. if (unlikely(!metasync_worker.loop))
  1274. return;
  1275. queue_metadata_cmd(METADATA_LOAD_HOST_CONTEXT, host, NULL);
  1276. }
//
// unit tests
//
  1280. static void *unittest_queue_metadata(void *arg) {
  1281. struct thread_unittest *tu = arg;
  1282. struct metadata_cmd cmd;
  1283. cmd.opcode = METADATA_UNITTEST;
  1284. cmd.param[0] = tu;
  1285. cmd.param[1] = NULL;
  1286. cmd.completion = NULL;
  1287. metadata_enq_cmd(&metasync_worker, &cmd);
  1288. do {
  1289. __atomic_fetch_add(&tu->added, 1, __ATOMIC_SEQ_CST);
  1290. metadata_enq_cmd(&metasync_worker, &cmd);
  1291. sleep_usec(10000);
  1292. } while (!__atomic_load_n(&tu->join, __ATOMIC_RELAXED));
  1293. return arg;
  1294. }
  1295. static void *metadata_unittest_threads(void)
  1296. {
  1297. unsigned done;
  1298. struct thread_unittest tu = {
  1299. .join = 0,
  1300. .added = 0,
  1301. .processed = 0,
  1302. .done = &done,
  1303. };
  1304. // Queue messages / Time it
  1305. time_t seconds_to_run = 5;
  1306. int threads_to_create = 4;
  1307. fprintf(
  1308. stderr,
  1309. "\nChecking metadata queue using %d threads for %lld seconds...\n",
  1310. threads_to_create,
  1311. (long long)seconds_to_run);
  1312. netdata_thread_t threads[threads_to_create];
  1313. tu.join = 0;
  1314. for (int i = 0; i < threads_to_create; i++) {
  1315. char buf[100 + 1];
  1316. snprintf(buf, 100, "META[%d]", i);
  1317. netdata_thread_create(
  1318. &threads[i],
  1319. buf,
  1320. NETDATA_THREAD_OPTION_DONT_LOG | NETDATA_THREAD_OPTION_JOINABLE,
  1321. unittest_queue_metadata,
  1322. &tu);
  1323. }
  1324. (void) uv_async_send(&metasync_worker.async);
  1325. sleep_usec(seconds_to_run * USEC_PER_SEC);
  1326. __atomic_store_n(&tu.join, 1, __ATOMIC_RELAXED);
  1327. for (int i = 0; i < threads_to_create; i++) {
  1328. void *retval;
  1329. netdata_thread_join(threads[i], &retval);
  1330. }
  1331. sleep_usec(5 * USEC_PER_SEC);
  1332. fprintf(stderr, "Added %u elements, processed %u\n", tu.added, tu.processed);
  1333. return 0;
  1334. }
  1335. int metadata_unittest(void)
  1336. {
  1337. metadata_sync_init();
  1338. // Queue items for a specific period of time
  1339. metadata_unittest_threads();
  1340. fprintf(stderr, "Items still in queue %u\n", metasync_worker.queue_size);
  1341. metadata_sync_shutdown();
  1342. return 0;
  1343. }