rrdengine.c
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #define NETDATA_RRD_INTERNALS
  3. #include "rrdengine.h"
  4. #include "pdc.h"
  5. rrdeng_stats_t global_io_errors = 0;
  6. rrdeng_stats_t global_fs_errors = 0;
  7. rrdeng_stats_t rrdeng_reserved_file_descriptors = 0;
  8. rrdeng_stats_t global_pg_cache_over_half_dirty_events = 0;
  9. rrdeng_stats_t global_flushing_pressure_page_deletions = 0;
  10. unsigned rrdeng_pages_per_extent = MAX_PAGES_PER_EXTENT;
  11. #if WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_OPCODE_MAX + 2)
  12. #error Please increase WORKER_UTILIZATION_MAX_JOB_TYPES to at least (RRDENG_OPCODE_MAX + 2)
  13. #endif
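// rrdeng_main is the single, global state of the dbengine event loop: the libuv
// loop/async/timer handles, the thread that runs them, and ARAL allocators for the
// command queue, work requests, query handles and extent-io descriptors.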
  14. struct rrdeng_main {
  15. uv_thread_t thread;
  16. uv_loop_t loop;
  17. uv_async_t async;
  18. uv_timer_t timer;
  19. pid_t tid;
  20. size_t flushes_running;
  21. size_t evictions_running;
  22. size_t cleanup_running;
  23. struct {
  24. ARAL *ar;
  25. struct {
  26. SPINLOCK spinlock;
  27. size_t waiting;
  28. struct rrdeng_cmd *waiting_items_by_priority[STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE];
  29. size_t executed_by_priority[STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE];
  30. } unsafe;
  31. } cmd_queue;
  32. struct {
  33. ARAL *ar;
  34. struct {
  35. size_t dispatched;
  36. size_t executing;
  37. size_t pending_cb;
  38. } atomics;
  39. } work_cmd;
  40. struct {
  41. ARAL *ar;
  42. } handles;
  43. struct {
  44. ARAL *ar;
  45. } descriptors;
  46. struct {
  47. ARAL *ar;
  48. } xt_io_descr;
  49. } rrdeng_main = {
  50. .thread = 0,
  51. .loop = {},
  52. .async = {},
  53. .timer = {},
  54. .flushes_running = 0,
  55. .evictions_running = 0,
  56. .cleanup_running = 0,
  57. .cmd_queue = {
  58. .unsafe = {
  59. .spinlock = NETDATA_SPINLOCK_INITIALIZER,
  60. },
  61. }
  62. };
  63. static void sanity_check(void)
  64. {
  65. BUILD_BUG_ON(WORKER_UTILIZATION_MAX_JOB_TYPES < (RRDENG_OPCODE_MAX + 2));
  66. /* Magic numbers must fit in the super-blocks */
  67. BUILD_BUG_ON(strlen(RRDENG_DF_MAGIC) > RRDENG_MAGIC_SZ);
  68. BUILD_BUG_ON(strlen(RRDENG_JF_MAGIC) > RRDENG_MAGIC_SZ);
  69. /* Version strings must fit in the super-blocks */
  70. BUILD_BUG_ON(strlen(RRDENG_DF_VER) > RRDENG_VER_SZ);
  71. BUILD_BUG_ON(strlen(RRDENG_JF_VER) > RRDENG_VER_SZ);
  72. /* Data file super-block cannot be larger than RRDENG_BLOCK_SIZE */
  73. BUILD_BUG_ON(RRDENG_DF_SB_PADDING_SZ < 0);
  74. BUILD_BUG_ON(sizeof(uuid_t) != UUID_SZ); /* check UUID size */
  75. /* page count must fit in 8 bits */
  76. BUILD_BUG_ON(MAX_PAGES_PER_EXTENT > 255);
  77. /* extent cache count must fit in 32 bits */
  78. // BUILD_BUG_ON(MAX_CACHED_EXTENTS > 32);
  79. /* page info scratch space must be able to hold 2 32-bit integers */
  80. BUILD_BUG_ON(sizeof(((struct rrdeng_page_info *)0)->scratch) < 2 * sizeof(uint32_t));
  81. }
  82. // ----------------------------------------------------------------------------
  83. // work request cache
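// Commands that need blocking work are pushed to the libuv worker pool with
// uv_queue_work(): work_cb runs on a worker thread, after_work_cb runs back on
// the event loop thread once the worker has finished.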
  84. typedef void *(*work_cb)(struct rrdengine_instance *ctx, void *data, struct completion *completion, uv_work_t* req);
  85. typedef void (*after_work_cb)(struct rrdengine_instance *ctx, void *data, struct completion *completion, uv_work_t* req, int status);
  86. struct rrdeng_work {
  87. uv_work_t req;
  88. struct rrdengine_instance *ctx;
  89. void *data;
  90. struct completion *completion;
  91. work_cb work_cb;
  92. after_work_cb after_work_cb;
  93. enum rrdeng_opcode opcode;
  94. };
  95. static void work_request_init(void) {
  96. rrdeng_main.work_cmd.ar = aral_create(
  97. "dbengine-work-cmd",
  98. sizeof(struct rrdeng_work),
  99. 0,
  100. 65536, NULL,
  101. NULL, NULL, false, false
  102. );
  103. }
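// Cap the number of concurrently dispatched work requests so that
// RESERVED_LIBUV_WORKER_THREADS of the libuv pool remain free for other users.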
  104. static inline bool work_request_full(void) {
  105. return __atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED) >= (size_t)(libuv_worker_threads - RESERVED_LIBUV_WORKER_THREADS);
  106. }
  107. static inline void work_done(struct rrdeng_work *work_request) {
  108. aral_freez(rrdeng_main.work_cmd.ar, work_request);
  109. }
  110. static void work_standard_worker(uv_work_t *req) {
  111. __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.executing, 1, __ATOMIC_RELAXED);
  112. register_libuv_worker_jobs();
  113. worker_is_busy(UV_EVENT_WORKER_INIT);
  114. struct rrdeng_work *work_request = req->data;
  115. work_request->data = work_request->work_cb(work_request->ctx, work_request->data, work_request->completion, req);
  116. worker_is_idle();
  117. __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.dispatched, 1, __ATOMIC_RELAXED);
  118. __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.executing, 1, __ATOMIC_RELAXED);
  119. __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED);
  120. // signal the event loop a worker is available
  121. fatal_assert(0 == uv_async_send(&rrdeng_main.async));
  122. }
  123. static void after_work_standard_callback(uv_work_t* req, int status) {
  124. struct rrdeng_work *work_request = req->data;
  125. worker_is_busy(RRDENG_OPCODE_MAX + work_request->opcode);
  126. if(work_request->after_work_cb)
  127. work_request->after_work_cb(work_request->ctx, work_request->data, work_request->completion, req, status);
  128. work_done(work_request);
  129. __atomic_sub_fetch(&rrdeng_main.work_cmd.atomics.pending_cb, 1, __ATOMIC_RELAXED);
  130. worker_is_idle();
  131. }
  132. static bool work_dispatch(struct rrdengine_instance *ctx, void *data, struct completion *completion, enum rrdeng_opcode opcode, work_cb work_cb, after_work_cb after_work_cb) {
  133. struct rrdeng_work *work_request = NULL;
  134. internal_fatal(rrdeng_main.tid != gettid(), "work_dispatch() can only be run from the event loop thread");
  135. work_request = aral_mallocz(rrdeng_main.work_cmd.ar);
  136. memset(work_request, 0, sizeof(struct rrdeng_work));
  137. work_request->req.data = work_request;
  138. work_request->ctx = ctx;
  139. work_request->data = data;
  140. work_request->completion = completion;
  141. work_request->work_cb = work_cb;
  142. work_request->after_work_cb = after_work_cb;
  143. work_request->opcode = opcode;
  144. if(uv_queue_work(&rrdeng_main.loop, &work_request->req, work_standard_worker, after_work_standard_callback)) {
  145. internal_fatal(true, "DBENGINE: cannot queue work");
  146. work_done(work_request);
  147. return false;
  148. }
  149. __atomic_add_fetch(&rrdeng_main.work_cmd.atomics.dispatched, 1, __ATOMIC_RELAXED);
  150. return true;
  151. }
  152. // ----------------------------------------------------------------------------
  153. // page descriptor cache
  154. void page_descriptors_init(void) {
  155. rrdeng_main.descriptors.ar = aral_create(
  156. "dbengine-descriptors",
  157. sizeof(struct page_descr_with_data),
  158. 0,
  159. 65536 * 4,
  160. NULL,
  161. NULL, NULL, false, false);
  162. }
  163. struct page_descr_with_data *page_descriptor_get(void) {
  164. struct page_descr_with_data *descr = aral_mallocz(rrdeng_main.descriptors.ar);
  165. memset(descr, 0, sizeof(struct page_descr_with_data));
  166. return descr;
  167. }
  168. static inline void page_descriptor_release(struct page_descr_with_data *descr) {
  169. aral_freez(rrdeng_main.descriptors.ar, descr);
  170. }
  171. // ----------------------------------------------------------------------------
  172. // extent io descriptor cache
  173. static void extent_io_descriptor_init(void) {
  174. rrdeng_main.xt_io_descr.ar = aral_create(
  175. "dbengine-extent-io",
  176. sizeof(struct extent_io_descriptor),
  177. 0,
  178. 65536,
  179. NULL,
  180. NULL, NULL, false, false
  181. );
  182. }
  183. static struct extent_io_descriptor *extent_io_descriptor_get(void) {
  184. struct extent_io_descriptor *xt_io_descr = aral_mallocz(rrdeng_main.xt_io_descr.ar);
  185. memset(xt_io_descr, 0, sizeof(struct extent_io_descriptor));
  186. return xt_io_descr;
  187. }
  188. static inline void extent_io_descriptor_release(struct extent_io_descriptor *xt_io_descr) {
  189. aral_freez(rrdeng_main.xt_io_descr.ar, xt_io_descr);
  190. }
  191. // ----------------------------------------------------------------------------
  192. // query handle cache
  193. void rrdeng_query_handle_init(void) {
  194. rrdeng_main.handles.ar = aral_create(
  195. "dbengine-query-handles",
  196. sizeof(struct rrdeng_query_handle),
  197. 0,
  198. 65536,
  199. NULL,
  200. NULL, NULL, false, false);
  201. }
  202. struct rrdeng_query_handle *rrdeng_query_handle_get(void) {
  203. struct rrdeng_query_handle *handle = aral_mallocz(rrdeng_main.handles.ar);
  204. memset(handle, 0, sizeof(struct rrdeng_query_handle));
  205. return handle;
  206. }
  207. void rrdeng_query_handle_release(struct rrdeng_query_handle *handle) {
  208. aral_freez(rrdeng_main.handles.ar, handle);
  209. }
  210. // ----------------------------------------------------------------------------
  211. // WAL cache
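// A spinlock-protected free list of WAL buffers (RRDENG_BLOCK_SIZE, aligned for
// direct I/O) that are reused across transactions; wal_cleanup1() trims the list
// while it holds more than one buffer per storage tier.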
  212. static struct {
  213. struct {
  214. SPINLOCK spinlock;
  215. WAL *available_items;
  216. size_t available;
  217. } protected;
  218. struct {
  219. size_t allocated;
  220. } atomics;
  221. } wal_globals = {
  222. .protected = {
  223. .spinlock = NETDATA_SPINLOCK_INITIALIZER,
  224. .available_items = NULL,
  225. .available = 0,
  226. },
  227. .atomics = {
  228. .allocated = 0,
  229. },
  230. };
  231. static void wal_cleanup1(void) {
  232. WAL *wal = NULL;
  233. if(!netdata_spinlock_trylock(&wal_globals.protected.spinlock))
  234. return;
  235. if(wal_globals.protected.available_items && wal_globals.protected.available > storage_tiers) {
  236. wal = wal_globals.protected.available_items;
  237. DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next);
  238. wal_globals.protected.available--;
  239. }
  240. netdata_spinlock_unlock(&wal_globals.protected.spinlock);
  241. if(wal) {
  242. posix_memfree(wal->buf);
  243. freez(wal);
  244. __atomic_sub_fetch(&wal_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
  245. }
  246. }
  247. WAL *wal_get(struct rrdengine_instance *ctx, unsigned size) {
  248. if(!size || size > RRDENG_BLOCK_SIZE)
  249. fatal("DBENGINE: invalid WAL size requested");
  250. WAL *wal = NULL;
  251. netdata_spinlock_lock(&wal_globals.protected.spinlock);
  252. if(likely(wal_globals.protected.available_items)) {
  253. wal = wal_globals.protected.available_items;
  254. DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next);
  255. wal_globals.protected.available--;
  256. }
  257. uint64_t transaction_id = __atomic_fetch_add(&ctx->atomic.transaction_id, 1, __ATOMIC_RELAXED);
  258. netdata_spinlock_unlock(&wal_globals.protected.spinlock);
  259. if(unlikely(!wal)) {
  260. wal = mallocz(sizeof(WAL));
  261. wal->buf_size = RRDENG_BLOCK_SIZE;
  262. int ret = posix_memalign((void *)&wal->buf, RRDFILE_ALIGNMENT, wal->buf_size);
  263. if (unlikely(ret))
  264. fatal("DBENGINE: posix_memalign:%s", strerror(ret));
  265. __atomic_add_fetch(&wal_globals.atomics.allocated, 1, __ATOMIC_RELAXED);
  266. }
  267. // these need to survive
  268. unsigned buf_size = wal->buf_size;
  269. void *buf = wal->buf;
  270. memset(wal, 0, sizeof(WAL));
  271. // put them back
  272. wal->buf_size = buf_size;
  273. wal->buf = buf;
  274. memset(wal->buf, 0, wal->buf_size);
  275. wal->transaction_id = transaction_id;
  276. wal->size = size;
  277. return wal;
  278. }
  279. void wal_release(WAL *wal) {
  280. if(unlikely(!wal)) return;
  281. netdata_spinlock_lock(&wal_globals.protected.spinlock);
  282. DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(wal_globals.protected.available_items, wal, cache.prev, cache.next);
  283. wal_globals.protected.available++;
  284. netdata_spinlock_unlock(&wal_globals.protected.spinlock);
  285. }
  286. // ----------------------------------------------------------------------------
  287. // command queue cache
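// Opcodes for the event loop are allocated from an ARAL and queued in per-priority
// doubly-linked lists. A minimal sketch of how a caller hands work to the loop
// (illustrative only - the data and callbacks depend on the opcode):
//
//     rrdeng_enq_cmd(ctx, RRDENG_OPCODE_EXTENT_WRITE, base, completion,
//                    STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
//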
  288. struct rrdeng_cmd {
  289. struct rrdengine_instance *ctx;
  290. enum rrdeng_opcode opcode;
  291. void *data;
  292. struct completion *completion;
  293. enum storage_priority priority;
  294. dequeue_callback_t dequeue_cb;
  295. struct {
  296. struct rrdeng_cmd *prev;
  297. struct rrdeng_cmd *next;
  298. } queue;
  299. };
  300. static void rrdeng_cmd_queue_init(void) {
  301. rrdeng_main.cmd_queue.ar = aral_create("dbengine-opcodes",
  302. sizeof(struct rrdeng_cmd),
  303. 0,
  304. 65536,
  305. NULL,
  306. NULL, NULL, false, false);
  307. }
  308. static inline STORAGE_PRIORITY rrdeng_enq_cmd_map_opcode_to_priority(enum rrdeng_opcode opcode, STORAGE_PRIORITY priority) {
  309. if(unlikely(priority >= STORAGE_PRIORITY_INTERNAL_MAX_DONT_USE))
  310. priority = STORAGE_PRIORITY_BEST_EFFORT;
  311. switch(opcode) {
  312. case RRDENG_OPCODE_QUERY:
  313. priority = STORAGE_PRIORITY_INTERNAL_QUERY_PREP;
  314. break;
  315. default:
  316. break;
  317. }
  318. return priority;
  319. }
  320. void rrdeng_enqueue_epdl_cmd(struct rrdeng_cmd *cmd) {
  321. epdl_cmd_queued(cmd->data, cmd);
  322. }
  323. void rrdeng_dequeue_epdl_cmd(struct rrdeng_cmd *cmd) {
  324. epdl_cmd_dequeued(cmd->data);
  325. }
  326. void rrdeng_req_cmd(requeue_callback_t get_cmd_cb, void *data, STORAGE_PRIORITY priority) {
  327. netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
  328. struct rrdeng_cmd *cmd = get_cmd_cb(data);
  329. if(cmd) {
  330. priority = rrdeng_enq_cmd_map_opcode_to_priority(cmd->opcode, priority);
  331. if (cmd->priority > priority) {
  332. DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[cmd->priority], cmd, queue.prev, queue.next);
  333. DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next);
  334. cmd->priority = priority;
  335. }
  336. }
  337. netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
  338. }
  339. void rrdeng_enq_cmd(struct rrdengine_instance *ctx, enum rrdeng_opcode opcode, void *data, struct completion *completion,
  340. enum storage_priority priority, enqueue_callback_t enqueue_cb, dequeue_callback_t dequeue_cb) {
  341. priority = rrdeng_enq_cmd_map_opcode_to_priority(opcode, priority);
  342. struct rrdeng_cmd *cmd = aral_mallocz(rrdeng_main.cmd_queue.ar);
  343. memset(cmd, 0, sizeof(struct rrdeng_cmd));
  344. cmd->ctx = ctx;
  345. cmd->opcode = opcode;
  346. cmd->data = data;
  347. cmd->completion = completion;
  348. cmd->priority = priority;
  349. cmd->dequeue_cb = dequeue_cb;
  350. netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
  351. DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next);
  352. rrdeng_main.cmd_queue.unsafe.waiting++;
  353. if(enqueue_cb)
  354. enqueue_cb(cmd);
  355. netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
  356. fatal_assert(0 == uv_async_send(&rrdeng_main.async));
  357. }
  358. static inline bool rrdeng_cmd_has_waiting_opcodes_in_lower_priorities(STORAGE_PRIORITY priority, STORAGE_PRIORITY max_priority) {
  359. for(; priority <= max_priority ; priority++)
  360. if(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority])
  361. return true;
  362. return false;
  363. }
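// Dequeue policy: serve the highest priority with waiting commands first. For the
// priorities between HIGH and BEST_EFFORT, every 50th command dequeued at a given
// priority defers to the lower priorities (roughly 2% of the requests), and while
// the worker pool is saturated only internal dbengine commands are dequeued at all.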
  364. static inline struct rrdeng_cmd rrdeng_deq_cmd(void) {
  365. struct rrdeng_cmd *cmd = NULL;
  366. STORAGE_PRIORITY max_priority = work_request_full() ? STORAGE_PRIORITY_INTERNAL_DBENGINE : STORAGE_PRIORITY_BEST_EFFORT;
  367. // find an opcode to execute from the queue
  368. netdata_spinlock_lock(&rrdeng_main.cmd_queue.unsafe.spinlock);
  369. for(STORAGE_PRIORITY priority = STORAGE_PRIORITY_INTERNAL_DBENGINE; priority <= max_priority ; priority++) {
  370. cmd = rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority];
  371. if(cmd) {
  372. // avoid starvation of lower priorities
  373. if(unlikely(priority >= STORAGE_PRIORITY_HIGH &&
  374. priority < STORAGE_PRIORITY_BEST_EFFORT &&
  375. ++rrdeng_main.cmd_queue.unsafe.executed_by_priority[priority] % 50 == 0 &&
  376. rrdeng_cmd_has_waiting_opcodes_in_lower_priorities(priority + 1, max_priority))) {
  377. // let the others run 2% of the requests
  378. cmd = NULL;
  379. continue;
  380. }
  381. // remove it from the queue
  382. DOUBLE_LINKED_LIST_REMOVE_ITEM_UNSAFE(rrdeng_main.cmd_queue.unsafe.waiting_items_by_priority[priority], cmd, queue.prev, queue.next);
  383. rrdeng_main.cmd_queue.unsafe.waiting--;
  384. break;
  385. }
  386. }
  387. if(cmd && cmd->dequeue_cb) {
  388. cmd->dequeue_cb(cmd);
  389. cmd->dequeue_cb = NULL;
  390. }
  391. netdata_spinlock_unlock(&rrdeng_main.cmd_queue.unsafe.spinlock);
  392. struct rrdeng_cmd ret;
  393. if(cmd) {
  394. // copy it, to return it
  395. ret = *cmd;
  396. aral_freez(rrdeng_main.cmd_queue.ar, cmd);
  397. }
  398. else
  399. ret = (struct rrdeng_cmd) {
  400. .ctx = NULL,
  401. .opcode = RRDENG_OPCODE_NOOP,
  402. .priority = STORAGE_PRIORITY_BEST_EFFORT,
  403. .completion = NULL,
  404. .data = NULL,
  405. };
  406. return ret;
  407. }
  408. // ----------------------------------------------------------------------------
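// Page allocation: each storage tier gets its own ARAL sized to that tier's page
// size; dbengine_page_alloc()/dbengine_page_free() fall back to mallocz()/freez()
// for any size that does not match a tier page size.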
  409. struct {
  410. ARAL *aral[RRD_STORAGE_TIERS];
  411. } dbengine_page_alloc_globals = {};
  412. static inline ARAL *page_size_lookup(size_t size) {
  413. for(size_t tier = 0; tier < storage_tiers ;tier++)
  414. if(size == tier_page_size[tier])
  415. return dbengine_page_alloc_globals.aral[tier];
  416. return NULL;
  417. }
  418. static void dbengine_page_alloc_init(void) {
  419. for(size_t i = storage_tiers; i > 0 ;i--) {
  420. size_t tier = storage_tiers - i;
  421. char buf[20 + 1];
  422. snprintfz(buf, 20, "tier%zu-pages", tier);
  423. dbengine_page_alloc_globals.aral[tier] = aral_create(
  424. buf,
  425. tier_page_size[tier],
  426. 64,
  427. 512 * tier_page_size[tier],
  428. pgc_aral_statistics(),
  429. NULL, NULL, false, false);
  430. }
  431. }
  432. void *dbengine_page_alloc(size_t size) {
  433. ARAL *ar = page_size_lookup(size);
  434. if(ar) return aral_mallocz(ar);
  435. return mallocz(size);
  436. }
  437. void dbengine_page_free(void *page, size_t size __maybe_unused) {
  438. if(unlikely(!page || page == DBENGINE_EMPTY_PAGE))
  439. return;
  440. ARAL *ar = page_size_lookup(size);
  441. if(ar)
  442. aral_freez(ar, page);
  443. else
  444. freez(page);
  445. }
  446. // ----------------------------------------------------------------------------
  447. void *dbengine_extent_alloc(size_t size) {
  448. void *extent = mallocz(size);
  449. return extent;
  450. }
  451. void dbengine_extent_free(void *extent, size_t size __maybe_unused) {
  452. freez(extent);
  453. }
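// Build the journal (WAL) transaction that mirrors an extent already laid out in
// xt_io_descr->buf: a transaction header, a STORE_DATA payload with one descriptor
// per page, and a trailer carrying a CRC32 of everything before it.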
  454. static void journalfile_extent_build(struct rrdengine_instance *ctx, struct extent_io_descriptor *xt_io_descr) {
  455. unsigned count, payload_length, descr_size, size_bytes;
  456. void *buf;
  457. /* persistent structures */
  458. struct rrdeng_df_extent_header *df_header;
  459. struct rrdeng_jf_transaction_header *jf_header;
  460. struct rrdeng_jf_store_data *jf_metric_data;
  461. struct rrdeng_jf_transaction_trailer *jf_trailer;
  462. uLong crc;
  463. df_header = xt_io_descr->buf;
  464. count = df_header->number_of_pages;
  465. descr_size = sizeof(*jf_metric_data->descr) * count;
  466. payload_length = sizeof(*jf_metric_data) + descr_size;
  467. size_bytes = sizeof(*jf_header) + payload_length + sizeof(*jf_trailer);
  468. xt_io_descr->wal = wal_get(ctx, size_bytes);
  469. buf = xt_io_descr->wal->buf;
  470. jf_header = buf;
  471. jf_header->type = STORE_DATA;
  472. jf_header->reserved = 0;
  473. jf_header->id = xt_io_descr->wal->transaction_id;
  474. jf_header->payload_length = payload_length;
  475. jf_metric_data = buf + sizeof(*jf_header);
  476. jf_metric_data->extent_offset = xt_io_descr->pos;
  477. jf_metric_data->extent_size = xt_io_descr->bytes;
  478. jf_metric_data->number_of_pages = count;
  479. memcpy(jf_metric_data->descr, df_header->descr, descr_size);
  480. jf_trailer = buf + sizeof(*jf_header) + payload_length;
  481. crc = crc32(0L, Z_NULL, 0);
  482. crc = crc32(crc, buf, sizeof(*jf_header) + payload_length);
  483. crc32set(jf_trailer->checksum, crc);
  484. }
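// Once an extent has been written and its WAL committed, the pages it contains are
// re-registered as hot pages of the open cache (pointing at their new on-disk
// location) and, if the instance is still accepting queries, a database rotation
// is scheduled.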
  485. static void after_extent_flushed_to_open(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
  486. if(completion)
  487. completion_mark_complete(completion);
  488. if(ctx_is_available_for_queries(ctx))
  489. rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
  490. }
  491. static void *extent_flushed_to_open_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
  492. worker_is_busy(UV_EVENT_DBENGINE_FLUSHED_TO_OPEN);
  493. uv_fs_t *uv_fs_request = data;
  494. struct extent_io_descriptor *xt_io_descr = uv_fs_request->data;
  495. struct page_descr_with_data *descr;
  496. struct rrdengine_datafile *datafile;
  497. unsigned i;
  498. datafile = xt_io_descr->datafile;
  499. bool still_running = ctx_is_available_for_queries(ctx);
  500. for (i = 0 ; i < xt_io_descr->descr_count ; ++i) {
  501. descr = xt_io_descr->descr_array[i];
  502. if (likely(still_running))
  503. pgc_open_add_hot_page(
  504. (Word_t)ctx, descr->metric_id,
  505. (time_t) (descr->start_time_ut / USEC_PER_SEC),
  506. (time_t) (descr->end_time_ut / USEC_PER_SEC),
  507. descr->update_every_s,
  508. datafile,
  509. xt_io_descr->pos, xt_io_descr->bytes, descr->page_length);
  510. page_descriptor_release(descr);
  511. }
  512. uv_fs_req_cleanup(uv_fs_request);
  513. posix_memfree(xt_io_descr->buf);
  514. extent_io_descriptor_release(xt_io_descr);
  515. netdata_spinlock_lock(&datafile->writers.spinlock);
  516. datafile->writers.flushed_to_open_running--;
  517. netdata_spinlock_unlock(&datafile->writers.spinlock);
  518. if(datafile->fileno != ctx_last_fileno_get(ctx) && still_running)
  519. // we just finished a flushing on a datafile that is not the active one
  520. rrdeng_enq_cmd(ctx, RRDENG_OPCODE_JOURNAL_INDEX, datafile, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
  521. return data;
  522. }
  523. // Main event loop callback
  524. static void after_extent_write_datafile_io(uv_fs_t *uv_fs_request) {
  525. worker_is_busy(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EXTENT_WRITE);
  526. struct extent_io_descriptor *xt_io_descr = uv_fs_request->data;
  527. struct rrdengine_datafile *datafile = xt_io_descr->datafile;
  528. struct rrdengine_instance *ctx = datafile->ctx;
  529. if (uv_fs_request->result < 0) {
  530. ctx_io_error(ctx);
  531. error("DBENGINE: %s: uv_fs_write(): %s", __func__, uv_strerror((int)uv_fs_request->result));
  532. }
  533. journalfile_v1_extent_write(ctx, xt_io_descr->datafile, xt_io_descr->wal, &rrdeng_main.loop);
  534. netdata_spinlock_lock(&datafile->writers.spinlock);
  535. datafile->writers.running--;
  536. datafile->writers.flushed_to_open_running++;
  537. netdata_spinlock_unlock(&datafile->writers.spinlock);
  538. rrdeng_enq_cmd(xt_io_descr->ctx,
  539. RRDENG_OPCODE_FLUSHED_TO_OPEN,
  540. uv_fs_request,
  541. xt_io_descr->completion,
  542. STORAGE_PRIORITY_INTERNAL_DBENGINE,
  543. NULL,
  544. NULL);
  545. worker_is_idle();
  546. }
  547. static bool datafile_is_full(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) {
  548. bool ret = false;
  549. netdata_spinlock_lock(&datafile->writers.spinlock);
  550. if(ctx_is_available_for_queries(ctx) && datafile->pos > rrdeng_target_data_file_size(ctx))
  551. ret = true;
  552. netdata_spinlock_unlock(&datafile->writers.spinlock);
  553. return ret;
  554. }
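// Pick the most recent datafile and register as a writer on it so it cannot be
// deleted. If it is already full, serialize datafile creation behind a static
// mutex and re-check before creating a new datafile pair, so that concurrent
// flushers cannot create several new files at once.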
  555. static struct rrdengine_datafile *get_datafile_to_write_extent(struct rrdengine_instance *ctx) {
  556. struct rrdengine_datafile *datafile;
  557. // get the latest datafile
  558. uv_rwlock_rdlock(&ctx->datafiles.rwlock);
  559. datafile = ctx->datafiles.first->prev;
  560. // become a writer on this datafile, to prevent it from vanishing
  561. netdata_spinlock_lock(&datafile->writers.spinlock);
  562. datafile->writers.running++;
  563. netdata_spinlock_unlock(&datafile->writers.spinlock);
  564. uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
  565. if(datafile_is_full(ctx, datafile)) {
  566. // remember the datafile we have become writers to
  567. struct rrdengine_datafile *old_datafile = datafile;
  568. // only 1 datafile creation at a time
  569. static netdata_mutex_t mutex = NETDATA_MUTEX_INITIALIZER;
  570. netdata_mutex_lock(&mutex);
  571. // take the latest datafile again - without this, multiple threads may create multiple files
  572. uv_rwlock_rdlock(&ctx->datafiles.rwlock);
  573. datafile = ctx->datafiles.first->prev;
  574. uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
  575. if(datafile_is_full(ctx, datafile) && create_new_datafile_pair(ctx) == 0)
  576. rrdeng_enq_cmd(ctx, RRDENG_OPCODE_JOURNAL_INDEX, datafile, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL,
  577. NULL);
  578. netdata_mutex_unlock(&mutex);
  579. // get the new latest datafile again, like above
  580. uv_rwlock_rdlock(&ctx->datafiles.rwlock);
  581. datafile = ctx->datafiles.first->prev;
  582. // become a writer on this datafile, to prevent it from vanishing
  583. netdata_spinlock_lock(&datafile->writers.spinlock);
  584. datafile->writers.running++;
  585. netdata_spinlock_unlock(&datafile->writers.spinlock);
  586. uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
  587. // release the writers on the old datafile
  588. netdata_spinlock_lock(&old_datafile->writers.spinlock);
  589. old_datafile->writers.running--;
  590. netdata_spinlock_unlock(&old_datafile->writers.spinlock);
  591. }
  592. return datafile;
  593. }
  594. /*
  595. * Take a list of page descriptors, pack them into a single extent and prepare it for writing
  596. */
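// On-disk extent layout produced here: [extent header][one descriptor per page]
// [page payload, LZ4-compressed unless RRD_NO_COMPRESSION][trailer with CRC32],
// rounded up with ALIGN_BYTES_CEILING() so the write is aligned for direct I/O.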
  597. static struct extent_io_descriptor *datafile_extent_build(struct rrdengine_instance *ctx, struct page_descr_with_data *base, struct completion *completion) {
  598. int ret;
  599. int compressed_size, max_compressed_size = 0;
  600. unsigned i, count, size_bytes, pos, real_io_size;
  601. uint32_t uncompressed_payload_length, payload_offset;
  602. struct page_descr_with_data *descr, *eligible_pages[MAX_PAGES_PER_EXTENT];
  603. struct extent_io_descriptor *xt_io_descr;
  604. struct extent_buffer *eb = NULL;
  605. void *compressed_buf = NULL;
  606. Word_t Index;
  607. uint8_t compression_algorithm = ctx->config.global_compress_alg;
  608. struct rrdengine_datafile *datafile;
  609. /* persistent structures */
  610. struct rrdeng_df_extent_header *header;
  611. struct rrdeng_df_extent_trailer *trailer;
  612. uLong crc;
  613. for(descr = base, Index = 0, count = 0, uncompressed_payload_length = 0;
  614. descr && count != rrdeng_pages_per_extent;
  615. descr = descr->link.next, Index++) {
  616. uncompressed_payload_length += descr->page_length;
  617. eligible_pages[count++] = descr;
  618. }
  619. if (!count) {
  620. if (completion)
  621. completion_mark_complete(completion);
  622. __atomic_sub_fetch(&ctx->atomic.extents_currently_being_flushed, 1, __ATOMIC_RELAXED);
  623. return NULL;
  624. }
  625. xt_io_descr = extent_io_descriptor_get();
  626. xt_io_descr->ctx = ctx;
  627. payload_offset = sizeof(*header) + count * sizeof(header->descr[0]);
  628. switch (compression_algorithm) {
  629. case RRD_NO_COMPRESSION:
  630. size_bytes = payload_offset + uncompressed_payload_length + sizeof(*trailer);
  631. break;
  632. default: /* Compress */
  633. fatal_assert(uncompressed_payload_length < LZ4_MAX_INPUT_SIZE);
  634. max_compressed_size = LZ4_compressBound(uncompressed_payload_length);
  635. eb = extent_buffer_get(max_compressed_size);
  636. compressed_buf = eb->data;
  637. size_bytes = payload_offset + MAX(uncompressed_payload_length, (unsigned)max_compressed_size) + sizeof(*trailer);
  638. break;
  639. }
  640. ret = posix_memalign((void *)&xt_io_descr->buf, RRDFILE_ALIGNMENT, ALIGN_BYTES_CEILING(size_bytes));
  641. if (unlikely(ret)) {
  642. fatal("DBENGINE: posix_memalign:%s", strerror(ret));
  643. /* freez(xt_io_descr);*/
  644. }
  645. memset(xt_io_descr->buf, 0, ALIGN_BYTES_CEILING(size_bytes));
  646. (void) memcpy(xt_io_descr->descr_array, eligible_pages, sizeof(struct page_descr_with_data *) * count);
  647. xt_io_descr->descr_count = count;
  648. pos = 0;
  649. header = xt_io_descr->buf;
  650. header->compression_algorithm = compression_algorithm;
  651. header->number_of_pages = count;
  652. pos += sizeof(*header);
  653. for (i = 0 ; i < count ; ++i) {
  654. descr = xt_io_descr->descr_array[i];
  655. header->descr[i].type = descr->type;
  656. uuid_copy(*(uuid_t *)header->descr[i].uuid, *descr->id);
  657. header->descr[i].page_length = descr->page_length;
  658. header->descr[i].start_time_ut = descr->start_time_ut;
  659. header->descr[i].end_time_ut = descr->end_time_ut;
  660. pos += sizeof(header->descr[i]);
  661. }
  662. for (i = 0 ; i < count ; ++i) {
  663. descr = xt_io_descr->descr_array[i];
  664. (void) memcpy(xt_io_descr->buf + pos, descr->page, descr->page_length);
  665. pos += descr->page_length;
  666. }
  667. if(likely(compression_algorithm == RRD_LZ4)) {
  668. compressed_size = LZ4_compress_default(
  669. xt_io_descr->buf + payload_offset,
  670. compressed_buf,
  671. (int)uncompressed_payload_length,
  672. max_compressed_size);
  673. __atomic_add_fetch(&ctx->stats.before_compress_bytes, uncompressed_payload_length, __ATOMIC_RELAXED);
  674. __atomic_add_fetch(&ctx->stats.after_compress_bytes, compressed_size, __ATOMIC_RELAXED);
  675. (void) memcpy(xt_io_descr->buf + payload_offset, compressed_buf, compressed_size);
  676. extent_buffer_release(eb);
  677. size_bytes = payload_offset + compressed_size + sizeof(*trailer);
  678. header->payload_length = compressed_size;
  679. }
  680. else { // RRD_NO_COMPRESSION
  681. header->payload_length = uncompressed_payload_length;
  682. }
  683. real_io_size = ALIGN_BYTES_CEILING(size_bytes);
  684. datafile = get_datafile_to_write_extent(ctx);
  685. netdata_spinlock_lock(&datafile->writers.spinlock);
  686. xt_io_descr->datafile = datafile;
  687. xt_io_descr->pos = datafile->pos;
  688. datafile->pos += real_io_size;
  689. netdata_spinlock_unlock(&datafile->writers.spinlock);
  690. xt_io_descr->bytes = size_bytes;
  691. xt_io_descr->uv_fs_request.data = xt_io_descr;
  692. xt_io_descr->completion = completion;
  693. trailer = xt_io_descr->buf + size_bytes - sizeof(*trailer);
  694. crc = crc32(0L, Z_NULL, 0);
  695. crc = crc32(crc, xt_io_descr->buf, size_bytes - sizeof(*trailer));
  696. crc32set(trailer->checksum, crc);
  697. xt_io_descr->iov = uv_buf_init((void *)xt_io_descr->buf, real_io_size);
  698. journalfile_extent_build(ctx, xt_io_descr);
  699. ctx_last_flush_fileno_set(ctx, datafile->fileno);
  700. ctx_current_disk_space_increase(ctx, real_io_size);
  701. ctx_io_write_op_bytes(ctx, real_io_size);
  702. return xt_io_descr;
  703. }
  704. static void after_extent_write(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* uv_work_req __maybe_unused, int status __maybe_unused) {
  705. struct extent_io_descriptor *xt_io_descr = data;
  706. if(xt_io_descr) {
  707. int ret = uv_fs_write(&rrdeng_main.loop,
  708. &xt_io_descr->uv_fs_request,
  709. xt_io_descr->datafile->file,
  710. &xt_io_descr->iov,
  711. 1,
  712. (int64_t) xt_io_descr->pos,
  713. after_extent_write_datafile_io);
  714. fatal_assert(-1 != ret);
  715. }
  716. }
  717. static void *extent_write_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
  718. worker_is_busy(UV_EVENT_DBENGINE_EXTENT_WRITE);
  719. struct page_descr_with_data *base = data;
  720. struct extent_io_descriptor *xt_io_descr = datafile_extent_build(ctx, base, completion);
  721. return xt_io_descr;
  722. }
  723. static void after_database_rotate(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
  724. __atomic_store_n(&ctx->atomic.now_deleting_files, false, __ATOMIC_RELAXED);
  725. }
  726. struct uuid_first_time_s {
  727. uuid_t *uuid;
  728. time_t first_time_s;
  729. METRIC *metric;
  730. size_t pages_found;
  731. size_t df_matched;
  732. size_t df_index_oldest;
  733. };
  734. static int journal_metric_compare(const void *key, const void *metric)
  735. {
  736. return uuid_compare(*(uuid_t *) key, ((struct journal_metric_list *) metric)->uuid);
  737. }
  738. struct rrdengine_datafile *datafile_release_and_acquire_next_for_retention(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile) {
  739. uv_rwlock_rdlock(&ctx->datafiles.rwlock);
  740. struct rrdengine_datafile *next_datafile = datafile->next;
  741. while(next_datafile && !datafile_acquire(next_datafile, DATAFILE_ACQUIRE_RETENTION))
  742. next_datafile = next_datafile->next;
  743. uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
  744. datafile_release(datafile, DATAFILE_ACQUIRE_RETENTION);
  745. return next_datafile;
  746. }
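// After a datafile is deleted, re-discover the earliest sample still on disk for
// each affected metric: bsearch every UUID in the v2 journals of the remaining
// datafiles, then consult the open cache for pages that are not yet indexed.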
  747. void find_uuid_first_time(
  748. struct rrdengine_instance *ctx,
  749. struct rrdengine_datafile *datafile,
  750. struct uuid_first_time_s *uuid_first_entry_list,
  751. size_t count)
  752. {
  753. // acquire the datafile to work with it
  754. uv_rwlock_rdlock(&ctx->datafiles.rwlock);
  755. while(datafile && !datafile_acquire(datafile, DATAFILE_ACQUIRE_RETENTION))
  756. datafile = datafile->next;
  757. uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
  758. if (unlikely(!datafile))
  759. return;
  760. unsigned journalfile_count = 0;
  761. size_t binary_match = 0;
  762. size_t not_matching_bsearches = 0;
  763. while (datafile) {
  764. struct journal_v2_header *j2_header = journalfile_v2_data_acquire(datafile->journalfile, NULL, 0, 0);
  765. if (!j2_header) {
  766. datafile = datafile_release_and_acquire_next_for_retention(ctx, datafile);
  767. continue;
  768. }
  769. time_t journal_start_time_s = (time_t) (j2_header->start_time_ut / USEC_PER_SEC);
  770. struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset);
  771. struct uuid_first_time_s *uuid_original_entry;
  772. size_t journal_metric_count = j2_header->metric_count;
  773. for (size_t index = 0; index < count; ++index) {
  774. uuid_original_entry = &uuid_first_entry_list[index];
  775. // Check here if we should skip this
  776. if (uuid_original_entry->df_matched > 3 || uuid_original_entry->pages_found > 5)
  777. continue;
  778. struct journal_metric_list *live_entry = bsearch(uuid_original_entry->uuid,uuid_list,journal_metric_count,sizeof(*uuid_list), journal_metric_compare);
  779. if (!live_entry) {
  780. // Not found in this journal
  781. not_matching_bsearches++;
  782. continue;
  783. }
  784. uuid_original_entry->pages_found += live_entry->entries;
  785. uuid_original_entry->df_matched++;
  786. time_t old_first_time_s = uuid_original_entry->first_time_s;
  787. // Calculate first / last for this match
  788. time_t first_time_s = live_entry->delta_start_s + journal_start_time_s;
  789. uuid_original_entry->first_time_s = MIN(uuid_original_entry->first_time_s, first_time_s);
  790. if (uuid_original_entry->first_time_s != old_first_time_s)
  791. uuid_original_entry->df_index_oldest = uuid_original_entry->df_matched;
  792. binary_match++;
  793. }
  794. journalfile_count++;
  795. journalfile_v2_data_release(datafile->journalfile);
  796. datafile = datafile_release_and_acquire_next_for_retention(ctx, datafile);
  797. }
  798. // Let's scan the open cache for almost exact match
  799. size_t open_cache_count = 0;
  800. size_t df_index[10] = { 0 };
  801. size_t without_metric = 0;
  802. size_t open_cache_gave_first_time_s = 0;
  803. size_t metric_count = 0;
  804. size_t without_retention = 0;
  805. size_t not_needed_bsearches = 0;
  806. for (size_t index = 0; index < count; ++index) {
  807. struct uuid_first_time_s *uuid_first_t_entry = &uuid_first_entry_list[index];
  808. metric_count++;
  809. size_t idx = uuid_first_t_entry->df_index_oldest;
  810. if(idx >= 10)
  811. idx = 9;
  812. df_index[idx]++;
  813. not_needed_bsearches += uuid_first_t_entry->df_matched - uuid_first_t_entry->df_index_oldest;
  814. if (unlikely(!uuid_first_t_entry->metric)) {
  815. without_metric++;
  816. continue;
  817. }
  818. PGC_PAGE *page = pgc_page_get_and_acquire(
  819. open_cache, (Word_t)ctx,
  820. (Word_t)uuid_first_t_entry->metric, 0,
  821. PGC_SEARCH_FIRST);
  822. if (page) {
  823. time_t old_first_time_s = uuid_first_t_entry->first_time_s;
  824. time_t first_time_s = pgc_page_start_time_s(page);
  825. uuid_first_t_entry->first_time_s = MIN(uuid_first_t_entry->first_time_s, first_time_s);
  826. pgc_page_release(open_cache, page);
  827. open_cache_count++;
  828. if(uuid_first_t_entry->first_time_s != old_first_time_s) {
  829. open_cache_gave_first_time_s++;
  830. }
  831. }
  832. else {
  833. if(!uuid_first_t_entry->df_index_oldest)
  834. without_retention++;
  835. }
  836. }
  837. internal_error(true,
  838. "DBENGINE: analyzed the retention of %zu rotated metrics of tier %d, "
  839. "did %zu jv2 matching binary searches (%zu not matching, %zu overflown) in %u journal files, "
  840. "%zu metrics with entries in open cache, "
  841. "metrics first time found per datafile index ([not in jv2]:%zu, [1]:%zu, [2]:%zu, [3]:%zu, [4]:%zu, [5]:%zu, [6]:%zu, [7]:%zu, [8]:%zu, [bigger]: %zu), "
  842. "open cache found first time %zu, "
  843. "metrics without any remaining retention %zu, "
  844. "metrics not in MRG %zu",
  845. metric_count,
  846. ctx->config.tier,
  847. binary_match,
  848. not_matching_bsearches,
  849. not_needed_bsearches,
  850. journalfile_count,
  851. open_cache_count,
  852. df_index[0], df_index[1], df_index[2], df_index[3], df_index[4], df_index[5], df_index[6], df_index[7], df_index[8], df_index[9],
  853. open_cache_gave_first_time_s,
  854. without_retention,
  855. without_metric
  856. );
  857. }
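// Recalculate retention for all metrics referenced by the datafile that is about
// to be deleted and update the metrics registry (MRG): metrics left with no
// retention anywhere are removed from the registry when nothing else references them.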
  858. static void update_metrics_first_time_s(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile_to_delete, struct rrdengine_datafile *first_datafile_remaining, bool worker) {
  859. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.metrics_retention_started, 1, __ATOMIC_RELAXED);
  860. if(worker)
  861. worker_is_busy(UV_EVENT_DBENGINE_FIND_ROTATED_METRICS);
  862. struct rrdengine_journalfile *journalfile = datafile_to_delete->journalfile;
  863. struct journal_v2_header *j2_header = journalfile_v2_data_acquire(journalfile, NULL, 0, 0);
  864. struct journal_metric_list *uuid_list = (struct journal_metric_list *)((uint8_t *) j2_header + j2_header->metric_offset);
  865. size_t count = j2_header->metric_count;
  866. struct uuid_first_time_s *uuid_first_t_entry;
  867. struct uuid_first_time_s *uuid_first_entry_list = callocz(count, sizeof(struct uuid_first_time_s));
  868. size_t added = 0;
  869. for (size_t index = 0; index < count; ++index) {
  870. METRIC *metric = mrg_metric_get_and_acquire(main_mrg, &uuid_list[index].uuid, (Word_t) ctx);
  871. if (!metric)
  872. continue;
  873. uuid_first_entry_list[added].metric = metric;
  874. uuid_first_entry_list[added].first_time_s = LONG_MAX;
  875. uuid_first_entry_list[added].df_matched = 0;
  876. uuid_first_entry_list[added].df_index_oldest = 0;
  877. uuid_first_entry_list[added].uuid = mrg_metric_uuid(main_mrg, metric);
  878. added++;
  879. }
  880. info("DBENGINE: recalculating tier %d retention for %zu metrics starting with datafile %u",
  881. ctx->config.tier, count, first_datafile_remaining->fileno);
  882. journalfile_v2_data_release(journalfile);
  883. // Update the first time / last time for all metrics we plan to delete
  884. if(worker)
  885. worker_is_busy(UV_EVENT_DBENGINE_FIND_REMAINING_RETENTION);
  886. find_uuid_first_time(ctx, first_datafile_remaining, uuid_first_entry_list, added);
  887. if(worker)
  888. worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG);
  889. info("DBENGINE: updating tier %d metrics registry retention for %zu metrics",
  890. ctx->config.tier, added);
  891. size_t deleted_metrics = 0, zero_retention_referenced = 0, zero_disk_retention = 0, zero_disk_but_live = 0;
  892. for (size_t index = 0; index < added; ++index) {
  893. uuid_first_t_entry = &uuid_first_entry_list[index];
  894. if (likely(uuid_first_t_entry->first_time_s != LONG_MAX)) {
  895. mrg_metric_set_first_time_s_if_bigger(main_mrg, uuid_first_t_entry->metric, uuid_first_t_entry->first_time_s);
  896. mrg_metric_release(main_mrg, uuid_first_t_entry->metric);
  897. }
  898. else {
  899. zero_disk_retention++;
  900. // there is no retention for this metric
  901. bool has_retention = mrg_metric_zero_disk_retention(main_mrg, uuid_first_t_entry->metric);
  902. if (!has_retention) {
  903. bool deleted = mrg_metric_release_and_delete(main_mrg, uuid_first_t_entry->metric);
  904. if(deleted)
  905. deleted_metrics++;
  906. else
  907. zero_retention_referenced++;
  908. }
  909. else {
  910. zero_disk_but_live++;
  911. mrg_metric_release(main_mrg, uuid_first_t_entry->metric);
  912. }
  913. }
  914. }
  915. freez(uuid_first_entry_list);
  916. internal_error(zero_disk_retention,
  917. "DBENGINE: deleted %zu metrics, zero retention but referenced %zu (out of %zu total, of which %zu have main cache retention) zero on-disk retention tier %d metrics from metrics registry",
  918. deleted_metrics, zero_retention_referenced, zero_disk_retention, zero_disk_but_live, ctx->config.tier);
  919. if(worker)
  920. worker_is_idle();
  921. }
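// Delete a datafile and its journal files: optionally update retention first, then
// spin (sleeping 1s per retry) until every user of the datafile releases it,
// unlink the files and give the reclaimed bytes back to the disk-space accounting.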
  922. void datafile_delete(struct rrdengine_instance *ctx, struct rrdengine_datafile *datafile, bool update_retention, bool worker) {
  923. if(worker)
  924. worker_is_busy(UV_EVENT_DBENGINE_DATAFILE_DELETE_WAIT);
  925. bool datafile_got_for_deletion = datafile_acquire_for_deletion(datafile);
  926. if (update_retention)
  927. update_metrics_first_time_s(ctx, datafile, datafile->next, worker);
  928. while (!datafile_got_for_deletion) {
  929. if(worker)
  930. worker_is_busy(UV_EVENT_DBENGINE_DATAFILE_DELETE_WAIT);
  931. datafile_got_for_deletion = datafile_acquire_for_deletion(datafile);
  932. if (!datafile_got_for_deletion) {
  933. info("DBENGINE: waiting for data file '%s/"
  934. DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION
  935. "' to be available for deletion, "
  936. "it is in use currently by %u users.",
  937. ctx->config.dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno, datafile->users.lockers);
  938. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_deletion_spin, 1, __ATOMIC_RELAXED);
  939. sleep_usec(1 * USEC_PER_SEC);
  940. }
  941. }
  942. __atomic_add_fetch(&rrdeng_cache_efficiency_stats.datafile_deletion_started, 1, __ATOMIC_RELAXED);
  943. info("DBENGINE: deleting data file '%s/"
  944. DATAFILE_PREFIX RRDENG_FILE_NUMBER_PRINT_TMPL DATAFILE_EXTENSION
  945. "'.",
  946. ctx->config.dbfiles_path, ctx->datafiles.first->tier, ctx->datafiles.first->fileno);
  947. if(worker)
  948. worker_is_busy(UV_EVENT_DBENGINE_DATAFILE_DELETE);
  949. struct rrdengine_journalfile *journal_file;
  950. unsigned deleted_bytes, journal_file_bytes, datafile_bytes;
  951. int ret;
  952. char path[RRDENG_PATH_MAX];
  953. uv_rwlock_wrlock(&ctx->datafiles.rwlock);
  954. datafile_list_delete_unsafe(ctx, datafile);
  955. uv_rwlock_wrunlock(&ctx->datafiles.rwlock);
  956. journal_file = datafile->journalfile;
  957. datafile_bytes = datafile->pos;
  958. journal_file_bytes = journalfile_current_size(journal_file);
  959. deleted_bytes = journalfile_v2_data_size_get(journal_file);
  960. info("DBENGINE: deleting data and journal files to maintain disk quota");
  961. ret = journalfile_destroy_unsafe(journal_file, datafile);
  962. if (!ret) {
  963. journalfile_v1_generate_path(datafile, path, sizeof(path));
  964. info("DBENGINE: deleted journal file \"%s\".", path);
  965. journalfile_v2_generate_path(datafile, path, sizeof(path));
  966. info("DBENGINE: deleted journal file \"%s\".", path);
  967. deleted_bytes += journal_file_bytes;
  968. }
  969. ret = destroy_data_file_unsafe(datafile);
  970. if (!ret) {
  971. generate_datafilepath(datafile, path, sizeof(path));
  972. info("DBENGINE: deleted data file \"%s\".", path);
  973. deleted_bytes += datafile_bytes;
  974. }
  975. freez(journal_file);
  976. freez(datafile);
  977. ctx_current_disk_space_decrease(ctx, deleted_bytes);
  978. info("DBENGINE: reclaimed %u bytes of disk space.", deleted_bytes);
  979. }
  980. static void *database_rotate_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
  981. datafile_delete(ctx, ctx->datafiles.first, ctx_is_available_for_queries(ctx), true);
  982. if (rrdeng_ctx_exceeded_disk_quota(ctx))
  983. rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
  984. rrdcontext_db_rotation();
  985. return data;
  986. }
  987. static void after_flush_all_hot_and_dirty_pages_of_section(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
  988. ;
  989. }
  990. static void *flush_all_hot_and_dirty_pages_of_section_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
  991. worker_is_busy(UV_EVENT_DBENGINE_QUIESCE);
  992. pgc_flush_all_hot_and_dirty_pages(main_cache, (Word_t)ctx);
  993. completion_mark_complete(&ctx->quiesce.completion);
  994. return data;
  995. }
  996. static void after_populate_mrg(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
  997. ;
  998. }
  999. static void *populate_mrg_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
  1000. worker_is_busy(UV_EVENT_DBENGINE_POPULATE_MRG);
  1001. do {
  1002. struct rrdengine_datafile *datafile = NULL;
  1003. // find a datafile to work
  1004. uv_rwlock_rdlock(&ctx->datafiles.rwlock);
  1005. for(datafile = ctx->datafiles.first; datafile ; datafile = datafile->next) {
  1006. if(!netdata_spinlock_trylock(&datafile->populate_mrg.spinlock))
  1007. continue;
  1008. if(datafile->populate_mrg.populated) {
  1009. netdata_spinlock_unlock(&datafile->populate_mrg.spinlock);
  1010. continue;
  1011. }
  1012. // we have the spinlock and it is not populated
  1013. break;
  1014. }
  1015. uv_rwlock_rdunlock(&ctx->datafiles.rwlock);
  1016. if(!datafile)
  1017. break;
  1018. journalfile_v2_populate_retention_to_mrg(ctx, datafile->journalfile);
  1019. datafile->populate_mrg.populated = true;
  1020. netdata_spinlock_unlock(&datafile->populate_mrg.spinlock);
  1021. } while(1);
  1022. completion_mark_complete(completion);
  1023. return data;
  1024. }
  1025. static void after_ctx_shutdown(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
  1026. ;
  1027. }
  1028. static void *ctx_shutdown_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
  1029. worker_is_busy(UV_EVENT_DBENGINE_SHUTDOWN);
  1030. completion_wait_for(&ctx->quiesce.completion);
  1031. completion_destroy(&ctx->quiesce.completion);
  1032. bool logged = false;
  1033. while(__atomic_load_n(&ctx->atomic.extents_currently_being_flushed, __ATOMIC_RELAXED) ||
  1034. __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED)) {
  1035. if(!logged) {
  1036. logged = true;
  1037. info("DBENGINE: waiting for %zu inflight queries to finish to shutdown tier %d...",
  1038. __atomic_load_n(&ctx->atomic.inflight_queries, __ATOMIC_RELAXED),
  1039. (ctx->config.legacy) ? -1 : ctx->config.tier);
  1040. }
  1041. sleep_usec(1 * USEC_PER_MS);
  1042. }
  1043. completion_mark_complete(completion);
  1044. return data;
  1045. }
  1046. static void *cache_flush_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
  1047. if (!main_cache)
  1048. return data;
  1049. worker_is_busy(UV_EVENT_DBENGINE_FLUSH_MAIN_CACHE);
  1050. pgc_flush_pages(main_cache, 0);
  1051. return data;
  1052. }
  1053. static void *cache_evict_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) {
  1054. if (!main_cache)
  1055. return data;
  1056. worker_is_busy(UV_EVENT_DBENGINE_EVICT_MAIN_CACHE);
  1057. pgc_evict_pages(main_cache, 0, 0);
  1058. return data;
  1059. }
  1060. static void after_prep_query(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
  1061. ;
  1062. }
  1063. static void *query_prep_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *req __maybe_unused) {
  1064. worker_is_busy(UV_EVENT_DBENGINE_QUERY);
  1065. PDC *pdc = data;
  1066. rrdeng_prep_query(pdc);
  1067. return data;
  1068. }
  1069. unsigned rrdeng_target_data_file_size(struct rrdengine_instance *ctx) {
  1070. unsigned target_size = ctx->config.max_disk_space / TARGET_DATAFILES;
  1071. target_size = MIN(target_size, MAX_DATAFILE_SIZE);
  1072. target_size = MAX(target_size, MIN_DATAFILE_SIZE);
  1073. return target_size;
  1074. }
bool rrdeng_ctx_exceeded_disk_quota(struct rrdengine_instance *ctx)
{
    uint64_t estimated_disk_space = ctx_current_disk_space_get(ctx) + rrdeng_target_data_file_size(ctx) -
                                    (ctx->datafiles.first->prev ? ctx->datafiles.first->prev->pos : 0);

    return estimated_disk_space > ctx->config.max_disk_space;
}

/* return 0 on success */
int init_rrd_files(struct rrdengine_instance *ctx)
{
    return init_data_files(ctx);
}

void finalize_rrd_files(struct rrdengine_instance *ctx)
{
    finalize_data_files(ctx);
}

void async_cb(uv_async_t *handle)
{
    uv_stop(handle->loop);
    uv_update_time(handle->loop);
    debug(D_RRDENGINE, "%s called, active=%d.", __func__, uv_is_active((uv_handle_t *)handle));
}

#define TIMER_PERIOD_MS (1000)

static void *extent_read_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
    EPDL *epdl = data;
    epdl_find_extent_and_populate_pages(ctx, epdl, true);
    return data;
}

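/*
 * Query data routing: pdc_to_epdl_router() splits a query's page details (PDC) into
 * per-extent page-details lists (EPDL) and hands each one to the callback it is
 * given. The asynchronous path enqueues an RRDENG_OPCODE_EXTENT_READ command so the
 * extent is read on a libuv worker thread; the synchronous path reads and populates
 * the pages inline on the calling thread.
 */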
static void epdl_populate_pages_asynchronously(struct rrdengine_instance *ctx, EPDL *epdl, STORAGE_PRIORITY priority) {
    rrdeng_enq_cmd(ctx, RRDENG_OPCODE_EXTENT_READ, epdl, NULL, priority,
                   rrdeng_enqueue_epdl_cmd, rrdeng_dequeue_epdl_cmd);
}

void pdc_route_asynchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc) {
    pdc_to_epdl_router(ctx, pdc, epdl_populate_pages_asynchronously, epdl_populate_pages_asynchronously);
}

void epdl_populate_pages_synchronously(struct rrdengine_instance *ctx, EPDL *epdl, enum storage_priority priority __maybe_unused) {
    epdl_find_extent_and_populate_pages(ctx, epdl, false);
}

void pdc_route_synchronously(struct rrdengine_instance *ctx, struct page_details_control *pdc) {
    pdc_to_epdl_router(ctx, pdc, epdl_populate_pages_synchronously, epdl_populate_pages_synchronously);
}

#define MAX_RETRIES_TO_START_INDEX (100)
static void *journal_v2_indexing_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
    unsigned count = 0;
    worker_is_busy(UV_EVENT_DBENGINE_JOURNAL_INDEX_WAIT);

    // wait (up to MAX_RETRIES_TO_START_INDEX polls of 100ms) for datafile deletion to finish
    while (__atomic_load_n(&ctx->atomic.now_deleting_files, __ATOMIC_RELAXED) && count++ < MAX_RETRIES_TO_START_INDEX)
        sleep_usec(100 * USEC_PER_MS);

    // give up if datafile deletion is still running after the maximum wait
    if (__atomic_load_n(&ctx->atomic.now_deleting_files, __ATOMIC_RELAXED)) {
        worker_is_idle();
        return data;
    }

    struct rrdengine_datafile *datafile = ctx->datafiles.first;
    worker_is_busy(UV_EVENT_DBENGINE_JOURNAL_INDEX);
    count = 0;
    while (datafile && datafile->fileno != ctx_last_fileno_get(ctx) && datafile->fileno != ctx_last_flush_fileno_get(ctx)) {
        netdata_spinlock_lock(&datafile->writers.spinlock);
        bool available = (datafile->writers.running || datafile->writers.flushed_to_open_running) ? false : true;
        netdata_spinlock_unlock(&datafile->writers.spinlock);

        if(!available) {
            // the datafile still has writers; skip it and let a later indexing pass pick it up
            datafile = datafile->next;
            continue;
        }

        if (unlikely(!journalfile_v2_data_available(datafile->journalfile))) {
            info("DBENGINE: journal file %u is ready to be indexed", datafile->fileno);
            pgc_open_cache_to_journal_v2(open_cache, (Word_t) ctx, (int) datafile->fileno, ctx->config.page_type,
                                         journalfile_migrate_to_v2_callback, (void *) datafile->journalfile);
            count++;
        }

        datafile = datafile->next;

        if (unlikely(!ctx_is_available_for_queries(ctx)))
            break;
    }

    errno = 0;
    internal_error(count, "DBENGINE: journal indexing done; %u files processed", count);

    worker_is_idle();

    return data;
}

static void after_do_cache_flush(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
    rrdeng_main.flushes_running--;
}

static void after_do_cache_evict(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
    rrdeng_main.evictions_running--;
}

static void after_extent_read(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
    ;
}

static void after_journal_v2_indexing(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
    __atomic_store_n(&ctx->atomic.migration_to_v2_running, false, __ATOMIC_RELAXED);
    rrdeng_enq_cmd(ctx, RRDENG_OPCODE_DATABASE_ROTATE, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
}

struct rrdeng_buffer_sizes rrdeng_get_buffer_sizes(void) {
    return (struct rrdeng_buffer_sizes) {
            .pgc         = pgc_aral_overhead() + pgc_aral_structures(),
            .mrg         = mrg_aral_overhead() + mrg_aral_structures(),
            .opcodes     = aral_overhead(rrdeng_main.cmd_queue.ar) + aral_structures(rrdeng_main.cmd_queue.ar),
            .handles     = aral_overhead(rrdeng_main.handles.ar) + aral_structures(rrdeng_main.handles.ar),
            .descriptors = aral_overhead(rrdeng_main.descriptors.ar) + aral_structures(rrdeng_main.descriptors.ar),
            .wal         = __atomic_load_n(&wal_globals.atomics.allocated, __ATOMIC_RELAXED) * (sizeof(WAL) + RRDENG_BLOCK_SIZE),
            .workers     = aral_overhead(rrdeng_main.work_cmd.ar),
            .pdc         = pdc_cache_size(),
            .xt_io       = aral_overhead(rrdeng_main.xt_io_descr.ar) + aral_structures(rrdeng_main.xt_io_descr.ar),
            .xt_buf      = extent_buffer_cache_size(),
            .epdl        = epdl_cache_size(),
            .deol        = deol_cache_size(),
            .pd          = pd_cache_size(),
#ifdef PDC_USE_JULYL
            .julyl       = julyl_cache_size(),
#endif
    };
}

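/*
 * Illustration only (not part of the engine): a minimal sketch of how a caller might
 * total the fields returned by rrdeng_get_buffer_sizes(), e.g. for logging. The helper
 * name is hypothetical; the field list simply mirrors the initializer above.
 *
 *   static size_t dbengine_buffers_total(void) {
 *       struct rrdeng_buffer_sizes sz = rrdeng_get_buffer_sizes();
 *       size_t total = sz.pgc + sz.mrg + sz.opcodes + sz.handles + sz.descriptors +
 *                      sz.wal + sz.workers + sz.pdc + sz.xt_io + sz.xt_buf +
 *                      sz.epdl + sz.deol + sz.pd;
 *   #ifdef PDC_USE_JULYL
 *       total += sz.julyl;
 *   #endif
 *       return total;
 *   }
 */
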
static void after_cleanup(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t* req __maybe_unused, int status __maybe_unused) {
    rrdeng_main.cleanup_running--;
}

static void *cleanup_tp_worker(struct rrdengine_instance *ctx __maybe_unused, void *data __maybe_unused, struct completion *completion __maybe_unused, uv_work_t *uv_work_req __maybe_unused) {
    worker_is_busy(UV_EVENT_DBENGINE_BUFFERS_CLEANUP);

    wal_cleanup1();
    extent_buffer_cleanup1();

    {
        static time_t last_run_s = 0;
        time_t now_s = now_monotonic_sec();
        if(now_s - last_run_s >= 10) {
            last_run_s = now_s;
            journalfile_v2_data_unmount_cleanup(now_s);
        }
    }

#ifdef PDC_USE_JULYL
    julyl_cleanup1();
#endif

    return data;
}

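/*
 * Periodic housekeeping: timer_cb() fires every TIMER_PERIOD_MS on the event loop,
 * wakes uv_run(), publishes the queue/worker metrics registered in
 * dbengine_event_loop(), and queues the recurring FLUSH_INIT, EVICT_INIT and CLEANUP
 * opcodes that keep the main cache flushed/evicted and the buffers trimmed.
 */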
void timer_cb(uv_timer_t* handle) {
    worker_is_busy(RRDENG_TIMER_CB);
    uv_stop(handle->loop);
    uv_update_time(handle->loop);

    worker_set_metric(RRDENG_OPCODES_WAITING, (NETDATA_DOUBLE)rrdeng_main.cmd_queue.unsafe.waiting);
    worker_set_metric(RRDENG_WORKS_DISPATCHED, (NETDATA_DOUBLE)__atomic_load_n(&rrdeng_main.work_cmd.atomics.dispatched, __ATOMIC_RELAXED));
    worker_set_metric(RRDENG_WORKS_EXECUTING, (NETDATA_DOUBLE)__atomic_load_n(&rrdeng_main.work_cmd.atomics.executing, __ATOMIC_RELAXED));

    rrdeng_enq_cmd(NULL, RRDENG_OPCODE_FLUSH_INIT, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
    rrdeng_enq_cmd(NULL, RRDENG_OPCODE_EVICT_INIT, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);
    rrdeng_enq_cmd(NULL, RRDENG_OPCODE_CLEANUP, NULL, NULL, STORAGE_PRIORITY_INTERNAL_DBENGINE, NULL, NULL);

    worker_is_idle();
}

static void dbengine_initialize_structures(void) {
    pgc_and_mrg_initialize();

    pdc_init();
    page_details_init();
    epdl_init();
    deol_init();
    rrdeng_cmd_queue_init();
    work_request_init();
    rrdeng_query_handle_init();
    page_descriptors_init();
    extent_buffer_init();
    dbengine_page_alloc_init();
    extent_io_descriptor_init();
}

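/*
 * One-time initialization of the shared dbengine event loop: a static flag guarded by
 * a spinlock ensures the libuv loop, async handle, timer and the DBENGINE thread are
 * created only once, no matter how many contexts (tiers) call this.
 */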
bool rrdeng_dbengine_spawn(struct rrdengine_instance *ctx __maybe_unused) {
    static bool spawned = false;
    static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;

    netdata_spinlock_lock(&spinlock);

    if(!spawned) {
        int ret;

        ret = uv_loop_init(&rrdeng_main.loop);
        if (ret) {
            error("DBENGINE: uv_loop_init(): %s", uv_strerror(ret));
            // release the spawn lock on every error path, so a later caller does not deadlock
            netdata_spinlock_unlock(&spinlock);
            return false;
        }
        rrdeng_main.loop.data = &rrdeng_main;

        ret = uv_async_init(&rrdeng_main.loop, &rrdeng_main.async, async_cb);
        if (ret) {
            error("DBENGINE: uv_async_init(): %s", uv_strerror(ret));
            fatal_assert(0 == uv_loop_close(&rrdeng_main.loop));
            netdata_spinlock_unlock(&spinlock);
            return false;
        }
        rrdeng_main.async.data = &rrdeng_main;

        ret = uv_timer_init(&rrdeng_main.loop, &rrdeng_main.timer);
        if (ret) {
            error("DBENGINE: uv_timer_init(): %s", uv_strerror(ret));
            uv_close((uv_handle_t *)&rrdeng_main.async, NULL);
            fatal_assert(0 == uv_loop_close(&rrdeng_main.loop));
            netdata_spinlock_unlock(&spinlock);
            return false;
        }
        rrdeng_main.timer.data = &rrdeng_main;

        dbengine_initialize_structures();

        fatal_assert(0 == uv_thread_create(&rrdeng_main.thread, dbengine_event_loop, &rrdeng_main));
        spawned = true;
    }

    netdata_spinlock_unlock(&spinlock);
    return true;
}

void dbengine_event_loop(void* arg) {
    sanity_check();
    uv_thread_set_name_np(pthread_self(), "DBENGINE");
    service_register(SERVICE_THREAD_TYPE_EVENT_LOOP, NULL, NULL, NULL, true);

    worker_register("DBENGINE");

    // opcode jobs
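    // Each opcode is charged to the worker job named after it while the command is
    // dispatched below; the "... cb" entries (RRDENG_OPCODE_MAX + opcode) name the
    // completion-callback side of the same opcode.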
    worker_register_job_name(RRDENG_OPCODE_NOOP, "noop");
    worker_register_job_name(RRDENG_OPCODE_QUERY, "query");
    worker_register_job_name(RRDENG_OPCODE_EXTENT_WRITE, "extent write");
    worker_register_job_name(RRDENG_OPCODE_EXTENT_READ, "extent read");
    worker_register_job_name(RRDENG_OPCODE_FLUSHED_TO_OPEN, "flushed to open");
    worker_register_job_name(RRDENG_OPCODE_DATABASE_ROTATE, "db rotate");
    worker_register_job_name(RRDENG_OPCODE_JOURNAL_INDEX, "journal index");
    worker_register_job_name(RRDENG_OPCODE_FLUSH_INIT, "flush init");
    worker_register_job_name(RRDENG_OPCODE_EVICT_INIT, "evict init");
    worker_register_job_name(RRDENG_OPCODE_CTX_SHUTDOWN, "ctx shutdown");
    worker_register_job_name(RRDENG_OPCODE_CTX_QUIESCE, "ctx quiesce");
    worker_register_job_name(RRDENG_OPCODE_MAX, "get opcode");

    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_QUERY, "query cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EXTENT_WRITE, "extent write cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EXTENT_READ, "extent read cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_FLUSHED_TO_OPEN, "flushed to open cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_DATABASE_ROTATE, "db rotate cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_JOURNAL_INDEX, "journal index cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_FLUSH_INIT, "flush init cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_EVICT_INIT, "evict init cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_CTX_SHUTDOWN, "ctx shutdown cb");
    worker_register_job_name(RRDENG_OPCODE_MAX + RRDENG_OPCODE_CTX_QUIESCE, "ctx quiesce cb");

    // special jobs
    worker_register_job_name(RRDENG_TIMER_CB, "timer");
    worker_register_job_name(RRDENG_FLUSH_TRANSACTION_BUFFER_CB, "transaction buffer flush cb");

    worker_register_job_custom_metric(RRDENG_OPCODES_WAITING, "opcodes waiting", "opcodes", WORKER_METRIC_ABSOLUTE);
    worker_register_job_custom_metric(RRDENG_WORKS_DISPATCHED, "works dispatched", "works", WORKER_METRIC_ABSOLUTE);
    worker_register_job_custom_metric(RRDENG_WORKS_EXECUTING, "works executing", "works", WORKER_METRIC_ABSOLUTE);

    struct rrdeng_main *main = arg;
    enum rrdeng_opcode opcode;
    struct rrdeng_cmd cmd;
    main->tid = gettid();

    fatal_assert(0 == uv_timer_start(&main->timer, timer_cb, TIMER_PERIOD_MS, TIMER_PERIOD_MS));

    bool shutdown = false;
    while (likely(!shutdown)) {
        worker_is_idle();
        uv_run(&main->loop, UV_RUN_DEFAULT);

        /* wait for commands */
        do {
            worker_is_busy(RRDENG_OPCODE_MAX);
            cmd = rrdeng_deq_cmd();
            opcode = cmd.opcode;

            worker_is_busy(opcode);

            switch (opcode) {
                case RRDENG_OPCODE_EXTENT_READ: {
                    struct rrdengine_instance *ctx = cmd.ctx;
                    EPDL *epdl = cmd.data;
                    work_dispatch(ctx, epdl, NULL, opcode, extent_read_tp_worker, after_extent_read);
                    break;
                }

                case RRDENG_OPCODE_QUERY: {
                    struct rrdengine_instance *ctx = cmd.ctx;
                    PDC *pdc = cmd.data;
                    work_dispatch(ctx, pdc, NULL, opcode, query_prep_tp_worker, after_prep_query);
                    break;
                }

                case RRDENG_OPCODE_EXTENT_WRITE: {
                    struct rrdengine_instance *ctx = cmd.ctx;
                    struct page_descr_with_data *base = cmd.data;
                    struct completion *completion = cmd.completion; // optional
                    work_dispatch(ctx, base, completion, opcode, extent_write_tp_worker, after_extent_write);
                    break;
                }

                case RRDENG_OPCODE_FLUSHED_TO_OPEN: {
                    struct rrdengine_instance *ctx = cmd.ctx;
                    uv_fs_t *uv_fs_request = cmd.data;
                    struct extent_io_descriptor *xt_io_descr = uv_fs_request->data;
                    struct completion *completion = xt_io_descr->completion;
                    work_dispatch(ctx, uv_fs_request, completion, opcode, extent_flushed_to_open_tp_worker, after_extent_flushed_to_open);
                    break;
                }

                case RRDENG_OPCODE_FLUSH_INIT: {
                    if(rrdeng_main.flushes_running < (size_t)(libuv_worker_threads / 4)) {
                        rrdeng_main.flushes_running++;
                        work_dispatch(NULL, NULL, NULL, opcode, cache_flush_tp_worker, after_do_cache_flush);
                    }
                    break;
                }

                case RRDENG_OPCODE_EVICT_INIT: {
                    if(!rrdeng_main.evictions_running) {
                        rrdeng_main.evictions_running++;
                        work_dispatch(NULL, NULL, NULL, opcode, cache_evict_tp_worker, after_do_cache_evict);
                    }
                    break;
                }

                case RRDENG_OPCODE_CLEANUP: {
                    if(!rrdeng_main.cleanup_running) {
                        rrdeng_main.cleanup_running++;
                        work_dispatch(NULL, NULL, NULL, opcode, cleanup_tp_worker, after_cleanup);
                    }
                    break;
                }

                case RRDENG_OPCODE_JOURNAL_INDEX: {
                    struct rrdengine_instance *ctx = cmd.ctx;
                    struct rrdengine_datafile *datafile = cmd.data;
                    if(!__atomic_load_n(&ctx->atomic.migration_to_v2_running, __ATOMIC_RELAXED)) {
                        __atomic_store_n(&ctx->atomic.migration_to_v2_running, true, __ATOMIC_RELAXED);
                        work_dispatch(ctx, datafile, NULL, opcode, journal_v2_indexing_tp_worker, after_journal_v2_indexing);
                    }
                    break;
                }

                case RRDENG_OPCODE_DATABASE_ROTATE: {
                    struct rrdengine_instance *ctx = cmd.ctx;
                    if (!__atomic_load_n(&ctx->atomic.now_deleting_files, __ATOMIC_RELAXED) &&
                        ctx->datafiles.first->next != NULL &&
                        ctx->datafiles.first->next->next != NULL &&
                        rrdeng_ctx_exceeded_disk_quota(ctx)) {
                        __atomic_store_n(&ctx->atomic.now_deleting_files, true, __ATOMIC_RELAXED);
                        work_dispatch(ctx, NULL, NULL, opcode, database_rotate_tp_worker, after_database_rotate);
                    }
                    break;
                }

                case RRDENG_OPCODE_CTX_POPULATE_MRG: {
                    struct rrdengine_instance *ctx = cmd.ctx;
                    struct completion *completion = cmd.completion;
                    work_dispatch(ctx, NULL, completion, opcode, populate_mrg_tp_worker, after_populate_mrg);
                    break;
                }

                case RRDENG_OPCODE_CTX_QUIESCE: {
                    // a ctx will shutdown shortly
                    struct rrdengine_instance *ctx = cmd.ctx;
                    __atomic_store_n(&ctx->quiesce.enabled, true, __ATOMIC_RELEASE);
                    work_dispatch(ctx, NULL, NULL, opcode,
                                  flush_all_hot_and_dirty_pages_of_section_tp_worker,
                                  after_flush_all_hot_and_dirty_pages_of_section);
                    break;
                }

                case RRDENG_OPCODE_CTX_SHUTDOWN: {
                    // a ctx is shutting down
                    struct rrdengine_instance *ctx = cmd.ctx;
                    struct completion *completion = cmd.completion;
                    work_dispatch(ctx, NULL, completion, opcode, ctx_shutdown_tp_worker, after_ctx_shutdown);
                    break;
                }

                case RRDENG_OPCODE_NOOP: {
                    /* the command queue was empty, do nothing */
                    break;
                }

                // not opcodes
                case RRDENG_OPCODE_MAX:
                default: {
                    internal_fatal(true, "DBENGINE: unknown opcode");
                    break;
                }
            }

        } while (opcode != RRDENG_OPCODE_NOOP);
    }

    /* cleanup operations of the event loop */
    info("DBENGINE: shutting down dbengine thread");

    /*
     * uv_async_send after uv_close does not seem to crash in linux at the moment,
     * it is however undocumented behaviour and we need to be aware if this becomes
     * an issue in the future.
     */
    uv_close((uv_handle_t *)&main->async, NULL);
    uv_timer_stop(&main->timer);
    uv_close((uv_handle_t *)&main->timer, NULL);
    uv_run(&main->loop, UV_RUN_DEFAULT);
    uv_loop_close(&main->loop);

    worker_unregister();
}
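
/*
 * Illustration only: how other parts of the engine hand work to this event loop.
 * Producers call rrdeng_enq_cmd() with an opcode, an optional payload, an optional
 * completion and a priority; the loop above picks the command up via rrdeng_deq_cmd()
 * and dispatches it to a libuv worker thread. A minimal sketch, assuming a PDC that
 * the caller has already prepared (the wrapper name is hypothetical; the argument
 * order mirrors the rrdeng_enq_cmd() calls in this file):
 *
 *   void submit_query_example(struct rrdengine_instance *ctx, PDC *pdc, STORAGE_PRIORITY priority) {
 *       rrdeng_enq_cmd(ctx, RRDENG_OPCODE_QUERY, pdc, NULL, priority, NULL, NULL);
 *   }
 */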