sqlite_metadata.c 55 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "sqlite_metadata.h"
  3. // SQL statements
  4. #define SQL_STORE_CLAIM_ID "insert into node_instance " \
  5. "(host_id, claim_id, date_created) values (@host_id, @claim_id, unixepoch()) " \
  6. "on conflict(host_id) do update set claim_id = excluded.claim_id;"
  7. #define SQL_DELETE_HOST_LABELS "DELETE FROM host_label WHERE host_id = @uuid;"
  8. #define STORE_HOST_LABEL \
  9. "INSERT OR REPLACE INTO host_label (host_id, source_type, label_key, label_value, date_created) VALUES "
  10. #define STORE_CHART_LABEL \
  11. "INSERT OR REPLACE INTO chart_label (chart_id, source_type, label_key, label_value, date_created) VALUES "
  12. #define STORE_HOST_OR_CHART_LABEL_VALUE "(u2h('%s'), %d,'%s','%s', unixepoch())"
  13. #define DELETE_DIMENSION_UUID "DELETE FROM dimension WHERE dim_id = @uuid;"
  14. #define SQL_STORE_HOST_INFO "INSERT OR REPLACE INTO host " \
  15. "(host_id, hostname, registry_hostname, update_every, os, timezone," \
  16. "tags, hops, memory_mode, abbrev_timezone, utc_offset, program_name, program_version," \
  17. "entries, health_enabled) " \
  18. "values (@host_id, @hostname, @registry_hostname, @update_every, @os, @timezone, @tags, @hops, @memory_mode, " \
  19. "@abbrev_timezone, @utc_offset, @program_name, @program_version, " \
  20. "@entries, @health_enabled);"
  21. #define SQL_STORE_CHART "insert or replace into chart (chart_id, host_id, type, id, " \
  22. "name, family, context, title, unit, plugin, module, priority, update_every , chart_type , memory_mode , " \
  23. "history_entries) values (?1,?2,?3,?4,?5,?6,?7,?8,?9,?10,?11,?12,?13,?14,?15,?16);"
  24. #define SQL_STORE_DIMENSION "INSERT OR REPLACE INTO dimension (dim_id, chart_id, id, name, multiplier, divisor , algorithm, options) " \
  25. "VALUES (@dim_id, @chart_id, @id, @name, @multiplier, @divisor, @algorithm, @options);"
  26. #define SELECT_DIMENSION_LIST "SELECT dim_id, rowid FROM dimension WHERE rowid > @row_id"
  27. #define STORE_HOST_INFO "INSERT OR REPLACE INTO host_info (host_id, system_key, system_value, date_created) VALUES "
  28. #define STORE_HOST_INFO_VALUES "(u2h('%s'), '%s','%s', unixepoch())"
  29. #define MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID \
  30. "UPDATE chart SET host_id = @host_id WHERE host_id in (SELECT host_id FROM host where host_id <> @host_id and hops = 0);"
  31. #define DELETE_NON_EXISTING_LOCALHOST "DELETE FROM host WHERE hops = 0 AND host_id <> @host_id;"
  32. #define DELETE_MISSING_NODE_INSTANCES "DELETE FROM node_instance WHERE host_id NOT IN (SELECT host_id FROM host);"
  33. #define METADATA_CMD_Q_MAX_SIZE (1024) // Max queue size; callers will block until there is room
  34. #define METADATA_MAINTENANCE_FIRST_CHECK (1800) // Maintenance first run after agent startup in seconds
  35. #define METADATA_MAINTENANCE_RETRY (60) // Retry run if already running or last run did actual work
  36. #define METADATA_MAINTENANCE_INTERVAL (3600) // Repeat maintenance after latest successful
  37. #define METADATA_HOST_CHECK_FIRST_CHECK (5) // First check for pending metadata
  38. #define METADATA_HOST_CHECK_INTERVAL (30) // Repeat check for pending metadata
  39. #define METADATA_HOST_CHECK_IMMEDIATE (5) // Repeat immediate run because we have more metadata to write
  40. #define MAX_METADATA_CLEANUP (500) // Maximum metadata write operations (e.g deletes before retrying)
  41. #define METADATA_MAX_BATCH_SIZE (512) // Maximum commands to execute before running the event loop
  42. #define METADATA_MAX_TRANSACTION_BATCH (128) // Maximum commands to add in a transaction
  43. enum metadata_opcode {
  44. METADATA_DATABASE_NOOP = 0,
  45. METADATA_DATABASE_TIMER,
  46. METADATA_ADD_CHART,
  47. METADATA_ADD_CHART_LABEL,
  48. METADATA_ADD_DIMENSION,
  49. METADATA_DEL_DIMENSION,
  50. METADATA_ADD_DIMENSION_OPTION,
  51. METADATA_ADD_HOST_SYSTEM_INFO,
  52. METADATA_ADD_HOST_INFO,
  53. METADATA_STORE_CLAIM_ID,
  54. METADATA_STORE_HOST_LABELS,
  55. METADATA_STORE_BUFFER,
  56. METADATA_SKIP_TRANSACTION, // Dummy -- OPCODES less than this one can be in a tranasction
  57. METADATA_SCAN_HOSTS,
  58. METADATA_MAINTENANCE,
  59. METADATA_SYNC_SHUTDOWN,
  60. METADATA_UNITTEST,
  61. // leave this last
  62. // we need it to check for worker utilization
  63. METADATA_MAX_ENUMERATIONS_DEFINED
  64. };
  65. #define MAX_PARAM_LIST (2)
  66. struct metadata_cmd {
  67. enum metadata_opcode opcode;
  68. struct completion *completion;
  69. const void *param[MAX_PARAM_LIST];
  70. };
  71. struct metadata_database_cmdqueue {
  72. unsigned head, tail;
  73. struct metadata_cmd cmd_array[METADATA_CMD_Q_MAX_SIZE];
  74. };
  75. typedef enum {
  76. METADATA_FLAG_CLEANUP = (1 << 0), // Cleanup is running
  77. METADATA_FLAG_SCANNING_HOSTS = (1 << 1), // Scanning of hosts in worker thread
  78. METADATA_FLAG_SHUTDOWN = (1 << 2), // Shutting down
  79. } METADATA_FLAG;
  80. #define METADATA_WORKER_BUSY (METADATA_FLAG_CLEANUP | METADATA_FLAG_SCANNING_HOSTS)
  81. struct metadata_wc {
  82. uv_thread_t thread;
  83. time_t check_metadata_after;
  84. time_t check_hosts_after;
  85. volatile unsigned queue_size;
  86. uv_loop_t *loop;
  87. uv_async_t async;
  88. METADATA_FLAG flags;
  89. uint64_t row_id;
  90. uv_timer_t timer_req;
  91. struct completion init_complete;
  92. /* FIFO command queue */
  93. uv_mutex_t cmd_mutex;
  94. uv_cond_t cmd_cond;
  95. struct metadata_database_cmdqueue cmd_queue;
  96. };
  97. #define metadata_flag_check(target_flags, flag) (__atomic_load_n(&((target_flags)->flags), __ATOMIC_SEQ_CST) & (flag))
  98. #define metadata_flag_set(target_flags, flag) __atomic_or_fetch(&((target_flags)->flags), (flag), __ATOMIC_SEQ_CST)
  99. #define metadata_flag_clear(target_flags, flag) __atomic_and_fetch(&((target_flags)->flags), ~(flag), __ATOMIC_SEQ_CST)
  100. //
  101. // For unittest
  102. //
  103. struct thread_unittest {
  104. int join;
  105. unsigned added;
  106. unsigned processed;
  107. unsigned *done;
  108. };
  109. // Metadata functions
  110. struct query_build {
  111. BUFFER *sql;
  112. int count;
  113. char uuid_str[UUID_STR_LEN];
  114. };
  115. static int host_label_store_to_sql_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
  116. struct query_build *lb = data;
  117. if (unlikely(!lb->count))
  118. buffer_sprintf(lb->sql, STORE_HOST_LABEL);
  119. else
  120. buffer_strcat(lb->sql, ", ");
  121. buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, (int)ls & ~(RRDLABEL_FLAG_INTERNAL), name, value);
  122. lb->count++;
  123. return 1;
  124. }
  125. static int chart_label_store_to_sql_callback(const char *name, const char *value, RRDLABEL_SRC ls, void *data) {
  126. struct query_build *lb = data;
  127. if (unlikely(!lb->count))
  128. buffer_sprintf(lb->sql, STORE_CHART_LABEL);
  129. else
  130. buffer_strcat(lb->sql, ", ");
  131. buffer_sprintf(lb->sql, STORE_HOST_OR_CHART_LABEL_VALUE, lb->uuid_str, ls, name, value);
  132. lb->count++;
  133. return 1;
  134. }
  135. static void check_and_update_chart_labels(RRDSET *st, BUFFER *work_buffer)
  136. {
  137. size_t old_version = st->rrdlabels_last_saved_version;
  138. size_t new_version = dictionary_version(st->rrdlabels);
  139. if(new_version != old_version) {
  140. buffer_flush(work_buffer);
  141. struct query_build tmp = {.sql = work_buffer, .count = 0};
  142. uuid_unparse_lower(st->chart_uuid, tmp.uuid_str);
  143. rrdlabels_walkthrough_read(st->rrdlabels, chart_label_store_to_sql_callback, &tmp);
  144. st->rrdlabels_last_saved_version = new_version;
  145. db_execute(buffer_tostring(work_buffer));
  146. }
  147. }
  148. // Migrate all hosts with hops zero to this host_uuid
  149. void migrate_localhost(uuid_t *host_uuid)
  150. {
  151. int rc;
  152. rc = exec_statement_with_uuid(MIGRATE_LOCALHOST_TO_NEW_MACHINE_GUID, host_uuid);
  153. if (!rc)
  154. rc = exec_statement_with_uuid(DELETE_NON_EXISTING_LOCALHOST, host_uuid);
  155. if (!rc)
  156. db_execute(DELETE_MISSING_NODE_INSTANCES);
  157. }
  158. static void store_claim_id(uuid_t *host_id, uuid_t *claim_id)
  159. {
  160. sqlite3_stmt *res = NULL;
  161. int rc;
  162. if (unlikely(!db_meta)) {
  163. if (default_rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE)
  164. error_report("Database has not been initialized");
  165. return;
  166. }
  167. rc = sqlite3_prepare_v2(db_meta, SQL_STORE_CLAIM_ID, -1, &res, 0);
  168. if (unlikely(rc != SQLITE_OK)) {
  169. error_report("Failed to prepare statement store chart labels");
  170. return;
  171. }
  172. rc = sqlite3_bind_blob(res, 1, host_id, sizeof(*host_id), SQLITE_STATIC);
  173. if (unlikely(rc != SQLITE_OK)) {
  174. error_report("Failed to bind host_id parameter to store node instance information");
  175. goto failed;
  176. }
  177. if (claim_id)
  178. rc = sqlite3_bind_blob(res, 2, claim_id, sizeof(*claim_id), SQLITE_STATIC);
  179. else
  180. rc = sqlite3_bind_null(res, 2);
  181. if (unlikely(rc != SQLITE_OK)) {
  182. error_report("Failed to bind claim_id parameter to store node instance information");
  183. goto failed;
  184. }
  185. rc = execute_insert(res);
  186. if (unlikely(rc != SQLITE_DONE))
  187. error_report("Failed to store node instance information, rc = %d", rc);
  188. failed:
  189. if (unlikely(sqlite3_finalize(res) != SQLITE_OK))
  190. error_report("Failed to finalize the prepared statement when storing node instance information");
  191. }
  192. static void delete_dimension_uuid(uuid_t *dimension_uuid)
  193. {
  194. static __thread sqlite3_stmt *res = NULL;
  195. int rc;
  196. if (unlikely(!res)) {
  197. rc = prepare_statement(db_meta, DELETE_DIMENSION_UUID, &res);
  198. if (rc != SQLITE_OK) {
  199. error_report("Failed to prepare statement to delete a dimension uuid");
  200. return;
  201. }
  202. }
  203. rc = sqlite3_bind_blob(res, 1, dimension_uuid, sizeof(*dimension_uuid), SQLITE_STATIC);
  204. if (unlikely(rc != SQLITE_OK))
  205. goto skip_execution;
  206. rc = sqlite3_step_monitored(res);
  207. if (unlikely(rc != SQLITE_DONE))
  208. error_report("Failed to delete dimension uuid, rc = %d", rc);
  209. skip_execution:
  210. rc = sqlite3_reset(res);
  211. if (unlikely(rc != SQLITE_OK))
  212. error_report("Failed to reset statement when deleting dimension UUID, rc = %d", rc);
  213. }
  214. //
  215. // Store host and host system info information in the database
  216. static int sql_store_host_info(RRDHOST *host)
  217. {
  218. static __thread sqlite3_stmt *res = NULL;
  219. int rc, param = 0;
  220. if (unlikely(!db_meta)) {
  221. if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
  222. return 0;
  223. error_report("Database has not been initialized");
  224. return 1;
  225. }
  226. if (unlikely((!res))) {
  227. rc = prepare_statement(db_meta, SQL_STORE_HOST_INFO, &res);
  228. if (unlikely(rc != SQLITE_OK)) {
  229. error_report("Failed to prepare statement to store host, rc = %d", rc);
  230. return 1;
  231. }
  232. }
  233. rc = sqlite3_bind_blob(res, ++param, &host->host_uuid, sizeof(host->host_uuid), SQLITE_STATIC);
  234. if (unlikely(rc != SQLITE_OK))
  235. goto bind_fail;
  236. rc = bind_text_null(res, ++param, rrdhost_hostname(host), 0);
  237. if (unlikely(rc != SQLITE_OK))
  238. goto bind_fail;
  239. rc = bind_text_null(res, ++param, rrdhost_registry_hostname(host), 1);
  240. if (unlikely(rc != SQLITE_OK))
  241. goto bind_fail;
  242. rc = sqlite3_bind_int(res, ++param, host->rrd_update_every);
  243. if (unlikely(rc != SQLITE_OK))
  244. goto bind_fail;
  245. rc = bind_text_null(res, ++param, rrdhost_os(host), 1);
  246. if (unlikely(rc != SQLITE_OK))
  247. goto bind_fail;
  248. rc = bind_text_null(res, ++param, rrdhost_timezone(host), 1);
  249. if (unlikely(rc != SQLITE_OK))
  250. goto bind_fail;
  251. rc = bind_text_null(res, ++param, rrdhost_tags(host), 1);
  252. if (unlikely(rc != SQLITE_OK))
  253. goto bind_fail;
  254. rc = sqlite3_bind_int(res, ++param, host->system_info ? host->system_info->hops : 0);
  255. if (unlikely(rc != SQLITE_OK))
  256. goto bind_fail;
  257. rc = sqlite3_bind_int(res, ++param, host->rrd_memory_mode);
  258. if (unlikely(rc != SQLITE_OK))
  259. goto bind_fail;
  260. rc = bind_text_null(res, ++param, rrdhost_abbrev_timezone(host), 1);
  261. if (unlikely(rc != SQLITE_OK))
  262. goto bind_fail;
  263. rc = sqlite3_bind_int(res, ++param, host->utc_offset);
  264. if (unlikely(rc != SQLITE_OK))
  265. goto bind_fail;
  266. rc = bind_text_null(res, ++param, rrdhost_program_name(host), 1);
  267. if (unlikely(rc != SQLITE_OK))
  268. goto bind_fail;
  269. rc = bind_text_null(res, ++param, rrdhost_program_version(host), 1);
  270. if (unlikely(rc != SQLITE_OK))
  271. goto bind_fail;
  272. rc = sqlite3_bind_int64(res, ++param, host->rrd_history_entries);
  273. if (unlikely(rc != SQLITE_OK))
  274. goto bind_fail;
  275. rc = sqlite3_bind_int(res, ++param, (int ) host->health_enabled);
  276. if (unlikely(rc != SQLITE_OK))
  277. goto bind_fail;
  278. int store_rc = sqlite3_step_monitored(res);
  279. if (unlikely(store_rc != SQLITE_DONE))
  280. error_report("Failed to store host %s, rc = %d", rrdhost_hostname(host), rc);
  281. rc = sqlite3_reset(res);
  282. if (unlikely(rc != SQLITE_OK))
  283. error_report("Failed to reset statement to store host %s, rc = %d", rrdhost_hostname(host), rc);
  284. return !(store_rc == SQLITE_DONE);
  285. bind_fail:
  286. error_report("Failed to bind %d parameter to store host %s, rc = %d", param, rrdhost_hostname(host), rc);
  287. rc = sqlite3_reset(res);
  288. if (unlikely(rc != SQLITE_OK))
  289. error_report("Failed to reset statement to store host %s, rc = %d", rrdhost_hostname(host), rc);
  290. return 1;
  291. }
  292. static void sql_store_host_system_info_key_value(const char *name, const char *value, void *data)
  293. {
  294. struct query_build *lb = data;
  295. if (unlikely(!value))
  296. return;
  297. if (unlikely(!lb->count))
  298. buffer_sprintf(
  299. lb->sql, STORE_HOST_INFO);
  300. else
  301. buffer_strcat(lb->sql, ", ");
  302. buffer_sprintf(lb->sql, STORE_HOST_INFO_VALUES, lb->uuid_str, name, value);
  303. lb->count++;
  304. }
  305. static BUFFER *sql_store_host_system_info(RRDHOST *host)
  306. {
  307. struct rrdhost_system_info *system_info = host->system_info;
  308. if (unlikely(!system_info))
  309. return NULL;
  310. BUFFER *work_buffer = buffer_create(1024);
  311. struct query_build key_data = {.sql = work_buffer, .count = 0};
  312. uuid_unparse_lower(host->host_uuid, key_data.uuid_str);
  313. sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_NAME", system_info->container_os_name, &key_data);
  314. sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_ID", system_info->container_os_id, &key_data);
  315. sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_ID_LIKE", system_info->container_os_id_like, &key_data);
  316. sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_VERSION", system_info->container_os_version, &key_data);
  317. sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_VERSION_ID", system_info->container_os_version_id, &key_data);
  318. sql_store_host_system_info_key_value("NETDATA_CONTAINER_OS_DETECTION", system_info->host_os_detection, &key_data);
  319. sql_store_host_system_info_key_value("NETDATA_HOST_OS_NAME", system_info->host_os_name, &key_data);
  320. sql_store_host_system_info_key_value("NETDATA_HOST_OS_ID", system_info->host_os_id, &key_data);
  321. sql_store_host_system_info_key_value("NETDATA_HOST_OS_ID_LIKE", system_info->host_os_id_like, &key_data);
  322. sql_store_host_system_info_key_value("NETDATA_HOST_OS_VERSION", system_info->host_os_version, &key_data);
  323. sql_store_host_system_info_key_value("NETDATA_HOST_OS_VERSION_ID", system_info->host_os_version_id, &key_data);
  324. sql_store_host_system_info_key_value("NETDATA_HOST_OS_DETECTION", system_info->host_os_detection, &key_data);
  325. sql_store_host_system_info_key_value("NETDATA_SYSTEM_KERNEL_NAME", system_info->kernel_name, &key_data);
  326. sql_store_host_system_info_key_value("NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT", system_info->host_cores, &key_data);
  327. sql_store_host_system_info_key_value("NETDATA_SYSTEM_CPU_FREQ", system_info->host_cpu_freq, &key_data);
  328. sql_store_host_system_info_key_value("NETDATA_SYSTEM_TOTAL_RAM", system_info->host_ram_total, &key_data);
  329. sql_store_host_system_info_key_value("NETDATA_SYSTEM_TOTAL_DISK_SIZE", system_info->host_disk_space, &key_data);
  330. sql_store_host_system_info_key_value("NETDATA_SYSTEM_KERNEL_VERSION", system_info->kernel_version, &key_data);
  331. sql_store_host_system_info_key_value("NETDATA_SYSTEM_ARCHITECTURE", system_info->architecture, &key_data);
  332. sql_store_host_system_info_key_value("NETDATA_SYSTEM_VIRTUALIZATION", system_info->virtualization, &key_data);
  333. sql_store_host_system_info_key_value("NETDATA_SYSTEM_VIRT_DETECTION", system_info->virt_detection, &key_data);
  334. sql_store_host_system_info_key_value("NETDATA_SYSTEM_CONTAINER", system_info->container, &key_data);
  335. sql_store_host_system_info_key_value("NETDATA_SYSTEM_CONTAINER_DETECTION", system_info->container_detection, &key_data);
  336. sql_store_host_system_info_key_value("NETDATA_HOST_IS_K8S_NODE", system_info->is_k8s_node, &key_data);
  337. return work_buffer;
  338. }
  339. /*
  340. * Store set option for a dimension
  341. */
  342. static int sql_set_dimension_option(uuid_t *dim_uuid, char *option)
  343. {
  344. sqlite3_stmt *res = NULL;
  345. int rc;
  346. if (unlikely(!db_meta)) {
  347. if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
  348. return 0;
  349. error_report("Database has not been initialized");
  350. return 1;
  351. }
  352. rc = sqlite3_prepare_v2(db_meta, "UPDATE dimension SET options = @options WHERE dim_id = @dim_id", -1, &res, 0);
  353. if (unlikely(rc != SQLITE_OK)) {
  354. error_report("Failed to prepare statement to update dimension options");
  355. return 0;
  356. };
  357. rc = sqlite3_bind_blob(res, 2, dim_uuid, sizeof(*dim_uuid), SQLITE_STATIC);
  358. if (unlikely(rc != SQLITE_OK))
  359. goto bind_fail;
  360. if (!option || !strcmp(option,"unhide"))
  361. rc = sqlite3_bind_null(res, 1);
  362. else
  363. rc = sqlite3_bind_text(res, 1, option, -1, SQLITE_STATIC);
  364. if (unlikely(rc != SQLITE_OK))
  365. goto bind_fail;
  366. rc = execute_insert(res);
  367. if (unlikely(rc != SQLITE_DONE))
  368. error_report("Failed to update dimension option, rc = %d", rc);
  369. bind_fail:
  370. rc = sqlite3_finalize(res);
  371. if (unlikely(rc != SQLITE_OK))
  372. error_report("Failed to finalize statement in update dimension options, rc = %d", rc);
  373. return 0;
  374. }
  375. /*
  376. * Store a chart in the database
  377. */
  378. static int sql_store_chart(
  379. uuid_t *chart_uuid, uuid_t *host_uuid, const char *type, const char *id, const char *name, const char *family,
  380. const char *context, const char *title, const char *units, const char *plugin, const char *module, long priority,
  381. int update_every, int chart_type, int memory_mode, long history_entries)
  382. {
  383. static __thread sqlite3_stmt *res = NULL;
  384. int rc, param = 0;
  385. if (unlikely(!db_meta)) {
  386. if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
  387. return 0;
  388. error_report("Database has not been initialized");
  389. return 1;
  390. }
  391. if (unlikely(!res)) {
  392. rc = prepare_statement(db_meta, SQL_STORE_CHART, &res);
  393. if (unlikely(rc != SQLITE_OK)) {
  394. error_report("Failed to prepare statement to store chart, rc = %d", rc);
  395. return 1;
  396. }
  397. }
  398. param++;
  399. rc = sqlite3_bind_blob(res, 1, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC);
  400. if (unlikely(rc != SQLITE_OK))
  401. goto bind_fail;
  402. param++;
  403. rc = sqlite3_bind_blob(res, 2, host_uuid, sizeof(*host_uuid), SQLITE_STATIC);
  404. if (unlikely(rc != SQLITE_OK))
  405. goto bind_fail;
  406. param++;
  407. rc = sqlite3_bind_text(res, 3, type, -1, SQLITE_STATIC);
  408. if (unlikely(rc != SQLITE_OK))
  409. goto bind_fail;
  410. param++;
  411. rc = sqlite3_bind_text(res, 4, id, -1, SQLITE_STATIC);
  412. if (unlikely(rc != SQLITE_OK))
  413. goto bind_fail;
  414. param++;
  415. if (name && *name)
  416. rc = sqlite3_bind_text(res, 5, name, -1, SQLITE_STATIC);
  417. else
  418. rc = sqlite3_bind_null(res, 5);
  419. if (unlikely(rc != SQLITE_OK))
  420. goto bind_fail;
  421. param++;
  422. rc = sqlite3_bind_text(res, 6, family, -1, SQLITE_STATIC);
  423. if (unlikely(rc != SQLITE_OK))
  424. goto bind_fail;
  425. param++;
  426. rc = sqlite3_bind_text(res, 7, context, -1, SQLITE_STATIC);
  427. if (unlikely(rc != SQLITE_OK))
  428. goto bind_fail;
  429. param++;
  430. rc = sqlite3_bind_text(res, 8, title, -1, SQLITE_STATIC);
  431. if (unlikely(rc != SQLITE_OK))
  432. goto bind_fail;
  433. param++;
  434. rc = sqlite3_bind_text(res, 9, units, -1, SQLITE_STATIC);
  435. if (unlikely(rc != SQLITE_OK))
  436. goto bind_fail;
  437. param++;
  438. rc = sqlite3_bind_text(res, 10, plugin, -1, SQLITE_STATIC);
  439. if (unlikely(rc != SQLITE_OK))
  440. goto bind_fail;
  441. param++;
  442. rc = sqlite3_bind_text(res, 11, module, -1, SQLITE_STATIC);
  443. if (unlikely(rc != SQLITE_OK))
  444. goto bind_fail;
  445. param++;
  446. rc = sqlite3_bind_int(res, 12, (int) priority);
  447. if (unlikely(rc != SQLITE_OK))
  448. goto bind_fail;
  449. param++;
  450. rc = sqlite3_bind_int(res, 13, update_every);
  451. if (unlikely(rc != SQLITE_OK))
  452. goto bind_fail;
  453. param++;
  454. rc = sqlite3_bind_int(res, 14, chart_type);
  455. if (unlikely(rc != SQLITE_OK))
  456. goto bind_fail;
  457. param++;
  458. rc = sqlite3_bind_int(res, 15, memory_mode);
  459. if (unlikely(rc != SQLITE_OK))
  460. goto bind_fail;
  461. param++;
  462. rc = sqlite3_bind_int(res, 16, (int) history_entries);
  463. if (unlikely(rc != SQLITE_OK))
  464. goto bind_fail;
  465. rc = execute_insert(res);
  466. if (unlikely(rc != SQLITE_DONE))
  467. error_report("Failed to store chart, rc = %d", rc);
  468. rc = sqlite3_reset(res);
  469. if (unlikely(rc != SQLITE_OK))
  470. error_report("Failed to reset statement in chart store function, rc = %d", rc);
  471. return 0;
  472. bind_fail:
  473. error_report("Failed to bind parameter %d to store chart, rc = %d", param, rc);
  474. rc = sqlite3_reset(res);
  475. if (unlikely(rc != SQLITE_OK))
  476. error_report("Failed to reset statement in chart store function, rc = %d", rc);
  477. return 1;
  478. }
  479. /*
  480. * Store a dimension
  481. */
  482. static int sql_store_dimension(
  483. uuid_t *dim_uuid, uuid_t *chart_uuid, const char *id, const char *name, collected_number multiplier,
  484. collected_number divisor, int algorithm, bool hidden)
  485. {
  486. static __thread sqlite3_stmt *res = NULL;
  487. int rc, param = 0;
  488. if (unlikely(!db_meta)) {
  489. if (default_rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)
  490. return 0;
  491. error_report("Database has not been initialized");
  492. return 1;
  493. }
  494. if (unlikely(!res)) {
  495. rc = prepare_statement(db_meta, SQL_STORE_DIMENSION, &res);
  496. if (unlikely(rc != SQLITE_OK)) {
  497. error_report("Failed to prepare statement to store dimension, rc = %d", rc);
  498. return 1;
  499. }
  500. }
  501. rc = sqlite3_bind_blob(res, ++param, dim_uuid, sizeof(*dim_uuid), SQLITE_STATIC);
  502. if (unlikely(rc != SQLITE_OK))
  503. goto bind_fail;
  504. rc = sqlite3_bind_blob(res, ++param, chart_uuid, sizeof(*chart_uuid), SQLITE_STATIC);
  505. if (unlikely(rc != SQLITE_OK))
  506. goto bind_fail;
  507. rc = sqlite3_bind_text(res, ++param, id, -1, SQLITE_STATIC);
  508. if (unlikely(rc != SQLITE_OK))
  509. goto bind_fail;
  510. rc = sqlite3_bind_text(res, ++param, name, -1, SQLITE_STATIC);
  511. if (unlikely(rc != SQLITE_OK))
  512. goto bind_fail;
  513. rc = sqlite3_bind_int(res, ++param, (int) multiplier);
  514. if (unlikely(rc != SQLITE_OK))
  515. goto bind_fail;
  516. rc = sqlite3_bind_int(res, ++param, (int ) divisor);
  517. if (unlikely(rc != SQLITE_OK))
  518. goto bind_fail;
  519. rc = sqlite3_bind_int(res, ++param, algorithm);
  520. if (unlikely(rc != SQLITE_OK))
  521. goto bind_fail;
  522. if (hidden)
  523. rc = sqlite3_bind_text(res, ++param, "hidden", -1, SQLITE_STATIC);
  524. else
  525. rc = sqlite3_bind_null(res, ++param);
  526. if (unlikely(rc != SQLITE_OK))
  527. goto bind_fail;
  528. rc = execute_insert(res);
  529. if (unlikely(rc != SQLITE_DONE))
  530. error_report("Failed to store dimension, rc = %d", rc);
  531. rc = sqlite3_reset(res);
  532. if (unlikely(rc != SQLITE_OK))
  533. error_report("Failed to reset statement in store dimension, rc = %d", rc);
  534. return 0;
  535. bind_fail:
  536. error_report("Failed to bind parameter %d to store dimension, rc = %d", param, rc);
  537. rc = sqlite3_reset(res);
  538. if (unlikely(rc != SQLITE_OK))
  539. error_report("Failed to reset statement in store dimension, rc = %d", rc);
  540. return 1;
  541. }
  542. static bool dimension_can_be_deleted(uuid_t *dim_uuid)
  543. {
  544. #ifdef ENABLE_DBENGINE
  545. bool no_retention = true;
  546. for (size_t tier = 0; tier < storage_tiers; tier++) {
  547. if (!multidb_ctx[tier])
  548. continue;
  549. time_t first_time_t = 0, last_time_t = 0;
  550. if (rrdeng_metric_retention_by_uuid((void *) multidb_ctx[tier], dim_uuid, &first_time_t, &last_time_t) == 0) {
  551. if (first_time_t > 0) {
  552. no_retention = false;
  553. break;
  554. }
  555. }
  556. }
  557. return no_retention;
  558. #else
  559. return false;
  560. #endif
  561. }
  562. static void check_dimension_metadata(struct metadata_wc *wc)
  563. {
  564. int rc;
  565. sqlite3_stmt *res = NULL;
  566. rc = sqlite3_prepare_v2(db_meta, SELECT_DIMENSION_LIST, -1, &res, 0);
  567. if (unlikely(rc != SQLITE_OK)) {
  568. error_report("Failed to prepare statement to fetch host dimensions");
  569. return;
  570. }
  571. rc = sqlite3_bind_int64(res, 1, (sqlite3_int64) wc->row_id);
  572. if (unlikely(rc != SQLITE_OK)) {
  573. error_report("Failed to row parameter");
  574. goto skip_run;
  575. }
  576. uint32_t total_checked = 0;
  577. uint32_t total_deleted= 0;
  578. uint64_t last_row_id = wc->row_id;
  579. info("METADATA: Checking dimensions starting after row %"PRIu64, wc->row_id);
  580. while (sqlite3_step_monitored(res) == SQLITE_ROW && total_deleted < MAX_METADATA_CLEANUP) {
  581. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN)))
  582. break;
  583. last_row_id = sqlite3_column_int64(res, 1);
  584. rc = dimension_can_be_deleted((uuid_t *)sqlite3_column_blob(res, 0));
  585. if (rc == true) {
  586. delete_dimension_uuid((uuid_t *)sqlite3_column_blob(res, 0));
  587. total_deleted++;
  588. }
  589. total_checked++;
  590. }
  591. wc->row_id = last_row_id;
  592. time_t now = now_realtime_sec();
  593. if (total_deleted > 0) {
  594. wc->check_metadata_after = now + METADATA_MAINTENANCE_RETRY;
  595. } else
  596. wc->row_id = 0;
  597. info("METADATA: Checked %u, deleted %u -- will resume after row %"PRIu64" in %lld seconds", total_checked, total_deleted, wc->row_id,
  598. (long long)(wc->check_metadata_after - now));
  599. skip_run:
  600. rc = sqlite3_finalize(res);
  601. if (unlikely(rc != SQLITE_OK))
  602. error_report("Failed to finalize the prepared statement when reading dimensions");
  603. }
  604. //
  605. // EVENT LOOP STARTS HERE
  606. //
  607. static uv_mutex_t metadata_async_lock;
  608. static void metadata_init_cmd_queue(struct metadata_wc *wc)
  609. {
  610. wc->cmd_queue.head = wc->cmd_queue.tail = 0;
  611. wc->queue_size = 0;
  612. fatal_assert(0 == uv_cond_init(&wc->cmd_cond));
  613. fatal_assert(0 == uv_mutex_init(&wc->cmd_mutex));
  614. }
  615. int metadata_enq_cmd_noblock(struct metadata_wc *wc, struct metadata_cmd *cmd)
  616. {
  617. unsigned queue_size;
  618. /* wait for free space in queue */
  619. uv_mutex_lock(&wc->cmd_mutex);
  620. if (cmd->opcode == METADATA_SYNC_SHUTDOWN) {
  621. metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN);
  622. uv_mutex_unlock(&wc->cmd_mutex);
  623. return 0;
  624. }
  625. if (unlikely((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE ||
  626. metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  627. uv_mutex_unlock(&wc->cmd_mutex);
  628. return 1;
  629. }
  630. fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE);
  631. /* enqueue command */
  632. wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
  633. wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ?
  634. wc->cmd_queue.tail + 1 : 0;
  635. wc->queue_size = queue_size + 1;
  636. uv_mutex_unlock(&wc->cmd_mutex);
  637. return 0;
  638. }
  639. static void metadata_enq_cmd(struct metadata_wc *wc, struct metadata_cmd *cmd)
  640. {
  641. unsigned queue_size;
  642. /* wait for free space in queue */
  643. uv_mutex_lock(&wc->cmd_mutex);
  644. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  645. uv_mutex_unlock(&wc->cmd_mutex);
  646. (void) uv_async_send(&wc->async);
  647. return;
  648. }
  649. if (cmd->opcode == METADATA_SYNC_SHUTDOWN) {
  650. metadata_flag_set(wc, METADATA_FLAG_SHUTDOWN);
  651. uv_mutex_unlock(&wc->cmd_mutex);
  652. (void) uv_async_send(&wc->async);
  653. return;
  654. }
  655. while ((queue_size = wc->queue_size) == METADATA_CMD_Q_MAX_SIZE) {
  656. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  657. uv_mutex_unlock(&wc->cmd_mutex);
  658. return;
  659. }
  660. uv_cond_wait(&wc->cmd_cond, &wc->cmd_mutex);
  661. }
  662. fatal_assert(queue_size < METADATA_CMD_Q_MAX_SIZE);
  663. /* enqueue command */
  664. wc->cmd_queue.cmd_array[wc->cmd_queue.tail] = *cmd;
  665. wc->cmd_queue.tail = wc->cmd_queue.tail != METADATA_CMD_Q_MAX_SIZE - 1 ?
  666. wc->cmd_queue.tail + 1 : 0;
  667. wc->queue_size = queue_size + 1;
  668. uv_mutex_unlock(&wc->cmd_mutex);
  669. /* wake up event loop */
  670. (void) uv_async_send(&wc->async);
  671. }
  672. static struct metadata_cmd metadata_deq_cmd(struct metadata_wc *wc, enum metadata_opcode *next_opcode)
  673. {
  674. struct metadata_cmd ret;
  675. unsigned queue_size;
  676. uv_mutex_lock(&wc->cmd_mutex);
  677. queue_size = wc->queue_size;
  678. if (queue_size == 0) {
  679. memset(&ret, 0, sizeof(ret));
  680. ret.opcode = METADATA_DATABASE_NOOP;
  681. ret.completion = NULL;
  682. *next_opcode = METADATA_DATABASE_NOOP;
  683. } else {
  684. /* dequeue command */
  685. ret = wc->cmd_queue.cmd_array[wc->cmd_queue.head];
  686. if (queue_size == 1) {
  687. wc->cmd_queue.head = wc->cmd_queue.tail = 0;
  688. } else {
  689. wc->cmd_queue.head = wc->cmd_queue.head != METADATA_CMD_Q_MAX_SIZE - 1 ?
  690. wc->cmd_queue.head + 1 : 0;
  691. }
  692. wc->queue_size = queue_size - 1;
  693. if (wc->queue_size > 0)
  694. *next_opcode = wc->cmd_queue.cmd_array[wc->cmd_queue.head].opcode;
  695. else
  696. *next_opcode = METADATA_DATABASE_NOOP;
  697. /* wake up producers */
  698. uv_cond_signal(&wc->cmd_cond);
  699. }
  700. uv_mutex_unlock(&wc->cmd_mutex);
  701. return ret;
  702. }
  703. static void async_cb(uv_async_t *handle)
  704. {
  705. uv_stop(handle->loop);
  706. uv_update_time(handle->loop);
  707. }
  // Periodic timer of the metadata event loop (see timer_cb), in milliseconds:
  // first tick after TIMER_INITIAL_PERIOD_MS, then every TIMER_REPEAT_PERIOD_MS.
  #define TIMER_INITIAL_PERIOD_MS (1000)
  #define TIMER_REPEAT_PERIOD_MS  (1000)
  710. static void timer_cb(uv_timer_t* handle)
  711. {
  712. uv_stop(handle->loop);
  713. uv_update_time(handle->loop);
  714. struct metadata_wc *wc = handle->data;
  715. struct metadata_cmd cmd;
  716. memset(&cmd, 0, sizeof(cmd));
  717. time_t now = now_realtime_sec();
  718. if (wc->check_metadata_after && wc->check_metadata_after < now) {
  719. cmd.opcode = METADATA_MAINTENANCE;
  720. if (!metadata_enq_cmd_noblock(wc, &cmd))
  721. wc->check_metadata_after = now + METADATA_MAINTENANCE_INTERVAL;
  722. }
  723. if (wc->check_hosts_after && wc->check_hosts_after < now) {
  724. cmd.opcode = METADATA_SCAN_HOSTS;
  725. if (!metadata_enq_cmd_noblock(wc, &cmd))
  726. wc->check_hosts_after = now + METADATA_HOST_CHECK_INTERVAL;
  727. }
  728. }
  729. static void after_metadata_cleanup(uv_work_t *req, int status)
  730. {
  731. UNUSED(status);
  732. struct metadata_wc *wc = req->data;
  733. metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
  734. }
  735. static void start_metadata_cleanup(uv_work_t *req)
  736. {
  737. struct metadata_wc *wc = req->data;
  738. check_dimension_metadata(wc);
  739. }
  740. struct scan_metadata_payload {
  741. uv_work_t request;
  742. struct metadata_wc *wc;
  743. struct completion *completion;
  744. uint32_t max_count;
  745. };
  746. // Callback after scan of hosts is done
  747. static void after_metadata_hosts(uv_work_t *req, int status __maybe_unused)
  748. {
  749. struct scan_metadata_payload *data = req->data;
  750. struct metadata_wc *wc = data->wc;
  751. metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
  752. internal_error(true, "METADATA: scanning hosts complete");
  753. if (unlikely(data->completion)) {
  754. completion_mark_complete(data->completion);
  755. internal_error(true, "METADATA: Sending completion done");
  756. }
  757. freez(data);
  758. }
  759. static bool metadata_scan_host(RRDHOST *host, uint32_t max_count) {
  760. RRDSET *st;
  761. int rc;
  762. bool more_to_do = false;
  763. uint32_t scan_count = 1;
  764. BUFFER *work_buffer = buffer_create(1024);
  765. rrdset_foreach_reentrant(st, host) {
  766. if (scan_count == max_count) {
  767. more_to_do = true;
  768. break;
  769. }
  770. if(rrdset_flag_check(st, RRDSET_FLAG_METADATA_UPDATE)) {
  771. rrdset_flag_clear(st, RRDSET_FLAG_METADATA_UPDATE);
  772. scan_count++;
  773. check_and_update_chart_labels(st, work_buffer);
  774. rc = sql_store_chart(
  775. &st->chart_uuid,
  776. &st->rrdhost->host_uuid,
  777. string2str(st->parts.type),
  778. string2str(st->parts.id),
  779. string2str(st->parts.name),
  780. rrdset_family(st),
  781. rrdset_context(st),
  782. rrdset_title(st),
  783. rrdset_units(st),
  784. rrdset_plugin_name(st),
  785. rrdset_module_name(st),
  786. st->priority,
  787. st->update_every,
  788. st->chart_type,
  789. st->rrd_memory_mode,
  790. st->entries);
  791. if (unlikely(rc))
  792. internal_error(true, "METADATA: Failed to store chart metadata %s", string2str(st->id));
  793. }
  794. RRDDIM *rd;
  795. rrddim_foreach_read(rd, st) {
  796. if(rrddim_flag_check(rd, RRDDIM_FLAG_METADATA_UPDATE)) {
  797. rrddim_flag_clear(rd, RRDDIM_FLAG_METADATA_UPDATE);
  798. rc = sql_store_dimension(
  799. &rd->metric_uuid,
  800. &rd->rrdset->chart_uuid,
  801. string2str(rd->id),
  802. string2str(rd->name),
  803. rd->multiplier,
  804. rd->divisor,
  805. rd->algorithm,
  806. rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN));
  807. if (unlikely(rc))
  808. error_report("METADATA: Failed to store dimension %s", string2str(rd->id));
  809. }
  810. }
  811. rrddim_foreach_done(rd);
  812. }
  813. rrdset_foreach_done(st);
  814. buffer_free(work_buffer);
  815. return more_to_do;
  816. }
  817. // Worker thread to scan hosts for pending metadata to store
  818. static void start_metadata_hosts(uv_work_t *req __maybe_unused)
  819. {
  820. RRDHOST *host;
  821. struct scan_metadata_payload *data = req->data;
  822. struct metadata_wc *wc = data->wc;
  823. bool run_again = false;
  824. dfe_start_reentrant(rrdhost_root_index, host) {
  825. if (rrdhost_flag_check(host, RRDHOST_FLAG_ARCHIVED) || !rrdhost_flag_check(host, RRDHOST_FLAG_METADATA_UPDATE))
  826. continue;
  827. internal_error(true, "METADATA: Scanning host %s", rrdhost_hostname(host));
  828. rrdhost_flag_clear(host,RRDHOST_FLAG_METADATA_UPDATE);
  829. if (unlikely(metadata_scan_host(host, data->max_count))) {
  830. run_again = true;
  831. rrdhost_flag_set(host,RRDHOST_FLAG_METADATA_UPDATE);
  832. internal_error(true,"METADATA: Rescheduling host %s to run; more charts to store", rrdhost_hostname(host));
  833. }
  834. }
  835. dfe_done(host);
  836. if (unlikely(run_again))
  837. wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_IMMEDIATE;
  838. else
  839. wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_INTERVAL;
  840. }
  841. static void metadata_event_loop(void *arg)
  842. {
  843. worker_register("METASYNC");
  844. worker_register_job_name(METADATA_DATABASE_NOOP, "noop");
  845. worker_register_job_name(METADATA_DATABASE_TIMER, "timer");
  846. worker_register_job_name(METADATA_ADD_CHART, "add chart");
  847. worker_register_job_name(METADATA_ADD_CHART_LABEL, "add chart label");
  848. worker_register_job_name(METADATA_ADD_DIMENSION, "add dimension");
  849. worker_register_job_name(METADATA_DEL_DIMENSION, "delete dimension");
  850. worker_register_job_name(METADATA_ADD_DIMENSION_OPTION, "dimension option");
  851. worker_register_job_name(METADATA_ADD_HOST_SYSTEM_INFO, "host system info");
  852. worker_register_job_name(METADATA_ADD_HOST_INFO, "host info");
  853. worker_register_job_name(METADATA_STORE_CLAIM_ID, "add claim id");
  854. worker_register_job_name(METADATA_STORE_HOST_LABELS, "host labels");
  855. worker_register_job_name(METADATA_MAINTENANCE, "maintenance");
  856. int ret;
  857. uv_loop_t *loop;
  858. unsigned cmd_batch_size;
  859. struct metadata_wc *wc = arg;
  860. enum metadata_opcode opcode, next_opcode;
  861. uv_work_t metadata_cleanup_worker;
  862. uv_thread_set_name_np(wc->thread, "METASYNC");
  863. loop = wc->loop = mallocz(sizeof(uv_loop_t));
  864. ret = uv_loop_init(loop);
  865. if (ret) {
  866. error("uv_loop_init(): %s", uv_strerror(ret));
  867. goto error_after_loop_init;
  868. }
  869. loop->data = wc;
  870. ret = uv_async_init(wc->loop, &wc->async, async_cb);
  871. if (ret) {
  872. error("uv_async_init(): %s", uv_strerror(ret));
  873. goto error_after_async_init;
  874. }
  875. wc->async.data = wc;
  876. ret = uv_timer_init(loop, &wc->timer_req);
  877. if (ret) {
  878. error("uv_timer_init(): %s", uv_strerror(ret));
  879. goto error_after_timer_init;
  880. }
  881. wc->timer_req.data = wc;
  882. fatal_assert(0 == uv_timer_start(&wc->timer_req, timer_cb, TIMER_INITIAL_PERIOD_MS, TIMER_REPEAT_PERIOD_MS));
  883. info("Starting metadata sync thread with %d entries command queue", METADATA_CMD_Q_MAX_SIZE);
  884. struct metadata_cmd cmd;
  885. memset(&cmd, 0, sizeof(cmd));
  886. metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
  887. metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
  888. wc->check_metadata_after = now_realtime_sec() + METADATA_MAINTENANCE_FIRST_CHECK;
  889. wc->check_hosts_after = now_realtime_sec() + METADATA_HOST_CHECK_FIRST_CHECK;
  890. int shutdown = 0;
  891. int in_transaction = 0;
  892. int commands_in_transaction = 0;
  893. // This can be used in the event loop for all opcodes (not workers)
  894. BUFFER *work_buffer = buffer_create(1024);
  895. wc->row_id = 0;
  896. completion_mark_complete(&wc->init_complete);
  897. while (shutdown == 0 || (wc->flags & METADATA_WORKER_BUSY)) {
  898. RRDDIM *rd = NULL;
  899. RRDSET *st = NULL;
  900. RRDHOST *host = NULL;
  901. DICTIONARY_ITEM *dict_item = NULL;
  902. BUFFER *buffer = NULL;
  903. uuid_t *uuid;
  904. int rc;
  905. worker_is_idle();
  906. uv_run(loop, UV_RUN_DEFAULT);
  907. /* wait for commands */
  908. cmd_batch_size = 0;
  909. do {
  910. if (unlikely(cmd_batch_size >= METADATA_MAX_BATCH_SIZE))
  911. break;
  912. cmd = metadata_deq_cmd(wc, &next_opcode);
  913. opcode = cmd.opcode;
  914. if (unlikely(opcode == METADATA_DATABASE_NOOP && metadata_flag_check(wc, METADATA_FLAG_SHUTDOWN))) {
  915. shutdown = 1;
  916. continue;
  917. }
  918. ++cmd_batch_size;
  919. // If we are not in transaction and this command is the same with the next ; start a transaction
  920. if (!in_transaction && opcode < METADATA_SKIP_TRANSACTION && opcode == next_opcode) {
  921. if (opcode != METADATA_DATABASE_NOOP) {
  922. in_transaction = 1;
  923. db_execute("BEGIN TRANSACTION;");
  924. }
  925. }
  926. if (likely(in_transaction)) {
  927. commands_in_transaction++;
  928. }
  929. if (likely(opcode != METADATA_DATABASE_NOOP))
  930. worker_is_busy(opcode);
  931. switch (opcode) {
  932. case METADATA_DATABASE_NOOP:
  933. case METADATA_DATABASE_TIMER:
  934. break;
  935. case METADATA_ADD_CHART:
  936. dict_item = (DICTIONARY_ITEM * ) cmd.param[0];
  937. st = (RRDSET *) dictionary_acquired_item_value(dict_item);
  938. rc = sql_store_chart(
  939. &st->chart_uuid,
  940. &st->rrdhost->host_uuid,
  941. string2str(st->parts.type),
  942. string2str(st->parts.id),
  943. string2str(st->parts.name),
  944. rrdset_family(st),
  945. rrdset_context(st),
  946. rrdset_title(st),
  947. rrdset_units(st),
  948. rrdset_plugin_name(st),
  949. rrdset_module_name(st),
  950. st->priority,
  951. st->update_every,
  952. st->chart_type,
  953. st->rrd_memory_mode,
  954. st->entries);
  955. if (unlikely(rc))
  956. error_report("Failed to store chart %s", rrdset_id(st));
  957. dictionary_acquired_item_release(st->rrdhost->rrdset_root_index, dict_item);
  958. break;
  959. case METADATA_ADD_CHART_LABEL:
  960. dict_item = (DICTIONARY_ITEM * ) cmd.param[0];
  961. st = (RRDSET *) dictionary_acquired_item_value(dict_item);
  962. check_and_update_chart_labels(st, work_buffer);
  963. dictionary_acquired_item_release(st->rrdhost->rrdset_root_index, dict_item);
  964. break;
  965. case METADATA_ADD_DIMENSION:
  966. dict_item = (DICTIONARY_ITEM * ) cmd.param[0];
  967. rd = (RRDDIM *) dictionary_acquired_item_value(dict_item);
  968. rc = sql_store_dimension(
  969. &rd->metric_uuid,
  970. &rd->rrdset->chart_uuid,
  971. string2str(rd->id),
  972. string2str(rd->name),
  973. rd->multiplier,
  974. rd->divisor,
  975. rd->algorithm,
  976. rrddim_option_check(rd, RRDDIM_OPTION_HIDDEN));
  977. if (unlikely(rc))
  978. error_report("Failed to store dimension %s", rrddim_id(rd));
  979. dictionary_acquired_item_release(rd->rrdset->rrddim_root_index, dict_item);
  980. break;
  981. case METADATA_DEL_DIMENSION:
  982. uuid = (uuid_t *) cmd.param[0];
  983. if (likely(dimension_can_be_deleted(uuid)))
  984. delete_dimension_uuid(uuid);
  985. freez(uuid);
  986. break;
  987. case METADATA_ADD_DIMENSION_OPTION:
  988. dict_item = (DICTIONARY_ITEM * ) cmd.param[0];
  989. rd = (RRDDIM *) dictionary_acquired_item_value(dict_item);
  990. rc = sql_set_dimension_option(
  991. &rd->metric_uuid, rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN) ? "hidden" : NULL);
  992. if (unlikely(rc))
  993. error_report("Failed to store dimension option for %s", string2str(rd->id));
  994. dictionary_acquired_item_release(rd->rrdset->rrddim_root_index, dict_item);
  995. break;
  996. case METADATA_ADD_HOST_SYSTEM_INFO:
  997. buffer = (BUFFER *) cmd.param[0];
  998. db_execute(buffer_tostring(buffer));
  999. buffer_free(buffer);
  1000. break;
  1001. case METADATA_ADD_HOST_INFO:
  1002. dict_item = (DICTIONARY_ITEM * ) cmd.param[0];
  1003. host = (RRDHOST *) dictionary_acquired_item_value(dict_item);
  1004. rc = sql_store_host_info(host);
  1005. if (unlikely(rc))
  1006. error_report("Failed to store host info in the database for %s", string2str(host->hostname));
  1007. dictionary_acquired_item_release(rrdhost_root_index, dict_item);
  1008. break;
  1009. case METADATA_STORE_CLAIM_ID:
  1010. store_claim_id((uuid_t *) cmd.param[0], (uuid_t *) cmd.param[1]);
  1011. freez((void *) cmd.param[0]);
  1012. freez((void *) cmd.param[1]);
  1013. break;
  1014. case METADATA_STORE_HOST_LABELS:
  1015. dict_item = (DICTIONARY_ITEM * ) cmd.param[0];
  1016. host = (RRDHOST *) dictionary_acquired_item_value(dict_item);
  1017. rc = exec_statement_with_uuid(SQL_DELETE_HOST_LABELS, &host->host_uuid);
  1018. if (likely(rc == SQLITE_OK)) {
  1019. buffer_flush(work_buffer);
  1020. struct query_build tmp = {.sql = work_buffer, .count = 0};
  1021. uuid_unparse_lower(host->host_uuid, tmp.uuid_str);
  1022. rrdlabels_walkthrough_read(host->rrdlabels, host_label_store_to_sql_callback, &tmp);
  1023. db_execute(buffer_tostring(work_buffer));
  1024. }
  1025. dictionary_acquired_item_release(rrdhost_root_index, dict_item);
  1026. break;
  1027. case METADATA_SCAN_HOSTS:
  1028. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_SCANNING_HOSTS)))
  1029. break;
  1030. struct scan_metadata_payload *data = mallocz(sizeof(*data));
  1031. data->request.data = data;
  1032. data->wc = wc;
  1033. data->completion = cmd.completion; // Completion by the worker
  1034. if (unlikely(cmd.completion)) {
  1035. data->max_count = 0; // 0 will process all pending updates
  1036. cmd.completion = NULL; // Do not complete after launching worker (worker will do)
  1037. }
  1038. else
  1039. data->max_count = 1000;
  1040. metadata_flag_set(wc, METADATA_FLAG_SCANNING_HOSTS);
  1041. if (unlikely(
  1042. uv_queue_work(loop,&data->request,
  1043. start_metadata_hosts,
  1044. after_metadata_hosts))) {
  1045. // Failed to launch worker -- let the event loop handle completion
  1046. cmd.completion = data->completion;
  1047. freez(data);
  1048. metadata_flag_clear(wc, METADATA_FLAG_SCANNING_HOSTS);
  1049. }
  1050. break;
  1051. case METADATA_STORE_BUFFER:
  1052. buffer = (BUFFER *) cmd.param[0];
  1053. db_execute(buffer_tostring(buffer));
  1054. buffer_free(buffer);
  1055. break;
  1056. case METADATA_MAINTENANCE:
  1057. if (unlikely(metadata_flag_check(wc, METADATA_FLAG_CLEANUP)))
  1058. break;
  1059. metadata_cleanup_worker.data = wc;
  1060. metadata_flag_set(wc, METADATA_FLAG_CLEANUP);
  1061. if (unlikely(
  1062. uv_queue_work(loop, &metadata_cleanup_worker, start_metadata_cleanup, after_metadata_cleanup))) {
  1063. metadata_flag_clear(wc, METADATA_FLAG_CLEANUP);
  1064. }
  1065. break;
  1066. case METADATA_UNITTEST:;
  1067. struct thread_unittest *tu = (struct thread_unittest *) cmd.param[0];
  1068. sleep_usec(1000); // processing takes 1ms
  1069. __atomic_fetch_add(&tu->processed, 1, __ATOMIC_SEQ_CST);
  1070. break;
  1071. default:
  1072. break;
  1073. }
  1074. if (in_transaction && (commands_in_transaction >= METADATA_MAX_TRANSACTION_BATCH || opcode != next_opcode)) {
  1075. in_transaction = 0;
  1076. db_execute("COMMIT TRANSACTION;");
  1077. commands_in_transaction = 0;
  1078. }
  1079. if (cmd.completion)
  1080. completion_mark_complete(cmd.completion);
  1081. } while (opcode != METADATA_DATABASE_NOOP);
  1082. }
  1083. if (!uv_timer_stop(&wc->timer_req))
  1084. uv_close((uv_handle_t *)&wc->timer_req, NULL);
  1085. /*
  1086. * uv_async_send after uv_close does not seem to crash in linux at the moment,
  1087. * it is however undocumented behaviour we need to be aware if this becomes
  1088. * an issue in the future.
  1089. */
  1090. uv_close((uv_handle_t *)&wc->async, NULL);
  1091. uv_run(loop, UV_RUN_DEFAULT);
  1092. uv_cond_destroy(&wc->cmd_cond);
  1093. /* uv_mutex_destroy(&wc->cmd_mutex); */
  1094. //fatal_assert(0 == uv_loop_close(loop));
  1095. int rc;
  1096. do {
  1097. rc = uv_loop_close(loop);
  1098. } while (rc != UV_EBUSY);
  1099. freez(loop);
  1100. worker_unregister();
  1101. buffer_free(work_buffer);
  1102. info("METADATA: Shutting down event loop");
  1103. completion_mark_complete(&wc->init_complete);
  1104. return;
  1105. error_after_timer_init:
  1106. uv_close((uv_handle_t *)&wc->async, NULL);
  1107. error_after_async_init:
  1108. fatal_assert(0 == uv_loop_close(loop));
  1109. error_after_loop_init:
  1110. freez(loop);
  1111. worker_unregister();
  1112. }
  1113. struct metadata_wc metasync_worker = {.loop = NULL};
  1114. void metadata_sync_shutdown(void)
  1115. {
  1116. completion_init(&metasync_worker.init_complete);
  1117. struct metadata_cmd cmd;
  1118. memset(&cmd, 0, sizeof(cmd));
  1119. info("METADATA: Sending a shutdown command");
  1120. cmd.opcode = METADATA_SYNC_SHUTDOWN;
  1121. metadata_enq_cmd(&metasync_worker, &cmd);
  1122. /* wait for metadata thread to shut down */
  1123. info("METADATA: Waiting for shutdown ACK");
  1124. completion_wait_for(&metasync_worker.init_complete);
  1125. completion_destroy(&metasync_worker.init_complete);
  1126. info("METADATA: Shutdown complete");
  1127. }
  1128. void metadata_sync_shutdown_prepare(void)
  1129. {
  1130. struct metadata_cmd cmd;
  1131. memset(&cmd, 0, sizeof(cmd));
  1132. struct completion compl;
  1133. completion_init(&compl);
  1134. info("METADATA: Sending a scan host command");
  1135. uint32_t max_wait_iterations = 2000;
  1136. while (unlikely(metadata_flag_check(&metasync_worker, METADATA_FLAG_SCANNING_HOSTS)) && max_wait_iterations--) {
  1137. if (max_wait_iterations == 1999)
  1138. info("METADATA: Current worker is running; waiting to finish");
  1139. sleep_usec(1000);
  1140. }
  1141. cmd.opcode = METADATA_SCAN_HOSTS;
  1142. cmd.completion = &compl;
  1143. metadata_enq_cmd(&metasync_worker, &cmd);
  1144. info("METADATA: Waiting for host scan completion");
  1145. completion_wait_for(&compl);
  1146. completion_destroy(&compl);
  1147. info("METADATA: Host scan complete; can continue with shutdown");
  1148. }
  1149. // -------------------------------------------------------------
  1150. // Init function called on agent startup
  1151. void metadata_sync_init(void)
  1152. {
  1153. struct metadata_wc *wc = &metasync_worker;
  1154. fatal_assert(0 == uv_mutex_init(&metadata_async_lock));
  1155. memset(wc, 0, sizeof(*wc));
  1156. metadata_init_cmd_queue(wc);
  1157. completion_init(&wc->init_complete);
  1158. fatal_assert(0 == uv_thread_create(&(wc->thread), metadata_event_loop, wc));
  1159. completion_wait_for(&wc->init_complete);
  1160. completion_destroy(&wc->init_complete);
  1161. info("SQLite metadata sync initialization complete");
  1162. }
  1163. // Helpers
  1164. static inline void queue_metadata_cmd(enum metadata_opcode opcode, const void *param0, const void *param1)
  1165. {
  1166. struct metadata_cmd cmd;
  1167. cmd.opcode = opcode;
  1168. cmd.param[0] = param0;
  1169. cmd.param[1] = param1;
  1170. cmd.completion = NULL;
  1171. metadata_enq_cmd(&metasync_worker, &cmd);
  1172. }
  1173. // Public
  1174. void metaqueue_chart_update(RRDSET *st)
  1175. {
  1176. const DICTIONARY_ITEM *acquired_st = dictionary_get_and_acquire_item(st->rrdhost->rrdset_root_index, string2str(st->id));
  1177. queue_metadata_cmd(METADATA_ADD_CHART, acquired_st, NULL);
  1178. }
  1179. //
  1180. // RD may not be collected, so we may store it needlessly
  1181. void metaqueue_dimension_update(RRDDIM *rd)
  1182. {
  1183. const DICTIONARY_ITEM *acquired_rd =
  1184. dictionary_get_and_acquire_item(rd->rrdset->rrddim_root_index, string2str(rd->id));
  1185. if (unlikely(rrdset_flag_check(rd->rrdset, RRDSET_FLAG_METADATA_UPDATE))) {
  1186. metaqueue_chart_update(rd->rrdset);
  1187. rrdset_flag_clear(rd->rrdset, RRDSET_FLAG_METADATA_UPDATE);
  1188. }
  1189. queue_metadata_cmd(METADATA_ADD_DIMENSION, acquired_rd, NULL);
  1190. }
  1191. void metaqueue_dimension_update_flags(RRDDIM *rd)
  1192. {
  1193. const DICTIONARY_ITEM *acquired_rd =
  1194. dictionary_get_and_acquire_item(rd->rrdset->rrddim_root_index, string2str(rd->id));
  1195. queue_metadata_cmd(METADATA_ADD_DIMENSION_OPTION, acquired_rd, NULL);
  1196. }
  1197. void metaqueue_host_update_system_info(RRDHOST *host)
  1198. {
  1199. BUFFER *work_buffer = sql_store_host_system_info(host);
  1200. if (unlikely(!work_buffer))
  1201. return;
  1202. queue_metadata_cmd(METADATA_ADD_HOST_SYSTEM_INFO, work_buffer, NULL);
  1203. }
  1204. void metaqueue_host_update_info(const char *machine_guid)
  1205. {
  1206. const DICTIONARY_ITEM *acquired_host = dictionary_get_and_acquire_item(rrdhost_root_index, machine_guid);
  1207. queue_metadata_cmd(METADATA_ADD_HOST_INFO, acquired_host, NULL);
  1208. }
  1209. void metaqueue_delete_dimension_uuid(uuid_t *uuid)
  1210. {
  1211. if (unlikely(!metasync_worker.loop))
  1212. return;
  1213. uuid_t *use_uuid = mallocz(sizeof(*uuid));
  1214. uuid_copy(*use_uuid, *uuid);
  1215. queue_metadata_cmd(METADATA_DEL_DIMENSION, use_uuid, NULL);
  1216. }
  1217. void metaqueue_store_claim_id(uuid_t *host_uuid, uuid_t *claim_uuid)
  1218. {
  1219. if (unlikely(!host_uuid))
  1220. return;
  1221. uuid_t *local_host_uuid = mallocz(sizeof(*host_uuid));
  1222. uuid_t *local_claim_uuid = NULL;
  1223. uuid_copy(*local_host_uuid, *host_uuid);
  1224. if (likely(claim_uuid)) {
  1225. local_claim_uuid = mallocz(sizeof(*claim_uuid));
  1226. uuid_copy(*local_claim_uuid, *claim_uuid);
  1227. }
  1228. queue_metadata_cmd(METADATA_STORE_CLAIM_ID, local_host_uuid, local_claim_uuid);
  1229. }
  1230. void metaqueue_store_host_labels(const char *machine_guid)
  1231. {
  1232. const DICTIONARY_ITEM *acquired_host = dictionary_get_and_acquire_item(rrdhost_root_index, machine_guid);
  1233. queue_metadata_cmd(METADATA_STORE_HOST_LABELS, acquired_host, NULL);
  1234. }
  1235. void metaqueue_buffer(BUFFER *buffer)
  1236. {
  1237. queue_metadata_cmd(METADATA_STORE_BUFFER, buffer, NULL);
  1238. }
  1239. void metaqueue_chart_labels(RRDSET *st)
  1240. {
  1241. const DICTIONARY_ITEM *acquired_st = dictionary_get_and_acquire_item(st->rrdhost->rrdset_root_index, string2str(st->id));
  1242. queue_metadata_cmd(METADATA_ADD_CHART_LABEL, acquired_st, NULL);
  1243. }
  1244. //
  1245. // unitests
  1246. //
  1247. static void *unittest_queue_metadata(void *arg) {
  1248. struct thread_unittest *tu = arg;
  1249. struct metadata_cmd cmd;
  1250. cmd.opcode = METADATA_UNITTEST;
  1251. cmd.param[0] = tu;
  1252. cmd.param[1] = NULL;
  1253. cmd.completion = NULL;
  1254. metadata_enq_cmd(&metasync_worker, &cmd);
  1255. do {
  1256. __atomic_fetch_add(&tu->added, 1, __ATOMIC_SEQ_CST);
  1257. metadata_enq_cmd(&metasync_worker, &cmd);
  1258. sleep_usec(10000);
  1259. } while (!__atomic_load_n(&tu->join, __ATOMIC_RELAXED));
  1260. return arg;
  1261. }
  1262. static void *metadata_unittest_threads(void)
  1263. {
  1264. unsigned done;
  1265. struct thread_unittest tu = {
  1266. .join = 0,
  1267. .added = 0,
  1268. .processed = 0,
  1269. .done = &done,
  1270. };
  1271. // Queue messages / Time it
  1272. time_t seconds_to_run = 5;
  1273. int threads_to_create = 4;
  1274. fprintf(
  1275. stderr,
  1276. "\nChecking metadata queue using %d threads for %lld seconds...\n",
  1277. threads_to_create,
  1278. (long long)seconds_to_run);
  1279. netdata_thread_t threads[threads_to_create];
  1280. tu.join = 0;
  1281. for (int i = 0; i < threads_to_create; i++) {
  1282. char buf[100 + 1];
  1283. snprintf(buf, 100, "meta%d", i);
  1284. netdata_thread_create(
  1285. &threads[i],
  1286. buf,
  1287. NETDATA_THREAD_OPTION_DONT_LOG | NETDATA_THREAD_OPTION_JOINABLE,
  1288. unittest_queue_metadata,
  1289. &tu);
  1290. }
  1291. uv_async_send(&metasync_worker.async);
  1292. sleep_usec(seconds_to_run * USEC_PER_SEC);
  1293. __atomic_store_n(&tu.join, 1, __ATOMIC_RELAXED);
  1294. for (int i = 0; i < threads_to_create; i++) {
  1295. void *retval;
  1296. netdata_thread_join(threads[i], &retval);
  1297. }
  1298. // uv_async_send(&metasync_worker.async);
  1299. sleep_usec(5 * USEC_PER_SEC);
  1300. fprintf(stderr, "Added %u elements, processed %u\n", tu.added, tu.processed);
  1301. return 0;
  1302. }
  1303. int metadata_unittest(void)
  1304. {
  1305. metadata_sync_init();
  1306. // Queue items for a specific period of time
  1307. metadata_unittest_threads();
  1308. fprintf(stderr, "Items still in queue %u\n", metasync_worker.queue_size);
  1309. metadata_sync_shutdown();
  1310. return 0;
  1311. }