sqlite_metadata.c 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "sqlite_metadata.h"
  3. // SQL statements
  4. #define SQL_STORE_CLAIM_ID "INSERT INTO node_instance " \
  5. "(host_id, claim_id, date_created) VALUES (@host_id, @claim_id, unixepoch()) " \
  6. "ON CONFLICT(host_id) DO UPDATE SET claim_id = excluded.claim_id;"
  7. #define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid;"
  8. #define STORE_HOST_LABEL \
  9. "INSERT OR REPLACE INTO host_label (host_id, source_type, label_key, label_value, date_created) VALUES "
  10. #define STORE_CHART_LABEL \
  11. "INSERT OR REPLACE INTO chart_label (chart_id, source_type, label_key, label_value, date_created) VALUES "
  12. #define STORE_HOST_OR_CHART_LABEL_VALUE "(u2h('%s'), %d,'%s','%s', unixepoch())"
  13. #define DELETE_DIMENSION_UUID "DELETE FROM dimension WHERE dim_id = @uuid;"
  14. #define SQL_STORE_HOST_INFO "INSERT OR REPLACE INTO host " \
  15. "(host_id, hostname, registry_hostname, update_every, os, timezone," \
  16. "tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, program_version," \
  17. "entries, health_enabled) " \
  18. "values (@host_id, @hostname, @registry_hostname, @update_every, @os, @timezone, @tags, @hops, @memory_mode, " \
  19. "@abbrev_timezone, @utc_offset, @program_name, @program_version, " \
  20. "@entries, @health_enabled);"
  21. #define SQL_STORE_CHART "insert or replace into chart (chart_id, host_id, type, id, " \
  22. "name, family, context, title, unit, plugin, module, priority, update_every , chart_type , memory_mode , " \
  23. "history_entries) values (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16);"
  24. #define SQL_STORE_DIMENSION "INSERT OR REPLACE INTO dimension (dim_id, chart_id, id, name, multiplier, divisor , algorithm, options) " \
  25. "VALUES (@dim_id, @chart_id, @id, @name, @multiplier, @divisor, @algorithm, @options);"
  26. #define SELECT_DIMENSION_LIST "SELECT dim_id, rowid FROM dimension WHERE rowid > @row_id"
  27. #define STORE_HOST_INFO "INSERT OR REPLACE INTO host_info (host_id, system_key, system_value, date_created) VALUES "
  28. #define STORE_HOST_INFO_VALUES "(u2h('%s'), '%s','%s', unixepoch())"
  29. #define MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID \
  30. "UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0);"
  31. #define DELETE_NON_EXISTING_LOCALHOST "DELETE FROM host WHERE hops = 0 AND host_id <> @host_id;"
  32. #define DELETE_MISSING_NODE_INSTANCES "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);"
  33. #define METADATA_CMD_Q_MAX_SIZE (1024) // Max queue size; callers will block until there is room
  34. #define METADATA_MAINTENANCE_FIRST_CHECK (1800) // Maintenance first run after agent startup in seconds
  35. #define METADATA_MAINTENANCE_RETRY (60) // Retry run if already running or last run did actual work
  36. #define METADATA_MAINTENANCE_INTERVAL (3600) // Repeat maintenance after latest successful
  37. #define METADATA_HOST_CHECK_FIRST_CHECK (5) // First check for pending metadata
  38. #define METADATA_HOST_CHECK_INTERVAL (30) // Repeat check for pending metadata
  39. #define METADATA_HOST_CHECK_IMMEDIATE (5) // Repeat immediate run because we have more metadata to write
  40. #define MAX_METADATA_CLEANUP (500) // Maximum metadata write operations (e.g deletes before retrying)
  41. #define METADATA_MAX_BATCH_SIZE (512) // Maximum commands to execute before running the event loop
  42. enum metadata_opcode {
  43. METADATA_DATABASE_NOOP = 0,
  44. METADATA_DATABASE_TIMER,
  45. METADATA_DEL_DIMENSION,
  46. METADATA_STORE_CLAIM_ID,
  47. METADATA_ADD_HOST_INFO,
  48. METADATA_SCAN_HOSTS,
  49. METADATA_LOAD_HOST_CONTEXT,
  50. METADATA_MAINTENANCE,
  51. METADATA_SYNC_SHUTDOWN,
  52. METADATA_UNITTEST,
  53. METADATA_ML_LOAD_MODELS,
  54. // leave this last
  55. // we need it to check for worker utilization
  56. METADATA_MAX_ENUMERATIONS_DEFINED
  57. };
  58. #define MAX_PARAM_LIST (2)
  59. struct metadata_cmd {
  60. enum metadata_opcode opcode;
  61. struct completion *completion;
  62. const void *param[MAX_PARAM_LIST];
  63. };
  64. struct metadata_database_cmdqueue {
  65. unsigned head, tail;
  66. struct metadata_cmd cmd_array[METADATA_CMD_Q_MAX_SIZE];
  67. };
  68. typedef enum {
  69. METADATA_FLAG_CLEANUP = (1 << 0), // Cleanup is running
  70. METADATA_FLAG_SCANNING_HOSTS = (1 << 1), // Scanning of hosts in worker thread
  71. METADATA_FLAG_SHUTDOWN = (1 << 2), // Shutting down
  72. } METADATA_FLAG;
  73. #define METADATA_WORKER_BUSY (METADATA_FLAG_CLEANUP | METADATA_FLAG_SCANNING_HOSTS)
  74. struct metadata_wc {
  75. uv_thread_t thread;
  76. uv_loop_t *loop;
  77. uv_async_t async;
  78. uv_timer_t timer_req;
  79. time_t check_metadata_after;
  80. time_t check_hosts_after;
  81. volatile unsigned queue_size;
  82. METADATA_FLAG flags;
  83. uint64_t row_id;
  84. struct completion init_complete;
  85. /* FIFO command queue */
  86. uv_mutex_t cmd_mutex;
  87. uv_cond_t cmd_cond;
  88. struct metadata_database_cmdqueue cmd_queue;
  89. };
  90. #define metadata_flag_check(target_flags, flag) (__atomic_load_n(&((target_flags)->flags), __ATOMIC_SEQ_CST) & (flag))
  91. #define metadata_flag_set(target_flags, flag) __atomic_or_fetch(&((target_flags)->flags), (flag), __ATOMIC_SEQ_CST)
  92. #define metadata_flag_clear(target_flags, flag) __atomic_and_fetch(&((target_flags)->flags), ~(flag), __ATOMIC_SEQ_CST)
  93. //
  94. // For unittest
  95. //
  96. struct thread_unittest {
  97. int join;
  98. unsigned added;
  99. unsigned processed;
  100. unsigned *done;
  101. };
  102. // Metadata functions
  103. struct query_build {
  104. BUFFER *sql;
  105. int count;
  106. char uuid_str[UUID_STR_LEN];
  107. };
  108. static int host_label_store_to_sql_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
  109. struct query_build *lb = data;
  110. if (unlikely(!lb->count))
  111. buffer_sprintf(lb->sql, STORE_HOST_LABEL);
  112. else
  113. buffer_strcat(lb->sql, ", ");
  114. buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, (int)ls & ~(RRDLABEL_FLAG_INTERNAL), name, value);
  115. lb->count++;
  116. return 1;
  117. }
  118. static int chart_label_store_to_sql_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
  119. struct query_build *lb = data;
  120. if (unlikely(!lb->count))
  121. buffer_sprintf(lb->sql, STORE_CHART_LABEL);
  122. else
  123. buffer_strcat(lb->sql, ", ");
  124. buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, ls, name, value);
  125. lb->count++;
  126. return 1;
  127. }
  128. #define SQL_DELETE_CHART_LABEL "DELETE FROM chart_label WHERE chart_id = @chart_id;"
  129. #define SQL_DELETE_CHART_LABEL_HISTORY "DELETE FROM chart_label WHERE date_created < %ld AND chart_id = @chart_id;"
  130. static void clean_old_chart_labels(RRDSET *st)
  131. {
  132. char sql[512];
  133. time_t first_time_s = rrdset_first_entry_s(st);
  134. if (unlikely(!first_time_s))
  135. snprintfz(sql, 511,SQL_DELETE_CHART_LABEL);
  136. else
  137. snprintfz(sql, 511,SQL_DELETE_CHART_LABEL_HISTORY, first_time_s);
  138. int rc = exec_statement_with_uuid(sql, &st->chart_uuid);
  139. if (unlikely(rc))
  140. error_report("METADATA: 'host:%s' Failed to clean old labels for chart %s", rrdhost_hostname(st->rrdhost), rrdset_name(st));
  141. }
  142. static int check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer, size_t *query_counter)
  143. {
  144. size_t old_version = st->rrdlabels_last_saved_version;
  145. size_t new_version = dictionary_version(st->rrdlabels);
  146. if (new_version == old_version)
  147. return 0;
  148. struct query_build tmp = {.sql = work_buffer, .count = 0};
  149. uuid_unparse_lower(st->chart_uuid, tmp.uuid_str);
  150. rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp);
  151. int rc = db_execute(db_meta, buffer_tostring(work_buffer));
  152. if (likely(!rc)) {
  153. st->rrdlabels_last_saved_version = new_version;
  154. (*query_counter)++;
  155. }
  156. clean_old_chart_labels(st);
  157. return rc;
  158. }
  159. // Migrate all hosts with hops zero to this host_uuid
  160. void migrate_localhost(uuid_t *host_uuid)
  161. {
  162. int rc;
  163. rc = exec_statement_with_uuid(MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID, host_uuid);
  164. if (!rc)
  165. rc = exec_statement_with_uuid(DELETE_NON_EXISTING_LOCALHOST, host_uuid);
  166. if (!rc) {
  167. if (unlikely(db_execute(db_meta, DELETE_MISSING_NODE_INSTANCES)))
  168. error_report("Failed to remove deleted hosts from node instances");
  169. }
  170. }
  171. static int store_claim_id(uuid_t *host_id, uuid_t *claim_id)
  172. {
  173. sqlite3_stmt *res = NULL;
  174. int rc;
  175. if (unlikely(!db_meta)) {
  176. if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
  177. error_report("Database has not been initialized");
  178. return 1;
  179. }
  180. rc = sqlite3_prepare_v2(db_meta, SQL_STORE_CLAIM_ID, -1, &res, 0);
  181. if (unlikely(rc != SQLITE_OK)) {
  182. error_report("Failed to prepare statement to store host claim id");
  183. return 1;
  184. }
  185. rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC);
  186. if (unlikely(rc != SQLITE_OK)) {
  187. error_report("Failed to bind host_id parameter to store claim id");
  188. goto failed;
  189. }
  190. if (claim_id)
  191. rc = sqlite3_bind_blob(res, 2, claim_id, sizeof(*claim_id), SQLITE_STATIC);
  192. else
  193. rc = sqlite3_bind_null(res, 2);
  194. if (unlikely(rc != SQLITE_OK)) {
  195. error_report("Failed to bind claim_id parameter to host claim id");
  196. goto failed;
  197. }
  198. rc = execute_insert(res);
  199. if (unlikely(rc != SQLITE_DONE))
  200. error_report("Failed to store host claim id rc = %d", rc);
  201. failed:
  202. if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
  203. error_report("Failed to finalize the prepared statement when storing a host claim id");
  204. return rc != SQLITE_DONE;
  205. }
  206. static void delete_dimension_uuid(uuid_t *dimension_uuid)
  207. {
  208. static __thread sqlite3_stmt *res = NULL;
  209. int rc;
  210. if (unlikely(!res)) {
  211. rc = prepare_statement(db_meta, DELETE_DIMENSION_UUID, &res);
  212. if (rc != SQLITE_OK) {
  213. error_report("Failed to prepare statement to delete a dimension uuid");
  214. return;
  215. }
  216. }
  217. rc = sqlite3_bind_blob(res, 1, dimension_uuid, sizeof(*dimension_uuid), SQLITE_STATIC);
  218. if (unlikely(rc != SQLITE_OK))
  219. goto skip_execution;
  220. rc = sqlite3_step_monitored(res);
  221. if (unlikely(rc != SQLITE_DONE))
  222. error_report("Failed to delete dimension uuid, rc = %d", rc);
  223. skip_execution:
  224. rc = sqlite3_reset(res);
  225. if (unlikely(rc != SQLITE_OK))
  226. error_report("Failed to reset statement when deleting dimension UUID, rc = %d", rc);
  227. }
  228. //
  229. // Store host and host system info information in the database
  230. static int store_host_metadata(RRDHOST *host)
  231. {
  232. static __thread sqlite3_stmt *res = NULL;
  233. int rc, param = 0;
  234. if (unlikely(!db_meta)) {
  235. if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
  236. return 0;
  237. error_report("Database has not been initialized");
  238. return 1;
  239. }
  240. if (unlikely((!res))) {
  241. rc = prepare_statement(db_meta, SQL_STORE_HOST_INFO, &res);
  242. if (unlikely(rc != SQLITE_OK)) {
  243. error_report("Failed to prepare statement to store host, rc = %d", rc);
  244. return 1;
  245. }
  246. }
  247. rc = sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
  248. if (unlikely(rc != SQLITE_OK))
  249. goto bind_fail;
  250. rc = bind_text_null(res, ++param, rrdhost_hostname(host), 0);
  251. if (unlikely(rc != SQLITE_OK))
  252. goto bind_fail;
  253. rc = bind_text_null(res, ++param, rrdhost_registry_hostname(host), 1);
  254. if (unlikely(rc != SQLITE_OK))
  255. goto bind_fail;
  256. rc = sqlite3_bind_int(res, ++param, host->rrd_update_every);
  257. if (unlikely(rc != SQLITE_OK))
  258. goto bind_fail;
  259. rc = bind_text_null(res, ++param, rrdhost_os(host), 1);
  260. if (unlikely(rc != SQLITE_OK))
  261. goto bind_fail;
  262. rc = bind_text_null(res, ++param, rrdhost_timezone(host), 1);
  263. if (unlikely(rc != SQLITE_OK))
  264. goto bind_fail;
  265. rc = bind_text_null(res, ++param, rrdhost_tags(host), 1);
  266. if (unlikely(rc != SQLITE_OK))
  267. goto bind_fail;
  268. rc = sqlite3_bind_int(res, ++param, host->system_info ? host->system_info->hops : 0);
  269. if (unlikely(rc != SQLITE_OK))
  270. goto bind_fail;
  271. rc = sqlite3_bind_int(res, ++param, host->rrd_memory_mode);
  272. if (unlikely(rc != SQLITE_OK))
  273. goto bind_fail;
  274. rc = bind_text_null(res, ++param, rrdhost_abbrev_timezone(host), 1);
  275. if (unlikely(rc != SQLITE_OK))
  276. goto bind_fail;
  277. rc = sqlite3_bind_int(res, ++param, host->utc_offset);
  278. if (unlikely(rc != SQLITE_OK))
  279. goto bind_fail;
  280. rc = bind_text_null(res, ++param, rrdhost_program_name(host), 1);
  281. if (unlikely(rc != SQLITE_OK))
  282. goto bind_fail;
  283. rc = bind_text_null(res, ++param, rrdhost_program_version(host), 1);
  284. if (unlikely(rc != SQLITE_OK))
  285. goto bind_fail;
  286. rc = sqlite3_bind_int64(res, ++param, host->rrd_history_entries);
  287. if (unlikely(rc != SQLITE_OK))
  288. goto bind_fail;
  289. rc = sqlite3_bind_int(res, ++param, (int ) host->health.health_enabled);
  290. if (unlikely(rc != SQLITE_OK))
  291. goto bind_fail;
  292. int store_rc = sqlite3_step_monitored(res);
  293. if (unlikely(store_rc != SQLITE_DONE))
  294. error_report("Failed to store host %s, rc = %d", rrdhost_hostname(host), rc);
  295. rc = sqlite3_reset(res);
  296. if (unlikely(rc != SQLITE_OK))
  297. error_report("Failed to reset statement to store host %s, rc = %d", rrdhost_hostname(host), rc);
  298. return store_rc != SQLITE_DONE;
  299. bind_fail:
  300. error_report("Failed to bind %d parameter to store host %s, rc = %d", param, rrdhost_hostname(host), rc);
  301. rc = sqlite3_reset(res);
  302. if (unlikely(rc != SQLITE_OK))
  303. error_report("Failed to reset statement to store host %s, rc = %d", rrdhost_hostname(host), rc);
  304. return 1;
  305. }
  306. static void add_host_sysinfo_key_value(const char *name, const char *value, void *data)
  307. {
  308. struct query_build *lb = data;
  309. if (unlikely(!value))
  310. return;
  311. if (unlikely(!lb->count))
  312. buffer_sprintf(
  313. lb->sql, STORE_HOST_INFO);
  314. else
  315. buffer_strcat(lb->sql, ", ");
  316. buffer_sprintf(lb->sql, STORE_HOST_INFO_VALUES, lb->uuid_str, name, value);
  317. lb->count++;
  318. }
  319. static bool build_host_system_info_statements(RRDHOST *host, BUFFER *work_buffer)
  320. {
  321. struct rrdhost_system_info *system_info = host->system_info;
  322. if (unlikely(!system_info))
  323. return false;
  324. buffer_flush(work_buffer);
  325. struct query_build key_data = {.sql = work_buffer, .count = 0};
  326. uuid_unparse_lower(host->host_uuid, key_data.uuid_str);
  327. add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_NAME", system_info->container_os_name, &key_data);
  328. add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_ID", system_info->container_os_id, &key_data);
  329. add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_ID_LIKE", system_info->container_os_id_like, &key_data);
  330. add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_VERSION", system_info->container_os_version, &key_data);
  331. add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_VERSION_ID", system_info->container_os_version_id, &key_data);
  332. add_host_sysinfo_key_value("NETDATA_CONTAINER_OS_DETECTION", system_info->host_os_detection, &key_data);
  333. add_host_sysinfo_key_value("NETDATA_HOST_OS_NAME", system_info->host_os_name, &key_data);
  334. add_host_sysinfo_key_value("NETDATA_HOST_OS_ID", system_info->host_os_id, &key_data);
  335. add_host_sysinfo_key_value("NETDATA_HOST_OS_ID_LIKE", system_info->host_os_id_like, &key_data);
  336. add_host_sysinfo_key_value("NETDATA_HOST_OS_VERSION", system_info->host_os_version, &key_data);
  337. add_host_sysinfo_key_value("NETDATA_HOST_OS_VERSION_ID", system_info->host_os_version_id, &key_data);
  338. add_host_sysinfo_key_value("NETDATA_HOST_OS_DETECTION", system_info->host_os_detection, &key_data);
  339. add_host_sysinfo_key_value("NETDATA_SYSTEM_KERNEL_NAME", system_info->kernel_name, &key_data);
  340. add_host_sysinfo_key_value("NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT", system_info->host_cores, &key_data);
  341. add_host_sysinfo_key_value("NETDATA_SYSTEM_CPU_FREQ", system_info->host_cpu_freq, &key_data);
  342. add_host_sysinfo_key_value("NETDATA_SYSTEM_TOTAL_RAM", system_info->host_ram_total, &key_data);
  343. add_host_sysinfo_key_value("NETDATA_SYSTEM_TOTAL_DISK_SIZE", system_info->host_disk_space, &key_data);
  344. add_host_sysinfo_key_value("NETDATA_SYSTEM_KERNEL_VERSION", system_info->kernel_version, &key_data);
  345. add_host_sysinfo_key_value("NETDATA_SYSTEM_ARCHITECTURE", system_info->architecture, &key_data);
  346. add_host_sysinfo_key_value("NETDATA_SYSTEM_VIRTUALIZATION", system_info->virtualization, &key_data);
  347. add_host_sysinfo_key_value("NETDATA_SYSTEM_VIRT_DETECTION", system_info->virt_detection, &key_data);
  348. add_host_sysinfo_key_value("NETDATA_SYSTEM_CONTAINER", system_info->container, &key_data);
  349. add_host_sysinfo_key_value("NETDATA_SYSTEM_CONTAINER_DETECTION", system_info->container_detection, &key_data);
  350. add_host_sysinfo_key_value("NETDATA_HOST_IS_K8S_NODE", system_info->is_k8s_node, &key_data);
  351. return true;
  352. }
  353. /*
  354. * Store a chart in the database
  355. */
  356. static int store_chart_metadata(RRDSET *st)
  357. {
  358. static __thread sqlite3_stmt *res = NULL;
  359. int rc, param = 0, store_rc = 0;
  360. if (unlikely(!db_meta)) {
  361. if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
  362. return 0;
  363. error_report("Database has not been initialized");
  364. return 1;
  365. }
  366. if (unlikely(!res)) {
  367. rc = prepare_statement(db_meta, SQL_STORE_CHART, &res);
  368. if (unlikely(rc != SQLITE_OK)) {
  369. error_report("Failed to prepare statement to store chart, rc = %d", rc);
  370. return 1;
  371. }
  372. }
  373. rc = sqlite3_bind_blob(res, ++param, &st->chart_uuid, sizeof(st->chart_uuid), SQLITE_STATIC);
  374. if (unlikely(rc != SQLITE_OK))
  375. goto bind_fail;
  376. rc = sqlite3_bind_blob(res, ++param, &st->rrdhost->host_uuid, sizeof(st->rrdhost->host_uuid), SQLITE_STATIC);
  377. if (unlikely(rc != SQLITE_OK))
  378. goto bind_fail;
  379. rc = sqlite3_bind_text(res, ++param, string2str(st->parts.type), -1, SQLITE_STATIC);
  380. if (unlikely(rc != SQLITE_OK))
  381. goto bind_fail;
  382. rc = sqlite3_bind_text(res, ++param, string2str(st->parts.id), -1, SQLITE_STATIC);
  383. if (unlikely(rc != SQLITE_OK))
  384. goto bind_fail;
  385. const char *name = string2str(st->parts.name);
  386. if (name && *name)
  387. rc = sqlite3_bind_text(res, ++param, name, -1, SQLITE_STATIC);
  388. else
  389. rc = sqlite3_bind_null(res, ++param);
  390. if (unlikely(rc != SQLITE_OK))
  391. goto bind_fail;
  392. rc = sqlite3_bind_text(res, ++param, rrdset_family(st), -1, SQLITE_STATIC);
  393. if (unlikely(rc != SQLITE_OK))
  394. goto bind_fail;
  395. rc = sqlite3_bind_text(res, ++param, rrdset_context(st), -1, SQLITE_STATIC);
  396. if (unlikely(rc != SQLITE_OK))
  397. goto bind_fail;
  398. rc = sqlite3_bind_text(res, ++param, rrdset_title(st), -1, SQLITE_STATIC);
  399. if (unlikely(rc != SQLITE_OK))
  400. goto bind_fail;
  401. rc = sqlite3_bind_text(res, ++param, rrdset_units(st), -1, SQLITE_STATIC);
  402. if (unlikely(rc != SQLITE_OK))
  403. goto bind_fail;
  404. rc = sqlite3_bind_text(res, ++param, rrdset_plugin_name(st), -1, SQLITE_STATIC);
  405. if (unlikely(rc != SQLITE_OK))
  406. goto bind_fail;
  407. rc = sqlite3_bind_text(res, ++param, rrdset_module_name(st), -1, SQLITE_STATIC);
  408. if (unlikely(rc != SQLITE_OK))
  409. goto bind_fail;
  410. rc = sqlite3_bind_int(res, ++param, (int) st->priority);
  411. if (unlikely(rc != SQLITE_OK))
  412. goto bind_fail;
  413. rc = sqlite3_bind_int(res, ++param, st->update_every);
  414. if (unlikely(rc != SQLITE_OK))
  415. goto bind_fail;
  416. rc = sqlite3_bind_int(res, ++param, st->chart_type);
  417. if (unlikely(rc != SQLITE_OK))
  418. goto bind_fail;
  419. rc = sqlite3_bind_int(res, ++param, st->rrd_memory_mode);
  420. if (unlikely(rc != SQLITE_OK))
  421. goto bind_fail;
  422. rc = sqlite3_bind_int(res, ++param, (int) st->entries);
  423. if (unlikely(rc != SQLITE_OK))
  424. goto bind_fail;
  425. store_rc = execute_insert(res);
  426. if (unlikely(store_rc != SQLITE_DONE))
  427. error_report("Failed to store chart, rc = %d", store_rc);
  428. rc = sqlite3_reset(res);
  429. if (unlikely(rc != SQLITE_OK))
  430. error_report("Failed to reset statement in chart store function, rc = %d", rc);
  431. return store_rc != SQLITE_DONE;
  432. bind_fail:
  433. error_report("Failed to bind parameter %d to store chart, rc = %d", param, rc);
  434. rc = sqlite3_reset(res);
  435. if (unlikely(rc != SQLITE_OK))
  436. error_report("Failed to reset statement in chart store function, rc = %d", rc);
  437. return 1;
  438. }
  439. /*
  440. * Store a dimension
  441. */
  442. static int store_dimension_metadata(RRDDIM *rd)
  443. {
  444. static __thread sqlite3_stmt *res = NULL;
  445. int rc, param = 0;
  446. if (unlikely(!db_meta)) {
  447. if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
  448. return 0;
  449. error_report("Database has not been initialized");
  450. return 1;
  451. }
  452. if (unlikely(!res)) {
  453. rc = prepare_statement(db_meta, SQL_STORE_DIMENSION, &res);
  454. if (unlikely(rc != SQLITE_OK)) {
  455. error_report("Failed to prepare statement to store dimension, rc = %d", rc);
  456. return 1;
  457. }
  458. }
  459. rc = sqlite3_bind_blob(res, ++param, &rd->metric_uuid, sizeof(rd->metric_uuid), SQLITE_STATIC);
  460. if (unlikely(rc != SQLITE_OK))
  461. goto bind_fail;
  462. rc = sqlite3_bind_blob(res, ++param, &rd->rrdset->chart_uuid, sizeof(rd->rrdset->chart_uuid), SQLITE_STATIC);
  463. if (unlikely(rc != SQLITE_OK))
  464. goto bind_fail;
  465. rc = sqlite3_bind_text(res, ++param, string2str(rd->id), -1, SQLITE_STATIC);
  466. if (unlikely(rc != SQLITE_OK))
  467. goto bind_fail;
  468. rc = sqlite3_bind_text(res, ++param, string2str(rd->name), -1, SQLITE_STATIC);
  469. if (unlikely(rc != SQLITE_OK))
  470. goto bind_fail;
  471. rc = sqlite3_bind_int(res, ++param, (int) rd->multiplier);
  472. if (unlikely(rc != SQLITE_OK))
  473. goto bind_fail;
  474. rc = sqlite3_bind_int(res, ++param, (int ) rd->divisor);
  475. if (unlikely(rc != SQLITE_OK))
  476. goto bind_fail;
  477. rc = sqlite3_bind_int(res, ++param, rd->algorithm);
  478. if (unlikely(rc != SQLITE_OK))
  479. goto bind_fail;
  480. if (rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN))
  481. rc = sqlite3_bind_text(res, ++param, "hidden", -1, SQLITE_STATIC);
  482. else
  483. rc = sqlite3_bind_null(res, ++param);
  484. if (unlikely(rc != SQLITE_OK))
  485. goto bind_fail;
  486. rc = execute_insert(res);
  487. if (unlikely(rc != SQLITE_DONE))
  488. error_report("Failed to store dimension, rc = %d", rc);
  489. rc = sqlite3_reset(res);
  490. if (unlikely(rc != SQLITE_OK))
  491. error_report("Failed to reset statement in store dimension, rc = %d", rc);
  492. return 0;
  493. bind_fail:
  494. error_report("Failed to bind parameter %d to store dimension, rc = %d", param, rc);
  495. rc = sqlite3_reset(res);
  496. if (unlikely(rc != SQLITE_OK))
  497. error_report("Failed to reset statement in store dimension, rc = %d", rc);
  498. return 1;
  499. }
  500. static bool dimension_can_be_deleted(uuid_t *dim_uuid __maybe_unused)
  501. {
  502. #ifdef ENABLE_DBENGINE
  503. if(dbengine_enabled) {
  504. bool no_retention = true;
  505. for (size_t tier = 0; tier < storage_tiers; tier++) {
  506. if (!multidb_ctx[tier])
  507. continue;
  508. time_t first_time_t = 0, last_time_t = 0;
  509. if (rrdeng_metric_retention_by_uuid((void *) multidb_ctx[tier], dim_uuid, &first_time_t, &last_time_t)) {
  510. if (first_time_t > 0) {
  511. no_retention = false;
  512. break;
  513. }
  514. }
  515. }
  516. return no_retention;
  517. }
  518. else
  519. return false;
  520. #else
  521. return false;
  522. #endif
  523. }
  524. static void check_dimension_metadata(struct metadata_wc *wc)
  525. {
  526. int rc;
  527. sqlite3_stmt *res = NULL;
  528. rc = sqlite3_prepare_v2(db_meta, SELECT_DIMENSION_LIST, -1, &res, 0);
  529. if (unlikely(rc != SQLITE_OK)) {
  530. error_report("Failed to prepare statement to fetch host dimensions");
  531. return;
  532. }
  533. rc = sqlite3_bind_int64(res, 1, (sqlite3_int64) wc->row_id);
  534. if (unlikely(rc != SQLITE_OK)) {
  535. error_report("Failed to row parameter");
  536. goto skip_run;
  537. }
  538. uint32_t total_checked = 0;
  539. uint32_t total_deleted= 0;
  540. uint64_t last_row_id = wc->row_id;
  541. info("METADATA: Checking dimensions starting after row %"PRIu64, wc->row_id);
  542. while (sqlite3_step_monitored(res) == SQLITE_ROW && total_deleted < MAX_METADATA_CLEANUP) {
  543. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
  544. break;
  545. last_row_id = sqlite3_column_int64(res, 1);
  546. rc = dimension_can_be_deleted((uuid_t *)sqlite3_column_blob(res, 0));
  547. if (rc == true) {
  548. delete_dimension_uuid((uuid_t *)sqlite3_column_blob(res, 0));
  549. total_deleted++;
  550. }
  551. total_checked++;
  552. }
  553. wc->row_id = last_row_id;
  554. time_t now = now_realtime_sec();
  555. if (total_deleted > 0) {
  556. wc->check_metadata_after = now + METADATA_MAINTENANCE_RETRY;
  557. } else
  558. wc->row_id = 0;
  559. info("METADATA: Checked %u, deleted %u -- will resume after row %"PRIu64" in %lld seconds", total_checked, total_deleted, wc->row_id,
  560. (long long)(wc->check_metadata_after - now));
  561. skip_run:
  562. rc = sqlite3_finalize(res);
  563. if (unlikely(rc != SQLITE_OK))
  564. error_report("Failed to finalize the prepared statement when reading dimensions");
  565. }
  566. static void cleanup_health_log(void)
  567. {
  568. RRDHOST *host;
  569. dfe_start_reentrant(rrdhost_root_index, host) {
  570. if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED))
  571. continue;
  572. sql_health_alarm_log_cleanup(host);
  573. }
  574. dfe_done(host);
  575. }
  576. //
  577. // EVENT LOOP STARTS HERE
  578. //
  579. static void metadata_init_cmd_queue(struct metadata_wc *wc)
  580. {
  581. wc->cmd_queue.head = wc->cmd_queue.tail = 0;
  582. wc->queue_size = 0;
  583. fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
  584. fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
  585. }
  586. int metadata_enq_cmd_noblock(struct metadata_wc *wc, struct metadata_cmd *cmd)
  587. {
  588. unsigned queue_size;
  589. /* wait for free space in queue */
  590. uv_mutex_lock(&wc->cmd_mutex);
  591. if (cmd->opcode == METADATA_SYNC_SHUTDOWN) {
  592. metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN);
  593. uv_mutex_unlock(&wc->cmd_mutex);
  594. return 0;
  595. }
  596. if (unlikely((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE ||
  597. metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  598. uv_mutex_unlock(&wc->cmd_mutex);
  599. return 1;
  600. }
  601. fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE);
  602. /* enqueue command */
  603. wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
  604. wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ?
  605. wc->cmd_queue.tail + 1 : 0;
  606. wc->queue_size = queue_size + 1;
  607. uv_mutex_unlock(&wc->cmd_mutex);
  608. return 0;
  609. }
  610. static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd)
  611. {
  612. unsigned queue_size;
  613. /* wait for free space in queue */
  614. uv_mutex_lock(&wc->cmd_mutex);
  615. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  616. uv_mutex_unlock(&wc->cmd_mutex);
  617. (void) uv_async_send(&wc->async);
  618. return;
  619. }
  620. if (cmd->opcode == METADATA_SYNC_SHUTDOWN) {
  621. metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN);
  622. uv_mutex_unlock(&wc->cmd_mutex);
  623. (void) uv_async_send(&wc->async);
  624. return;
  625. }
  626. while ((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE) {
  627. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  628. uv_mutex_unlock(&wc->cmd_mutex);
  629. return;
  630. }
  631. uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
  632. }
  633. fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE);
  634. /* enqueue command */
  635. wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
  636. wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ?
  637. wc->cmd_queue.tail + 1 : 0;
  638. wc->queue_size = queue_size + 1;
  639. uv_mutex_unlock(&wc->cmd_mutex);
  640. /* wake up event loop */
  641. (void) uv_async_send(&wc->async);
  642. }
  643. static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc)
  644. {
  645. struct metadata_cmd ret;
  646. unsigned queue_size;
  647. uv_mutex_lock(&wc->cmd_mutex);
  648. queue_size = wc->queue_size;
  649. if (queue_size == 0) {
  650. memset(&ret, 0, sizeof(ret));
  651. ret.opcode = METADATA_DATABASE_NOOP;
  652. ret.completion = NULL;
  653. } else {
  654. /* dequeue command */
  655. ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head];
  656. if (queue_size == 1) {
  657. wc->cmd_queue.head = wc->cmd_queue.tail = 0;
  658. } else {
  659. wc->cmd_queue.head = wc->cmd_queue.head != METADATA_CMD_Q_MAX_SIZE - 1 ?
  660. wc->cmd_queue.head + 1 : 0;
  661. }
  662. wc->queue_size = queue_size - 1;
  663. /* wake up producers */
  664. uv_cond_signal(&wc->cmd_cond);
  665. }
  666. uv_mutex_unlock(&wc->cmd_mutex);
  667. return ret;
  668. }
  669. static void async_cb(uv_async_t *handle)
  670. {
  671. uv_stop(handle->loop);
  672. uv_update_time(handle->loop);
  673. }
  674. #define TIMER_INITIAL_PERIOD_MS (1000)
  675. #define TIMER_REPEAT_PERIOD_MS (1000)
  676. static void timer_cb(uv_timer_t* handle)
  677. {
  678. uv_stop(handle->loop);
  679. uv_update_time(handle->loop);
  680. struct metadata_wc *wc = handle->data;
  681. struct metadata_cmd cmd;
  682. memset(&cmd, 0, sizeof(cmd));
  683. time_t now = now_realtime_sec();
  684. if (wc->check_metadata_after && wc->check_metadata_after < now) {
  685. cmd.opcode = METADATA_MAINTENANCE;
  686. if (!metadata_enq_cmd_noblock(wc, &cmd))
  687. wc->check_metadata_after = now + METADATA_MAINTENANCE_INTERVAL;
  688. }
  689. if (wc->check_hosts_after && wc->check_hosts_after < now) {
  690. cmd.opcode = METADATA_SCAN_HOSTS;
  691. if (!metadata_enq_cmd_noblock(wc, &cmd))
  692. wc->check_hosts_after = now + METADATA_HOST_CHECK_INTERVAL;
  693. }
  694. }
  695. static void after_metadata_cleanup(uv_work_t *req, int status)
  696. {
  697. UNUSED(status);
  698. struct metadata_wc *wc = req->data;
  699. metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
  700. }
  701. static void start_metadata_cleanup(uv_work_t *req)
  702. {
  703. register_libuv_worker_jobs();
  704. worker_is_busy(UV_EVENT_METADATA_CLEANUP);
  705. struct metadata_wc *wc = req->data;
  706. check_dimension_metadata(wc);
  707. cleanup_health_log();
  708. (void) sqlite3_wal_checkpoint(db_meta, NULL);
  709. worker_is_idle();
  710. }
  // Payload handed to the libuv work items queued by the metadata event loop
  // (host scan and host context load). It is heap allocated by the event loop and
  // released by the matching after-work callback.
  711. struct scan_metadata_payload {
  712. uv_work_t request;             // libuv work request; request.data points back to this payload
  713. struct metadata_wc *wc;        // owning metadata worker configuration
  714. struct completion *completion; // optional; marked complete by after_metadata_hosts() when set
  715. BUFFER *work_buffer;           // scratch buffer shared with the event loop (not owned by the payload)
  716. uint32_t max_count;            // chart limit per host for one scan; 0 means process all pending updates
  717. };
  // One slot of the thread pool used while loading host contexts in parallel.
  718. struct host_context_load_thread {
  719. uv_thread_t thread; // thread running restore_host_context() for this slot
  720. RRDHOST *host;      // host whose context the thread loads
  721. bool busy;          // set (atomically) before the thread is created, cleared after it is joined
  722. bool finished;      // set (atomically) by the thread as its last action, cleared after it is joined
  723. };
  724. static void restore_host_context(void *arg)
  725. {
  726. struct host_context_load_thread *hclt = arg;
  727. RRDHOST *host = hclt->host;
  728. usec_t started_ut = now_monotonic_usec(); (void)started_ut;
  729. rrdhost_load_rrdcontext_data(host);
  730. usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
  731. rrdhost_flag_clear(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD | RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS);
  732. #ifdef ENABLE_ACLK
  733. aclk_queue_node_info(host, false);
  734. #endif
  735. internal_error(true, "METADATA: 'host:%s' context load in %0.2f ms", rrdhost_hostname(host),
  736. (double)(ended_ut - started_ut) / USEC_PER_MS);
  737. __atomic_store_n(&hclt->finished, true, __ATOMIC_RELEASE);
  738. }
  739. // Callback after scan of hosts is done
  740. static void after_start_host_load_context(uv_work_t *req, int status __maybe_unused)
  741. {
  742. struct scan_metadata_payload *data = req->data;
  743. freez(data);
  744. }
  745. #define MAX_FIND_THREAD_RETRIES (10)
  746. static void cleanup_finished_threads(struct host_context_load_thread *hclt, size_t max_thread_slots, bool wait)
  747. {
  748. for (size_t index = 0; index < max_thread_slots; index++) {
  749. if (__atomic_load_n(&(hclt[index].finished), __ATOMIC_RELAXED)
  750. || (wait && __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE))) {
  751. int rc = uv_thread_join(&(hclt[index].thread));
  752. if (rc)
  753. error("Failed to join thread, rc = %d",rc);
  754. __atomic_store_n(&(hclt[index].busy), false, __ATOMIC_RELEASE);
  755. __atomic_store_n(&(hclt[index].finished), false, __ATOMIC_RELEASE);
  756. }
  757. }
  758. }
  759. static size_t find_available_thread_slot(struct host_context_load_thread *hclt, size_t max_thread_slots, size_t *found_index)
  760. {
  761. size_t retries = MAX_FIND_THREAD_RETRIES;
  762. while (retries--) {
  763. size_t index = 0;
  764. while (index < max_thread_slots) {
  765. if (false == __atomic_load_n(&(hclt[index].busy), __ATOMIC_ACQUIRE)) {
  766. *found_index = index;
  767. return true;
  768. }
  769. index++;
  770. }
  771. sleep_usec(10 * USEC_PER_MS);
  772. }
  773. return false;
  774. }
  775. static void start_all_host_load_context(uv_work_t *req __maybe_unused)
  776. {
  777. register_libuv_worker_jobs();
  778. struct scan_metadata_payload *data = req->data;
  779. UNUSED(data);
  780. worker_is_busy(UV_EVENT_HOST_CONTEXT_LOAD);
  781. usec_t started_ut = now_monotonic_usec(); (void)started_ut;
  782. RRDHOST *host;
  783. size_t max_threads = MIN(get_netdata_cpus() / 2, 6);
  784. struct host_context_load_thread *hclt = callocz(max_threads, sizeof(*hclt));
  785. size_t thread_index;
  786. dfe_start_reentrant(rrdhost_root_index, host) {
  787. if (rrdhost_flag_check(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS) ||
  788. !rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))
  789. continue;
  790. rrdhost_flag_set(host, RRDHOST_FLAG_CONTEXT_LOAD_IN_PROGRESS);
  791. internal_error(true, "METADATA: 'host:%s' loading context", rrdhost_hostname(host));
  792. cleanup_finished_threads(hclt, max_threads, false);
  793. bool found_slot = find_available_thread_slot(hclt, max_threads, &thread_index);
  794. if (unlikely(!found_slot)) {
  795. struct host_context_load_thread hclt_sync = {.host = host};
  796. restore_host_context(&hclt_sync);
  797. }
  798. else {
  799. __atomic_store_n(&hclt[thread_index].busy, true, __ATOMIC_RELAXED);
  800. hclt[thread_index].host = host;
  801. assert(0 == uv_thread_create(&hclt[thread_index].thread, restore_host_context, &hclt[thread_index]));
  802. }
  803. }
  804. dfe_done(host);
  805. cleanup_finished_threads(hclt, max_threads, true);
  806. freez(hclt);
  807. usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
  808. internal_error(true, "METADATA: 'host:ALL' contexts loaded in %0.2f ms", (double)(ended_ut - started_ut) / USEC_PER_MS);
  809. worker_is_idle();
  810. }
  811. // Callback after scan of hosts is done
  812. static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused)
  813. {
  814. struct scan_metadata_payload *data = req->data;
  815. struct metadata_wc *wc = data->wc;
  816. metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
  817. internal_error(true, "METADATA: scanning hosts complete");
  818. if (unlikely(data->completion)) {
  819. completion_mark_complete(data->completion);
  820. internal_error(true, "METADATA: Sending completion done");
  821. }
  822. freez(data);
  823. }
  824. static bool metadata_scan_host(RRDHOST *host, uint32_t max_count, bool use_transaction, BUFFER *work_buffer, size_t *query_counter) {
  825. RRDSET *st;
  826. int rc;
  827. bool more_to_do = false;
  828. uint32_t scan_count = 1;
  829. if (use_transaction)
  830. (void)db_execute(db_meta, "BEGIN TRANSACTION;");
  831. rrdset_foreach_reentrant(st, host) {
  832. if (scan_count == max_count) {
  833. more_to_do = true;
  834. break;
  835. }
  836. if(rrdset_flag_check(st, RRDSET_FLAG_METADATA_UPDATE)) {
  837. (*query_counter)++;
  838. rrdset_flag_clear(st, RRDSET_FLAG_METADATA_UPDATE);
  839. scan_count++;
  840. buffer_flush(work_buffer);
  841. rc = check_and_update_chart_labels(st, work_buffer, query_counter);
  842. if (unlikely(rc))
  843. error_report("METADATA: 'host:%s': Failed to update labels for chart %s", rrdhost_hostname(host), rrdset_name(st));
  844. else
  845. (*query_counter)++;
  846. rc = store_chart_metadata(st);
  847. if (unlikely(rc))
  848. error_report("METADATA: 'host:%s': Failed to store metadata for chart %s", rrdhost_hostname(host), rrdset_name(st));
  849. }
  850. RRDDIM *rd;
  851. rrddim_foreach_read(rd, st) {
  852. if(rrddim_flag_check(rd, RRDDIM_FLAG_METADATA_UPDATE)) {
  853. (*query_counter)++;
  854. rrddim_flag_clear(rd, RRDDIM_FLAG_METADATA_UPDATE);
  855. if (rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN))
  856. rrddim_flag_set(rd, RRDDIM_FLAG_META_HIDDEN);
  857. else
  858. rrddim_flag_clear(rd, RRDDIM_FLAG_META_HIDDEN);
  859. rc = store_dimension_metadata(rd);
  860. if (unlikely(rc))
  861. error_report("METADATA: 'host:%s': Failed to dimension metadata for chart %s. dimension %s",
  862. rrdhost_hostname(host), rrdset_name(st),
  863. rrddim_name(rd));
  864. }
  865. }
  866. rrddim_foreach_done(rd);
  867. }
  868. rrdset_foreach_done(st);
  869. if (use_transaction)
  870. (void)db_execute(db_meta, "COMMIT TRANSACTION;");
  871. return more_to_do;
  872. }
  873. static void store_host_and_system_info(RRDHOST *host, BUFFER *work_buffer, size_t *query_counter)
  874. {
  875. bool free_work_buffer = (NULL == work_buffer);
  876. if (unlikely(free_work_buffer))
  877. work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  878. if (build_host_system_info_statements(host, work_buffer)) {
  879. int rc = db_execute(db_meta, buffer_tostring(work_buffer));
  880. if (unlikely(rc)) {
  881. error_report("METADATA: 'host:%s': Failed to store host updated information in the database", rrdhost_hostname(host));
  882. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE);
  883. }
  884. else {
  885. if (likely(query_counter))
  886. (*query_counter)++;
  887. }
  888. }
  889. if (unlikely(store_host_metadata(host))) {
  890. error_report("METADATA: 'host:%s': Failed to store host info in the database", rrdhost_hostname(host));
  891. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_INFO | RRDHOST_FLAG_METADATA_UPDATE);
  892. }
  893. else {
  894. if (likely(query_counter))
  895. (*query_counter)++;
  896. }
  897. if (unlikely(free_work_buffer))
  898. buffer_free(work_buffer);
  899. }
  900. // Worker thread to scan hosts for pending metadata to store
  901. static void start_metadata_hosts(uv_work_t *req __maybe_unused)
  902. {
  903. register_libuv_worker_jobs();
  904. RRDHOST *host;
  905. int transaction_started = 0;
  906. struct scan_metadata_payload *data = req->data;
  907. struct metadata_wc *wc = data->wc;
  908. BUFFER *work_buffer = data->work_buffer;
  909. usec_t all_started_ut = now_monotonic_usec(); (void)all_started_ut;
  910. internal_error(true, "METADATA: checking all hosts...");
  911. usec_t started_ut = now_monotonic_usec(); (void)started_ut;
  912. bool run_again = false;
  913. worker_is_busy(UV_EVENT_METADATA_STORE);
  914. if (!data->max_count)
  915. transaction_started = !db_execute(db_meta, "BEGIN TRANSACTION;");
  916. dfe_start_reentrant(rrdhost_root_index, host) {
  917. if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE))
  918. continue;
  919. size_t query_counter = 0; (void)query_counter;
  920. rrdhost_flag_clear(host,RRDHOST_FLAG_METADATA_UPDATE);
  921. if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_LABELS))) {
  922. rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_LABELS);
  923. int rc = exec_statement_with_uuid(SQL_DELETE_HOST_LABELS, &host->host_uuid);
  924. if (likely(!rc)) {
  925. query_counter++;
  926. buffer_flush(work_buffer);
  927. struct query_build tmp = {.sql = work_buffer, .count = 0};
  928. uuid_unparse_lower(host->host_uuid, tmp.uuid_str);
  929. rrdlabels_walkthrough_read(host->rrdlabels, host_label_store_to_sql_callback, &tmp);
  930. rc = db_execute(db_meta, buffer_tostring(work_buffer));
  931. if (unlikely(rc)) {
  932. error_report("METADATA: 'host:%s': failed to update metadata host labels", rrdhost_hostname(host));
  933. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
  934. }
  935. else
  936. query_counter++;
  937. } else {
  938. error_report("METADATA: 'host:%s': failed to delete old host labels", rrdhost_hostname(host));
  939. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
  940. }
  941. }
  942. if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_CLAIMID))) {
  943. rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_CLAIMID);
  944. uuid_t uuid;
  945. int rc;
  946. if (likely(host->aclk_state.claimed_id && !uuid_parse(host->aclk_state.claimed_id, uuid)))
  947. rc = store_claim_id(&host->host_uuid, &uuid);
  948. else
  949. rc = store_claim_id(&host->host_uuid, NULL);
  950. if (unlikely(rc))
  951. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID | RRDHOST_FLAG_METADATA_UPDATE);
  952. else
  953. query_counter++;
  954. }
  955. if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_INFO))) {
  956. rrdhost_flag_clear(host, RRDHOST_FLAG_METADATA_INFO);
  957. store_host_and_system_info(host, work_buffer, &query_counter);
  958. }
  959. // For clarity
  960. bool use_transaction = data->max_count;
  961. if (unlikely(metadata_scan_host(host, data->max_count, use_transaction, work_buffer, &query_counter))) {
  962. run_again = true;
  963. rrdhost_flag_set(host,RRDHOST_FLAG_METADATA_UPDATE);
  964. internal_error(true,"METADATA: 'host:%s': scheduling another run, more charts to store", rrdhost_hostname(host));
  965. }
  966. usec_t ended_ut = now_monotonic_usec(); (void)ended_ut;
  967. internal_error(true, "METADATA: 'host:%s': saved metadata with %zu SQL statements, in %0.2f ms",
  968. rrdhost_hostname(host), query_counter,
  969. (double)(ended_ut - started_ut) / USEC_PER_MS);
  970. }
  971. dfe_done(host);
  972. if (!data->max_count && transaction_started)
  973. transaction_started = db_execute(db_meta, "COMMIT TRANSACTION;");
  974. usec_t all_ended_ut = now_monotonic_usec(); (void)all_ended_ut;
  975. internal_error(true, "METADATA: checking all hosts completed in %0.2f ms",
  976. (double)(all_ended_ut - all_started_ut) / USEC_PER_MS);
  977. if (unlikely(run_again))
  978. wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_IMMEDIATE;
  979. else
  980. wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_INTERVAL;
  981. worker_is_idle();
  982. }
  983. static void metadata_event_loop(void *arg)
  984. {
  985. worker_register("METASYNC");
  986. worker_register_job_name(METADATA_DATABASE_NOOP, "noop");
  987. worker_register_job_name(METADATA_DATABASE_TIMER, "timer");
  988. worker_register_job_name(METADATA_DEL_DIMENSION, "delete dimension");
  989. worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id");
  990. worker_register_job_name(METADATA_ADD_HOST_INFO, "add host info");
  991. worker_register_job_name(METADATA_MAINTENANCE, "maintenance");
  992. worker_register_job_name(METADATA_ML_LOAD_MODELS, "ml load models");
  993. int ret;
  994. uv_loop_t *loop;
  995. unsigned cmd_batch_size;
  996. struct metadata_wc *wc = arg;
  997. enum metadata_opcode opcode;
  998. uv_work_t metadata_cleanup_worker;
  999. uv_thread_set_name_np(wc->thread, "METASYNC");
  1000. // service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);
  1001. loop = wc->loop = mallocz(sizeof(uv_loop_t));
  1002. ret = uv_loop_init(loop);
  1003. if (ret) {
  1004. error("uv_loop_init(): %s", uv_strerror(ret));
  1005. goto error_after_loop_init;
  1006. }
  1007. loop->data = wc;
  1008. ret = uv_async_init(wc->loop, &wc->async, async_cb);
  1009. if (ret) {
  1010. error("uv_async_init(): %s", uv_strerror(ret));
  1011. goto error_after_async_init;
  1012. }
  1013. wc->async.data = wc;
  1014. ret = uv_timer_init(loop, &wc->timer_req);
  1015. if (ret) {
  1016. error("uv_timer_init(): %s", uv_strerror(ret));
  1017. goto error_after_timer_init;
  1018. }
  1019. wc->timer_req.data = wc;
  1020. fatal_assert(0 == uv_timer_start(&wc->timer_req, timer_cb, TIMER_INITIAL_PERIOD_MS, TIMER_REPEAT_PERIOD_MS));
  1021. info("Starting metadata sync thread with %d entries command queue", METADATA_CMD_Q_MAX_SIZE);
  1022. struct metadata_cmd cmd;
  1023. memset(&cmd, 0, sizeof(cmd));
  1024. metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
  1025. metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
  1026. wc->check_metadata_after = now_realtime_sec() + METADATA_MAINTENANCE_FIRST_CHECK;
  1027. wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK;
  1028. int shutdown = 0;
  1029. wc->row_id = 0;
  1030. completion_mark_complete(&wc->init_complete);
  1031. BUFFER *work_buffer = buffer_create(1024, &netdata_buffers_statistics.buffers_sqlite);
  1032. struct scan_metadata_payload *data;
  1033. while (shutdown == 0 || (wc->flags & METADATA_WORKER_BUSY)) {
  1034. uuid_t *uuid;
  1035. RRDHOST *host = NULL;
  1036. worker_is_idle();
  1037. uv_run(loop, UV_RUN_DEFAULT);
  1038. /* wait for commands */
  1039. cmd_batch_size = 0;
  1040. do {
  1041. if (unlikely(cmd_batch_size >= METADATA_MAX_BATCH_SIZE))
  1042. break;
  1043. cmd = metadata_deq_cmd(wc);
  1044. opcode = cmd.opcode;
  1045. if (unlikely(opcode == METADATA_DATABASE_NOOP && metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  1046. shutdown = 1;
  1047. continue;
  1048. }
  1049. ++cmd_batch_size;
  1050. if (likely(opcode != METADATA_DATABASE_NOOP))
  1051. worker_is_busy(opcode);
  1052. switch (opcode) {
  1053. case METADATA_DATABASE_NOOP:
  1054. case METADATA_DATABASE_TIMER:
  1055. break;
  1056. case METADATA_ML_LOAD_MODELS: {
  1057. RRDDIM *rd = (RRDDIM *) cmd.param[0];
  1058. ml_dimension_load_models(rd);
  1059. break;
  1060. }
  1061. case METADATA_DEL_DIMENSION:
  1062. uuid = (uuid_t *) cmd.param[0];
  1063. if (likely(dimension_can_be_deleted(uuid)))
  1064. delete_dimension_uuid(uuid);
  1065. freez(uuid);
  1066. break;
  1067. case METADATA_STORE_CLAIM_ID:
  1068. store_claim_id((uuid_t *) cmd.param[0], (uuid_t *) cmd.param[1]);
  1069. freez((void *) cmd.param[0]);
  1070. freez((void *) cmd.param[1]);
  1071. break;
  1072. case METADATA_ADD_HOST_INFO:
  1073. host = (RRDHOST *) cmd.param[0];
  1074. store_host_and_system_info(host, NULL, NULL);
  1075. break;
  1076. case METADATA_SCAN_HOSTS:
  1077. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SCANNING_HOSTS)))
  1078. break;
  1079. if (unittest_running)
  1080. break;
  1081. data = mallocz(sizeof(*data));
  1082. data->request.data = data;
  1083. data->wc = wc;
  1084. data->completion = cmd.completion; // Completion by the worker
  1085. data->work_buffer = work_buffer;
  1086. if (unlikely(cmd.completion)) {
  1087. data->max_count = 0; // 0 will process all pending updates
  1088. cmd.completion = NULL; // Do not complete after launching worker (worker will do)
  1089. }
  1090. else
  1091. data->max_count = 5000;
  1092. metadata_flag_set(wc, METADATA_FLAG_SCANNING_HOSTS);
  1093. if (unlikely(
  1094. uv_queue_work(loop,&data->request,
  1095. start_metadata_hosts,
  1096. after_metadata_hosts))) {
  1097. // Failed to launch worker -- let the event loop handle completion
  1098. cmd.completion = data->completion;
  1099. freez(data);
  1100. metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
  1101. }
  1102. break;
  1103. case METADATA_LOAD_HOST_CONTEXT:;
  1104. if (unittest_running)
  1105. break;
  1106. data = callocz(1,sizeof(*data));
  1107. data->request.data = data;
  1108. data->wc = wc;
  1109. if (unlikely(
  1110. uv_queue_work(loop,&data->request, start_all_host_load_context,
  1111. after_start_host_load_context))) {
  1112. freez(data);
  1113. }
  1114. break;
  1115. case METADATA_MAINTENANCE:
  1116. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_CLEANUP)))
  1117. break;
  1118. metadata_cleanup_worker.data = wc;
  1119. metadata_flag_set(wc, METADATA_FLAG_CLEANUP);
  1120. if (unlikely(
  1121. uv_queue_work(loop, &metadata_cleanup_worker, start_metadata_cleanup, after_metadata_cleanup))) {
  1122. metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
  1123. }
  1124. break;
  1125. case METADATA_UNITTEST:;
  1126. struct thread_unittest *tu = (struct thread_unittest *) cmd.param[0];
  1127. sleep_usec(1000); // processing takes 1ms
  1128. __atomic_fetch_add(&tu->processed, 1, __ATOMIC_SEQ_CST);
  1129. break;
  1130. default:
  1131. break;
  1132. }
  1133. if (cmd.completion)
  1134. completion_mark_complete(cmd.completion);
  1135. } while (opcode != METADATA_DATABASE_NOOP);
  1136. }
  1137. if (!uv_timer_stop(&wc->timer_req))
  1138. uv_close((uv_handle_t *)&wc->timer_req, NULL);
  1139. uv_close((uv_handle_t *)&wc->async, NULL);
  1140. uv_cond_destroy(&wc->cmd_cond);
  1141. int rc;
  1142. do {
  1143. rc = uv_loop_close(loop);
  1144. } while (rc != UV_EBUSY);
  1145. buffer_free(work_buffer);
  1146. freez(loop);
  1147. worker_unregister();
  1148. info("METADATA: Shutting down event loop");
  1149. completion_mark_complete(&wc->init_complete);
  1150. return;
  1151. error_after_timer_init:
  1152. uv_close((uv_handle_t *)&wc->async, NULL);
  1153. error_after_async_init:
  1154. fatal_assert(0 == uv_loop_close(loop));
  1155. error_after_loop_init:
  1156. freez(loop);
  1157. worker_unregister();
  1158. }
  1159. struct metadata_wc metasync_worker = {.loop = NULL};
  1160. void metadata_sync_shutdown(void)
  1161. {
  1162. completion_init(&metasync_worker.init_complete);
  1163. struct metadata_cmd cmd;
  1164. memset(&cmd, 0, sizeof(cmd));
  1165. info("METADATA: Sending a shutdown command");
  1166. cmd.opcode = METADATA_SYNC_SHUTDOWN;
  1167. metadata_enq_cmd(&metasync_worker, &cmd);
  1168. /* wait for metadata thread to shut down */
  1169. info("METADATA: Waiting for shutdown ACK");
  1170. completion_wait_for(&metasync_worker.init_complete);
  1171. completion_destroy(&metasync_worker.init_complete);
  1172. info("METADATA: Shutdown complete");
  1173. }
  1174. void metadata_sync_shutdown_prepare(void)
  1175. {
  1176. if (unlikely(!metasync_worker.loop))
  1177. return;
  1178. struct metadata_cmd cmd;
  1179. memset(&cmd, 0, sizeof(cmd));
  1180. struct completion compl;
  1181. completion_init(&compl);
  1182. info("METADATA: Sending a scan host command");
  1183. uint32_t max_wait_iterations = 2000;
  1184. while (unlikely(metadata_flag_check(&metasync_worker, METADATA_FLAG_SCANNING_HOSTS)) && max_wait_iterations--) {
  1185. if (max_wait_iterations == 1999)
  1186. info("METADATA: Current worker is running; waiting to finish");
  1187. sleep_usec(1000);
  1188. }
  1189. cmd.opcode = METADATA_SCAN_HOSTS;
  1190. cmd.completion = &compl;
  1191. metadata_enq_cmd(&metasync_worker, &cmd);
  1192. info("METADATA: Waiting for host scan completion");
  1193. completion_wait_for(&compl);
  1194. completion_destroy(&compl);
  1195. info("METADATA: Host scan complete; can continue with shutdown");
  1196. }
  1197. // -------------------------------------------------------------
  1198. // Init function called on agent startup
  1199. void metadata_sync_init(void)
  1200. {
  1201. struct metadata_wc *wc = &metasync_worker;
  1202. memset(wc, 0, sizeof(*wc));
  1203. metadata_init_cmd_queue(wc);
  1204. completion_init(&wc->init_complete);
  1205. fatal_assert(0 == uv_thread_create(&(wc->thread), metadata_event_loop, wc));
  1206. completion_wait_for(&wc->init_complete);
  1207. completion_destroy(&wc->init_complete);
  1208. info("SQLite metadata sync initialization complete");
  1209. }
  1210. // Helpers
  1211. static inline void queue_metadata_cmd(enum metadata_opcode opcode, const void *param0, const void *param1)
  1212. {
  1213. struct metadata_cmd cmd;
  1214. cmd.opcode = opcode;
  1215. cmd.param[0] = param0;
  1216. cmd.param[1] = param1;
  1217. cmd.completion = NULL;
  1218. metadata_enq_cmd(&metasync_worker, &cmd);
  1219. }
  1220. // Public
  1221. void metaqueue_delete_dimension_uuid(uuid_t *uuid)
  1222. {
  1223. if (unlikely(!metasync_worker.loop))
  1224. return;
  1225. uuid_t *use_uuid = mallocz(sizeof(*uuid));
  1226. uuid_copy(*use_uuid, *uuid);
  1227. queue_metadata_cmd(METADATA_DEL_DIMENSION, use_uuid, NULL);
  1228. }
  1229. void metaqueue_store_claim_id(uuid_t *host_uuid, uuid_t *claim_uuid)
  1230. {
  1231. if (unlikely(!host_uuid))
  1232. return;
  1233. uuid_t *local_host_uuid = mallocz(sizeof(*host_uuid));
  1234. uuid_t *local_claim_uuid = NULL;
  1235. uuid_copy(*local_host_uuid, *host_uuid);
  1236. if (likely(claim_uuid)) {
  1237. local_claim_uuid = mallocz(sizeof(*claim_uuid));
  1238. uuid_copy(*local_claim_uuid, *claim_uuid);
  1239. }
  1240. queue_metadata_cmd(METADATA_STORE_CLAIM_ID, local_host_uuid, local_claim_uuid);
  1241. }
  1242. void metaqueue_host_update_info(RRDHOST *host)
  1243. {
  1244. if (unlikely(!metasync_worker.loop))
  1245. return;
  1246. queue_metadata_cmd(METADATA_ADD_HOST_INFO, host, NULL);
  1247. }
  1248. void metaqueue_ml_load_models(RRDDIM *rd)
  1249. {
  1250. if (unlikely(!metasync_worker.loop))
  1251. return;
  1252. queue_metadata_cmd(METADATA_ML_LOAD_MODELS, rd, NULL);
  1253. }
  1254. void metadata_queue_load_host_context(RRDHOST *host)
  1255. {
  1256. if (unlikely(!metasync_worker.loop))
  1257. return;
  1258. queue_metadata_cmd(METADATA_LOAD_HOST_CONTEXT, host, NULL);
  1259. }
  1260. //
  1261. // unit tests
  1262. //
  1263. static void *unittest_queue_metadata(void *arg) {
  1264. struct thread_unittest *tu = arg;
  1265. struct metadata_cmd cmd;
  1266. cmd.opcode = METADATA_UNITTEST;
  1267. cmd.param[0] = tu;
  1268. cmd.param[1] = NULL;
  1269. cmd.completion = NULL;
  1270. metadata_enq_cmd(&metasync_worker, &cmd);
  1271. do {
  1272. __atomic_fetch_add(&tu->added, 1, __ATOMIC_SEQ_CST);
  1273. metadata_enq_cmd(&metasync_worker, &cmd);
  1274. sleep_usec(10000);
  1275. } while (!__atomic_load_n(&tu->join, __ATOMIC_RELAXED));
  1276. return arg;
  1277. }
  1278. static void *metadata_unittest_threads(void)
  1279. {
  1280. unsigned done;
  1281. struct thread_unittest tu = {
  1282. .join = 0,
  1283. .added = 0,
  1284. .processed = 0,
  1285. .done = &done,
  1286. };
  1287. // Queue messages / Time it
  1288. time_t seconds_to_run = 5;
  1289. int threads_to_create = 4;
  1290. fprintf(
  1291. stderr,
  1292. "\nChecking metadata queue using %d threads for %lld seconds...\n",
  1293. threads_to_create,
  1294. (long long)seconds_to_run);
  1295. netdata_thread_t threads[threads_to_create];
  1296. tu.join = 0;
  1297. for (int i = 0; i < threads_to_create; i++) {
  1298. char buf[100 + 1];
  1299. snprintf(buf, 100, "META[%d]", i);
  1300. netdata_thread_create(
  1301. &threads[i],
  1302. buf,
  1303. NETDATA_THREAD_OPTION_DONT_LOG | NETDATA_THREAD_OPTION_JOINABLE,
  1304. unittest_queue_metadata,
  1305. &tu);
  1306. }
  1307. (void) uv_async_send(&metasync_worker.async);
  1308. sleep_usec(seconds_to_run * USEC_PER_SEC);
  1309. __atomic_store_n(&tu.join, 1, __ATOMIC_RELAXED);
  1310. for (int i = 0; i < threads_to_create; i++) {
  1311. void *retval;
  1312. netdata_thread_join(threads[i], &retval);
  1313. }
  1314. sleep_usec(5 * USEC_PER_SEC);
  1315. fprintf(stderr, "Added %u elements, processed %u\n", tu.added, tu.processed);
  1316. return 0;
  1317. }
  1318. int metadata_unittest(void)
  1319. {
  1320. metadata_sync_init();
  1321. // Queue items for a specific period of time
  1322. metadata_unittest_threads();
  1323. fprintf(stderr, "Items still in queue %u\n", metasync_worker.queue_size);
  1324. metadata_sync_shutdown();
  1325. return 0;
  1326. }