/* rrdengineapi.c */
// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdengine.h"
#include "../storage_engine.h"

/* Default global database instance */
// One statically allocated dbengine instance per storage tier; these are
// wired into multidb_ctx[] by the constructor below.
struct rrdengine_instance multidb_ctx_storage_tier0;
struct rrdengine_instance multidb_ctx_storage_tier1;
struct rrdengine_instance multidb_ctx_storage_tier2;
struct rrdengine_instance multidb_ctx_storage_tier3;
struct rrdengine_instance multidb_ctx_storage_tier4;

// Compile-time guard: the five static instances above must match the
// configured number of storage tiers.
#if RRD_STORAGE_TIERS != 5
#error RRD_STORAGE_TIERS is not 5 - you need to add allocations here
#endif
struct rrdengine_instance *multidb_ctx[RRD_STORAGE_TIERS];

// Page type per tier: tier 0 uses PAGE_METRICS pages, all higher tiers use
// PAGE_TIER (aggregate) pages.
uint8_t tier_page_type[RRD_STORAGE_TIERS] = {PAGE_METRICS, PAGE_TIER, PAGE_TIER, PAGE_TIER, PAGE_TIER};

#if PAGE_TYPE_MAX != 1
#error PAGE_TYPE_MAX is not 1 - you need to add allocations here
#endif
// Per-point size for each page type id; entries beyond the initializers are
// zero (unknown page types have size 0).
size_t page_type_size[256] = {sizeof(storage_number), sizeof(storage_number_tier1_t)};
  19. __attribute__((constructor)) void initialize_multidb_ctx(void) {
  20. multidb_ctx[0] = &multidb_ctx_storage_tier0;
  21. multidb_ctx[1] = &multidb_ctx_storage_tier1;
  22. multidb_ctx[2] = &multidb_ctx_storage_tier2;
  23. multidb_ctx[3] = &multidb_ctx_storage_tier3;
  24. multidb_ctx[4] = &multidb_ctx_storage_tier4;
  25. }
// presumably selects malloc-backed instead of mapped page buffers — confirm at use site
int db_engine_use_malloc = 0;
// page fetch timeout/retries — units not visible here; presumably seconds and attempts
int default_rrdeng_page_fetch_timeout = 3;
int default_rrdeng_page_fetch_retries = 3;
// default page cache size (MiB)
int default_rrdeng_page_cache_mb = 32;
// default disk quotas (MiB) for per-host and multi-host instances
int default_rrdeng_disk_quota_mb = 256;
int default_multidb_disk_quota_mb = 256;
/* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */
uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1;
  34. // ----------------------------------------------------------------------------
  35. // metrics groups
  36. STORAGE_METRICS_GROUP *rrdeng_metrics_group_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid __maybe_unused) {
  37. return callocz(1, sizeof(struct pg_alignment));
  38. }
// Release a metrics group obtained from rrdeng_metrics_group_get().
// The pg_alignment is freed only when no page_index still references it.
void rrdeng_metrics_group_release(STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) {
    if(!smg) return;
    struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
    struct pg_alignment *pa = (struct pg_alignment *)smg;
    struct page_cache *pg_cache = &ctx->pg_cache;
    // NOTE(review): the metrics index lock is taken in READ mode while pa may
    // be freed, and pa->refcount is read with a plain load although other
    // paths update it with __atomic builtins — presumably the rdlock only
    // keeps rrdeng_metric_get() from adopting pa concurrently; verify.
    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    if(pa->refcount == 0)
        freez(pa);
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
}
  49. // ----------------------------------------------------------------------------
  50. // metric handle for legacy dbs
  51. /* This UUID is not unique across hosts */
  52. void rrdeng_generate_legacy_uuid(const char *dim_id, const char *chart_id, uuid_t *ret_uuid)
  53. {
  54. EVP_MD_CTX *evpctx;
  55. unsigned char hash_value[EVP_MAX_MD_SIZE];
  56. unsigned int hash_len;
  57. evpctx = EVP_MD_CTX_create();
  58. EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL);
  59. EVP_DigestUpdate(evpctx, dim_id, strlen(dim_id));
  60. EVP_DigestUpdate(evpctx, chart_id, strlen(chart_id));
  61. EVP_DigestFinal_ex(evpctx, hash_value, &hash_len);
  62. EVP_MD_CTX_destroy(evpctx);
  63. fatal_assert(hash_len > sizeof(uuid_t));
  64. memcpy(ret_uuid, hash_value, sizeof(uuid_t));
  65. }
  66. /* Transform legacy UUID to be unique across hosts deterministically */
  67. void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid, uuid_t *ret_uuid)
  68. {
  69. EVP_MD_CTX *evpctx;
  70. unsigned char hash_value[EVP_MAX_MD_SIZE];
  71. unsigned int hash_len;
  72. evpctx = EVP_MD_CTX_create();
  73. EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL);
  74. EVP_DigestUpdate(evpctx, machine_guid, GUID_LEN);
  75. EVP_DigestUpdate(evpctx, *legacy_uuid, sizeof(uuid_t));
  76. EVP_DigestFinal_ex(evpctx, hash_value, &hash_len);
  77. EVP_MD_CTX_destroy(evpctx);
  78. fatal_assert(hash_len > sizeof(uuid_t));
  79. memcpy(ret_uuid, hash_value, sizeof(uuid_t));
  80. }
  81. STORAGE_METRIC_HANDLE *rrdeng_metric_get_legacy(STORAGE_INSTANCE *db_instance, const char *rd_id, const char *st_id, STORAGE_METRICS_GROUP *smg) {
  82. uuid_t legacy_uuid;
  83. rrdeng_generate_legacy_uuid(rd_id, st_id, &legacy_uuid);
  84. return rrdeng_metric_get(db_instance, &legacy_uuid, smg);
  85. }
  86. // ----------------------------------------------------------------------------
  87. // metric handle
  88. void rrdeng_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle) {
  89. struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
  90. unsigned short refcount = __atomic_sub_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
  91. if(refcount == 0 && page_index->alignment) {
  92. __atomic_sub_fetch(&page_index->alignment->refcount, 1, __ATOMIC_SEQ_CST);
  93. page_index->alignment = NULL;
  94. }
  95. }
  96. STORAGE_METRIC_HANDLE *rrdeng_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle) {
  97. struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
  98. __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
  99. return db_metric_handle;
  100. }
  101. STORAGE_METRIC_HANDLE *rrdeng_metric_get(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) {
  102. struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
  103. struct pg_alignment *pa = (struct pg_alignment *)smg;
  104. struct page_cache *pg_cache = &ctx->pg_cache;
  105. struct pg_cache_page_index *page_index = NULL;
  106. uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
  107. Pvoid_t *PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, uuid, sizeof(uuid_t));
  108. if (likely(NULL != PValue))
  109. page_index = *PValue;
  110. uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
  111. if (likely(page_index)) {
  112. __atomic_add_fetch(&page_index->refcount, 1, __ATOMIC_SEQ_CST);
  113. if(pa) {
  114. if(page_index->alignment && page_index->alignment != pa)
  115. fatal("DBENGINE: page_index has a different alignment.");
  116. if(!page_index->alignment) {
  117. page_index->alignment = pa;
  118. __atomic_add_fetch(&pa->refcount, 1, __ATOMIC_SEQ_CST);
  119. }
  120. }
  121. }
  122. return (STORAGE_METRIC_HANDLE *)page_index;
  123. }
  124. STORAGE_METRIC_HANDLE *rrdeng_metric_create(STORAGE_INSTANCE *db_instance, uuid_t *uuid, STORAGE_METRICS_GROUP *smg) {
  125. internal_fatal(!db_instance, "DBENGINE: db_instance is NULL");
  126. struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
  127. struct pg_alignment *pa = (struct pg_alignment *)smg;
  128. struct pg_cache_page_index *page_index;
  129. struct page_cache *pg_cache = &ctx->pg_cache;
  130. uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
  131. Pvoid_t *PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, uuid, sizeof(uuid_t), PJE0);
  132. fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
  133. *PValue = page_index = create_page_index(uuid, ctx);
  134. page_index->prev = pg_cache->metrics_index.last_page_index;
  135. pg_cache->metrics_index.last_page_index = page_index;
  136. page_index->alignment = pa;
  137. page_index->refcount = 1;
  138. if(pa)
  139. pa->refcount++;
  140. uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
  141. return (STORAGE_METRIC_HANDLE *)page_index;
  142. }
// Resolve a metric handle for dimension `rd`: first by its stored uuid, then
// by the legacy (dimension+chart derived) uuid, and finally by creating a
// fresh page index for it.
STORAGE_METRIC_HANDLE *rrdeng_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance, STORAGE_METRICS_GROUP *smg) {
    STORAGE_METRIC_HANDLE *db_metric_handle;
    db_metric_handle = rrdeng_metric_get(db_instance, &rd->metric_uuid, smg);
    if(!db_metric_handle) {
        // fall back to the legacy uuid derived from the dimension/chart ids
        db_metric_handle = rrdeng_metric_get_legacy(db_instance, rrddim_id(rd), rrdset_id(rd->rrdset), smg);
        if(db_metric_handle) {
            struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
            // adopt the legacy uuid so future lookups hit on the first try
            uuid_copy(rd->metric_uuid, page_index->id);
        }
    }
    if(!db_metric_handle)
        db_metric_handle = rrdeng_metric_create(db_instance, &rd->metric_uuid, smg);
#ifdef NETDATA_INTERNAL_CHECKS
    // internal builds: verify the returned page_index matches both the uuid
    // asked for and the engine instance queried
    struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
    if(uuid_compare(rd->metric_uuid, page_index->id) != 0) {
        char uuid1[UUID_STR_LEN + 1];
        char uuid2[UUID_STR_LEN + 1];
        uuid_unparse(rd->metric_uuid, uuid1);
        uuid_unparse(page_index->id, uuid2);
        fatal("DBENGINE: uuids do not match, asked for metric '%s', but got page_index of metric '%s'", uuid1, uuid2);
    }
    struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
    if(page_index->ctx != ctx)
        fatal("DBENGINE: mixed up rrdengine instances, asked for metric from %p, got from %p", ctx, page_index->ctx);
#endif
    return db_metric_handle;
}
  170. // ----------------------------------------------------------------------------
  171. // collect ops
  172. /*
  173. * Gets a handle for storing metrics to the database.
  174. * The handle must be released with rrdeng_store_metric_final().
  175. */
  176. STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every) {
  177. struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
  178. struct rrdeng_collect_handle *handle;
  179. if(!page_index->alignment)
  180. fatal("DBENGINE: metric group is required for collect operations");
  181. handle = callocz(1, sizeof(struct rrdeng_collect_handle));
  182. handle->page_index = page_index;
  183. handle->descr = NULL;
  184. handle->unaligned_page = 0;
  185. page_index->latest_update_every_s = update_every;
  186. uv_rwlock_wrlock(&page_index->lock);
  187. ++page_index->writers;
  188. uv_rwlock_wrunlock(&page_index->lock);
  189. return (STORAGE_COLLECT_HANDLE *)handle;
  190. }
  191. /* The page must be populated and referenced */
  192. static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr)
  193. {
  194. switch(descr->type) {
  195. case PAGE_METRICS: {
  196. size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr);
  197. storage_number *array = (storage_number *)descr->pg_cache_descr->page;
  198. for (size_t i = 0 ; i < slots; ++i) {
  199. if(does_storage_number_exist(array[i]))
  200. return 0;
  201. }
  202. }
  203. break;
  204. case PAGE_TIER: {
  205. size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr);
  206. storage_number_tier1_t *array = (storage_number_tier1_t *)descr->pg_cache_descr->page;
  207. for (size_t i = 0 ; i < slots; ++i) {
  208. if(fpclassify(array[i].sum_value) != FP_NAN)
  209. return 0;
  210. }
  211. }
  212. break;
  213. default: {
  214. static bool logged = false;
  215. if(!logged) {
  216. error("DBENGINE: cannot check page for nulls on unknown page type id %d", descr->type);
  217. logged = true;
  218. }
  219. return 0;
  220. }
  221. }
  222. return 1;
  223. }
// Finalize the handle's current page: commit it if it holds real data,
// punch a hole (delete it) if every slot is empty, or just free the buffers
// if nothing was ever written into it. Clears handle->descr in all cases.
void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) {
    struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
    struct rrdengine_instance *ctx = handle->page_index->ctx;
    struct rrdeng_page_descr *descr = handle->descr;
    if (unlikely(!ctx)) return;
    if (unlikely(!descr)) return;
    if (likely(descr->page_length)) {
        int page_is_empty;
        // this producer no longer has an open page
        rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);
        page_is_empty = page_has_only_empty_metrics(descr);
        if (page_is_empty) {
            // all slots are empty values: drop the page instead of committing it
            print_page_cache_descr(descr, "Page has empty metrics only, deleting", true);
            pg_cache_put(ctx, descr);
            pg_cache_punch_hole(ctx, descr, 1, 0, NULL);
        } else
            rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
    } else {
        // the page was allocated but never written: free buffer and descriptors
        dbengine_page_free(descr->pg_cache_descr->page);
        rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
        rrdeng_page_descr_freez(descr);
    }
    handle->descr = NULL;
}
/*
 * Core store path: append one point (value plus tier aggregates) to the
 * metric's current page, creating/flushing pages as needed and keeping the
 * page aligned with the other dimensions of the chart (via the shared
 * pg_alignment of the metrics group).
 */
static void rrdeng_store_metric_next_internal(STORAGE_COLLECT_HANDLE *collection_handle,
                                              usec_t point_in_time_ut,
                                              NETDATA_DOUBLE n,
                                              NETDATA_DOUBLE min_value,
                                              NETDATA_DOUBLE max_value,
                                              uint16_t count,
                                              uint16_t anomaly_count,
                                              SN_FLAGS flags)
{
    struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
    struct pg_cache_page_index *page_index = handle->page_index;
    struct rrdengine_instance *ctx = handle->page_index->ctx;
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = handle->descr;
    void *page;
    uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0;
    if (descr) {
        /* Make alignment decisions */
#ifdef NETDATA_INTERNAL_CHECKS
        // internal builds: warn when the new point is not exactly one
        // update_every after the page's last stored point
        if(descr->end_time_ut + page_index->latest_update_every_s * USEC_PER_SEC != point_in_time_ut) {
            char buffer[200 + 1];
            snprintfz(buffer, 200,
                      "metrics collected are %s, end_time_ut = %llu, point_in_time_ut = %llu, update_every = %u, delta = %llu",
                      (point_in_time_ut / USEC_PER_SEC - descr->end_time_ut / USEC_PER_SEC > page_index->latest_update_every_s)?"far apart":"not aligned",
                      descr->end_time_ut / USEC_PER_SEC,
                      point_in_time_ut / USEC_PER_SEC,
                      page_index->latest_update_every_s,
                      point_in_time_ut / USEC_PER_SEC - descr->end_time_ut / USEC_PER_SEC);
            print_page_cache_descr(descr, buffer, false);
        }
#endif
        if (descr->page_length == page_index->alignment->page_length) {
            /* this is the leading dimension that defines chart alignment */
            perfect_page_alignment = 1;
        }
        /* is the metric far enough out of alignment with the others? */
        if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < page_index->alignment->page_length)) {
            handle->unaligned_page = 1;
            print_page_cache_descr(descr, "Metric page is not aligned with chart", true);
        }
        if (unlikely(handle->unaligned_page &&
                     /* did the other metrics change page? */
                     page_index->alignment->page_length <= PAGE_POINT_SIZE_BYTES(descr))) {
            print_page_cache_descr(descr, "must_flush_unaligned_page = 1", true);
            must_flush_unaligned_page = 1;
            handle->unaligned_page = 0;
        }
    }
    // start a new page when there is none, the current one is full, or the
    // unaligned-page heuristic above requested a flush
    if (unlikely(NULL == descr ||
                 descr->page_length + PAGE_POINT_SIZE_BYTES(descr) > RRDENG_BLOCK_SIZE ||
                 must_flush_unaligned_page)) {
        if(descr) {
            print_page_cache_descr(descr, "flushing metric", true);
            rrdeng_store_metric_flush_current_page(collection_handle);
        }
        page = rrdeng_create_page(ctx, &page_index->id, &descr);
        fatal_assert(page);
        descr->update_every_s = page_index->latest_update_every_s;
        handle->descr = descr;
        handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1);
        if (0 == page_index->alignment->page_length) {
            /* this is the leading dimension that defines chart alignment */
            perfect_page_alignment = 1;
        }
    }
    page = descr->pg_cache_descr->page;
    // write the point into the next free slot, encoded per page type
    switch (descr->type) {
        case PAGE_METRICS: {
            ((storage_number *)page)[descr->page_length / PAGE_POINT_SIZE_BYTES(descr)] = pack_storage_number(n, flags);
        }
        break;
        case PAGE_TIER: {
            storage_number_tier1_t number_tier1;
            number_tier1.sum_value = (float)n;
            number_tier1.min_value = (float)min_value;
            number_tier1.max_value = (float)max_value;
            number_tier1.anomaly_count = anomaly_count;
            number_tier1.count = count;
            ((storage_number_tier1_t *)page)[descr->page_length / PAGE_POINT_SIZE_BYTES(descr)] = number_tier1;
        }
        break;
        default: {
            // unknown page type: log once and drop the point silently
            static bool logged = false;
            if(!logged) {
                error("DBENGINE: cannot store metric on unknown page type id %d", descr->type);
                logged = true;
            }
        }
        break;
    }
    pg_cache_atomic_set_pg_info(descr, point_in_time_ut, descr->page_length + PAGE_POINT_SIZE_BYTES(descr));
    if (perfect_page_alignment)
        page_index->alignment->page_length = descr->page_length;
    // first point on a fresh page: set the page start time, track the peak
    // number of concurrent producers (CAS loop), and insert the page into
    // the page cache index; otherwise only refresh the metric's time range
    if (unlikely(INVALID_TIME == descr->start_time_ut)) {
        unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers;
        descr->start_time_ut = point_in_time_ut;
        new_metric_API_producers = rrd_atomic_add_fetch(&ctx->stats.metric_API_producers, 1);
        while (unlikely(new_metric_API_producers > (old_metric_API_max_producers = ctx->metric_API_max_producers))) {
            /* Increase ctx->metric_API_max_producers */
            ret_metric_API_max_producers = ulong_compare_and_swap(&ctx->metric_API_max_producers,
                                                                  old_metric_API_max_producers,
                                                                  new_metric_API_producers);
            if (old_metric_API_max_producers == ret_metric_API_max_producers) {
                /* success */
                break;
            }
        }
        pg_cache_insert(ctx, page_index, descr);
    } else {
        pg_cache_add_new_metric_time(page_index, descr);
    }
}
  372. void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
  373. usec_t point_in_time_ut,
  374. NETDATA_DOUBLE n,
  375. NETDATA_DOUBLE min_value,
  376. NETDATA_DOUBLE max_value,
  377. uint16_t count,
  378. uint16_t anomaly_count,
  379. SN_FLAGS flags)
  380. {
  381. struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
  382. struct pg_cache_page_index *page_index = handle->page_index;
  383. struct rrdeng_page_descr *descr = handle->descr;
  384. if(likely(descr)) {
  385. usec_t last_point_in_time_ut = descr->end_time_ut;
  386. usec_t update_every_ut = page_index->latest_update_every_s * USEC_PER_SEC;
  387. size_t points_gap = (point_in_time_ut <= last_point_in_time_ut) ?
  388. (size_t)0 :
  389. (size_t)((point_in_time_ut - last_point_in_time_ut) / update_every_ut);
  390. if(unlikely(points_gap != 1)) {
  391. if (unlikely(points_gap <= 0)) {
  392. time_t now = now_realtime_sec();
  393. static __thread size_t counter = 0;
  394. static __thread time_t last_time_logged = 0;
  395. counter++;
  396. if(now - last_time_logged > 600) {
  397. error("DBENGINE: collected point is in the past (repeated %zu times in the last %zu secs). Ignoring these data collection points.",
  398. counter, (size_t)(last_time_logged?(now - last_time_logged):0));
  399. last_time_logged = now;
  400. counter = 0;
  401. }
  402. return;
  403. }
  404. size_t point_size = PAGE_POINT_SIZE_BYTES(descr);
  405. size_t page_size_in_points = RRDENG_BLOCK_SIZE / point_size;
  406. size_t used_points = descr->page_length / point_size;
  407. size_t remaining_points_in_page = page_size_in_points - used_points;
  408. bool new_point_is_aligned = true;
  409. if(unlikely((point_in_time_ut - last_point_in_time_ut) / points_gap != update_every_ut))
  410. new_point_is_aligned = false;
  411. if(unlikely(points_gap > remaining_points_in_page || !new_point_is_aligned)) {
  412. // char buffer[200];
  413. // snprintfz(buffer, 200, "data collection skipped %zu points, last stored point %llu, new point %llu, update every %d. Cutting page.",
  414. // points_gap, last_point_in_time_ut / USEC_PER_SEC, point_in_time_ut / USEC_PER_SEC, page_index->latest_update_every_s);
  415. // print_page_cache_descr(descr, buffer, false);
  416. rrdeng_store_metric_flush_current_page(collection_handle);
  417. }
  418. else {
  419. // char buffer[200];
  420. // snprintfz(buffer, 200, "data collection skipped %zu points, last stored point %llu, new point %llu, update every %d. Filling the gap.",
  421. // points_gap, last_point_in_time_ut / USEC_PER_SEC, point_in_time_ut / USEC_PER_SEC, page_index->latest_update_every_s);
  422. // print_page_cache_descr(descr, buffer, false);
  423. // loop to fill the gap
  424. usec_t step_ut = page_index->latest_update_every_s * USEC_PER_SEC;
  425. usec_t last_point_filled_ut = last_point_in_time_ut + step_ut;
  426. while (last_point_filled_ut < point_in_time_ut) {
  427. rrdeng_store_metric_next_internal(
  428. collection_handle, last_point_filled_ut, NAN, NAN, NAN,
  429. 1, 0, SN_EMPTY_SLOT);
  430. last_point_filled_ut += step_ut;
  431. }
  432. }
  433. }
  434. }
  435. rrdeng_store_metric_next_internal(collection_handle, point_in_time_ut, n, min_value, max_value, count, anomaly_count, flags);
  436. }
  437. /*
  438. * Releases the database reference from the handle for storing metrics.
  439. * Returns 1 if it's safe to delete the dimension.
  440. */
  441. int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
  442. struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
  443. struct pg_cache_page_index *page_index = handle->page_index;
  444. uint8_t can_delete_metric = 0;
  445. rrdeng_store_metric_flush_current_page(collection_handle);
  446. uv_rwlock_wrlock(&page_index->lock);
  447. if (!--page_index->writers && !page_index->page_count) {
  448. can_delete_metric = 1;
  449. }
  450. uv_rwlock_wrunlock(&page_index->lock);
  451. freez(handle);
  452. return can_delete_metric;
  453. }
  454. void rrdeng_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) {
  455. struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
  456. struct pg_cache_page_index *page_index = handle->page_index;
  457. rrdeng_store_metric_flush_current_page(collection_handle);
  458. uv_rwlock_rdlock(&page_index->lock);
  459. page_index->latest_update_every_s = update_every;
  460. uv_rwlock_rdunlock(&page_index->lock);
  461. }
  462. // ----------------------------------------------------------------------------
  463. // query ops
  464. //static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info)
  465. //{
  466. // return (uint32_t *)&page_info->scratch[0];
  467. //}
  468. //
  469. //static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info)
  470. //{
  471. // return (uint32_t *)&page_info->scratch[sizeof(uint32_t)];
  472. //}
  473. //
  474. /*
  475. * Gets a handle for loading metrics from the database.
  476. * The handle must be released with rrdeng_load_metric_final().
  477. */
  478. void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *rrdimm_handle, time_t start_time_s, time_t end_time_s)
  479. {
  480. struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
  481. struct rrdengine_instance *ctx = page_index->ctx;
  482. // fprintf(stderr, "%s: %s/%s start time %ld, end time %ld\n", __FUNCTION__ , rd->rrdset->name, rd->name, start_time, end_time);
  483. struct rrdeng_query_handle *handle;
  484. unsigned pages_nr;
  485. if(!page_index->latest_update_every_s)
  486. page_index->latest_update_every_s = default_rrd_update_every;
  487. rrdimm_handle->start_time_s = start_time_s;
  488. rrdimm_handle->end_time_s = end_time_s;
  489. handle = callocz(1, sizeof(struct rrdeng_query_handle));
  490. handle->wanted_start_time_s = start_time_s;
  491. handle->now_s = start_time_s;
  492. handle->position = 0;
  493. handle->ctx = ctx;
  494. handle->descr = NULL;
  495. handle->dt_s = page_index->latest_update_every_s;
  496. rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle;
  497. pages_nr = pg_cache_preload(ctx, &page_index->id, start_time_s * USEC_PER_SEC, end_time_s * USEC_PER_SEC,
  498. NULL, &handle->page_index);
  499. if (unlikely(NULL == handle->page_index || 0 == pages_nr))
  500. // there are no metrics to load
  501. handle->wanted_start_time_s = INVALID_TIME;
  502. }
// Advance the query handle to the next page covering the wanted time range.
// Releases the previous page's reference (if any), looks up the next page,
// and positions the cursor inside it. Returns 0 when a page was loaded,
// 1 when there are no more pages (or the next page descriptor is invalid).
static int rrdeng_load_page_next(struct storage_engine_query_handle *rrdimm_handle, bool debug_this __maybe_unused) {
    struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
    struct rrdengine_instance *ctx = handle->ctx;
    struct rrdeng_page_descr *descr = handle->descr;
    uint32_t page_length;
    usec_t page_end_time_ut;
    unsigned position;
    if (likely(descr)) {
        // Drop old page's reference
#ifdef NETDATA_INTERNAL_CHECKS
        rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif
        pg_cache_put(ctx, descr);
        handle->descr = NULL;
        // next wanted time: one step past the end of the page just released
        handle->wanted_start_time_s = (time_t)((handle->page_end_time_ut / USEC_PER_SEC) + handle->dt_s);
        if (unlikely(handle->wanted_start_time_s > rrdimm_handle->end_time_s))
            return 1;
    }
    usec_t wanted_start_time_ut = handle->wanted_start_time_s * USEC_PER_SEC;
    descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id,
                                 wanted_start_time_ut, rrdimm_handle->end_time_s * USEC_PER_SEC);
    if (NULL == descr)
        return 1;
#ifdef NETDATA_INTERNAL_CHECKS
    rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
#endif
    handle->descr = descr;
    pg_cache_atomic_get_pg_info(descr, &page_end_time_ut, &page_length);
    if (unlikely(INVALID_TIME == descr->start_time_ut || INVALID_TIME == page_end_time_ut || 0 == descr->update_every_s)) {
        error("DBENGINE: discarding invalid page descriptor (start_time = %llu, end_time = %llu, update_every_s = %d)",
              descr->start_time_ut, page_end_time_ut, descr->update_every_s);
        return 1;
    }
    if (unlikely(descr->start_time_ut != page_end_time_ut && wanted_start_time_ut > descr->start_time_ut)) {
        // we're in the middle of the page somewhere
        // interpolate the slot from the wanted time's offset into the page
        unsigned entries = page_length / PAGE_POINT_SIZE_BYTES(descr);
        position = ((uint64_t)(wanted_start_time_ut - descr->start_time_ut)) * (entries - 1) /
                   (page_end_time_ut - descr->start_time_ut);
    }
    else
        position = 0;
    // expose the new page through the handle for rrdeng_load_metric_next()
    handle->page_end_time_ut = page_end_time_ut;
    handle->page_length = page_length;
    handle->entries = page_length / PAGE_POINT_SIZE_BYTES(descr);
    handle->page = descr->pg_cache_descr->page;
    handle->dt_s = descr->update_every_s;
    handle->position = position;
    return 0;
}
// Returns the next point of the metric and advances the handle's time cursor.
// IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags)
// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES
STORAGE_POINT rrdeng_load_metric_next(struct storage_engine_query_handle *rrddim_handle) {
    struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrddim_handle->handle;
    struct rrdeng_page_descr *descr = handle->descr;

    // the end timestamp of the slot we are about to return
    time_t now = handle->now_s + handle->dt_s;

    STORAGE_POINT sp;
    unsigned position = handle->position + 1; // next slot in the current page
    storage_number_tier1_t tier1_value;

    // a previous call marked the query as finished - keep returning empty
    // points while still advancing the time cursor (see contract above)
    if (unlikely(INVALID_TIME == handle->wanted_start_time_s)) {
        handle->wanted_start_time_s = INVALID_TIME; // already INVALID_TIME; kept for symmetry with the other exhausted paths
        handle->now_s = now;
        storage_point_empty(sp, now - handle->dt_s, now);
        return sp;
    }

    if (unlikely(!descr || position >= handle->entries)) {
        // We need to get a new page
        if (rrdeng_load_page_next(rrddim_handle, false)) {
            // next calls will not load any more metrics
            handle->wanted_start_time_s = INVALID_TIME;
            handle->now_s = now;
            storage_point_empty(sp, now - handle->dt_s, now);
            return sp;
        }

        descr = handle->descr;
        position = handle->position;
        // re-anchor 'now' to the timestamp of the slot the new page positioned us at
        now = (time_t)((descr->start_time_ut / USEC_PER_SEC) + position * descr->update_every_s);
    }

    sp.start_time = now - handle->dt_s;
    sp.end_time = now;

    handle->position = position;
    handle->now_s = now;

    switch (descr->type) {
        case PAGE_METRICS: {
            // tier 0 page: one storage_number per point
            storage_number n = handle->page[position];
            sp.min = sp.max = sp.sum = unpack_storage_number(n);
            sp.flags = n & SN_USER_FLAGS;
            sp.count = 1;
            sp.anomaly_count = is_storage_number_anomalous(n) ? 1 : 0;
        }
        break;

        case PAGE_TIER: {
            // higher tier page: pre-aggregated min/max/sum/count per point
            tier1_value = ((storage_number_tier1_t *)handle->page)[position];
            sp.flags = tier1_value.anomaly_count ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS;
            sp.count = tier1_value.count;
            sp.anomaly_count = tier1_value.anomaly_count;
            sp.min = tier1_value.min_value;
            sp.max = tier1_value.max_value;
            sp.sum = tier1_value.sum_value;
        }
        break;

        // we don't know this page type
        default: {
            static bool logged = false;
            if (!logged) {
                error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", descr->type);
                logged = true;
            }
            storage_point_empty(sp, sp.start_time, sp.end_time);
        }
        break;
    }

    if (unlikely(now >= rrddim_handle->end_time_s)) {
        // next calls will not load any more metrics
        handle->wanted_start_time_s = INVALID_TIME;
    }

    return sp;
}
  669. int rrdeng_load_metric_is_finished(struct storage_engine_query_handle *rrdimm_handle)
  670. {
  671. struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
  672. return (INVALID_TIME == handle->wanted_start_time_s);
  673. }
  674. /*
  675. * Releases the database reference from the handle for loading metrics.
  676. */
  677. void rrdeng_load_metric_finalize(struct storage_engine_query_handle *rrdimm_handle)
  678. {
  679. struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
  680. struct rrdengine_instance *ctx = handle->ctx;
  681. struct rrdeng_page_descr *descr = handle->descr;
  682. if (descr) {
  683. #ifdef NETDATA_INTERNAL_CHECKS
  684. rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
  685. #endif
  686. pg_cache_put(ctx, descr);
  687. }
  688. // whatever is allocated at rrdeng_load_metric_init() should be freed here
  689. freez(handle);
  690. rrdimm_handle->handle = NULL;
  691. }
  692. time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
  693. struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
  694. return (time_t)(page_index->latest_time_ut / USEC_PER_SEC);
  695. }
  696. time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
  697. struct pg_cache_page_index *page_index = (struct pg_cache_page_index *)db_metric_handle;
  698. return (time_t)(page_index->oldest_time_ut / USEC_PER_SEC);
  699. }
  700. int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t)
  701. {
  702. struct page_cache *pg_cache;
  703. struct rrdengine_instance *ctx;
  704. Pvoid_t *PValue;
  705. struct pg_cache_page_index *page_index = NULL;
  706. ctx = (struct rrdengine_instance *)si;
  707. if (unlikely(!ctx)) {
  708. error("DBENGINE: invalid STORAGE INSTANCE to %s()", __FUNCTION__);
  709. return 1;
  710. }
  711. pg_cache = &ctx->pg_cache;
  712. uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
  713. PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, dim_uuid, sizeof(uuid_t));
  714. if (likely(NULL != PValue)) {
  715. page_index = *PValue;
  716. }
  717. uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
  718. if (likely(page_index)) {
  719. *first_entry_t = page_index->oldest_time_ut / USEC_PER_SEC;
  720. *last_entry_t = page_index->latest_time_ut / USEC_PER_SEC;
  721. return 0;
  722. }
  723. return 1;
  724. }
  725. /* Also gets a reference for the page */
  726. void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr)
  727. {
  728. struct rrdeng_page_descr *descr;
  729. struct page_cache_descr *pg_cache_descr;
  730. void *page;
  731. /* TODO: check maximum number of pages in page cache limit */
  732. descr = pg_cache_create_descr();
  733. descr->id = id; /* TODO: add page type: metric, log, something? */
  734. descr->type = ctx->page_type;
  735. page = dbengine_page_alloc(); /*TODO: add page size */
  736. rrdeng_page_descr_mutex_lock(ctx, descr);
  737. pg_cache_descr = descr->pg_cache_descr;
  738. pg_cache_descr->page = page;
  739. pg_cache_descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */;
  740. pg_cache_descr->refcnt = 1;
  741. debug(D_RRDENGINE, "Created new page:");
  742. if (unlikely(debug_flags & D_RRDENGINE))
  743. print_page_cache_descr(descr, "", true);
  744. rrdeng_page_descr_mutex_unlock(ctx, descr);
  745. *ret_descr = descr;
  746. return page;
  747. }
/* Makes a page eligible for flushing by inserting it into the committed page
 * index under its correlation id, then releases the caller's reference.
 * The page must not be empty. When too many pages are dirty, applies
 * back-pressure: drops the oldest in-memory page (if configured) or logs a
 * warning and counts the event. */
void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
                        Word_t page_correlation_id)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    Pvoid_t *PValue;
    unsigned nr_committed_pages;

    if (unlikely(NULL == descr)) {
        debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-committed.", __func__);
        return;
    }
    fatal_assert(descr->page_length);

    // register the page so the flusher can find it by correlation id
    uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
    PValue = JudyLIns(&pg_cache->committed_page_index.JudyL_array, page_correlation_id, PJE0);
    *PValue = descr;
    nr_committed_pages = ++pg_cache->committed_page_index.nr_committed_pages;
    uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);

    if (nr_committed_pages >= pg_cache_hard_limit(ctx) / 2) {
        /* over 50% of pages have not been committed yet */

        if (ctx->drop_metrics_under_page_cache_pressure &&
            nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) {
            /* 100% of pages are dirty */
            struct rrdeng_cmd cmd;

            // ask the worker to evict the oldest in-memory page
            cmd.opcode = RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);
        } else {
            if (0 == (unsigned long) ctx->stats.pg_cache_over_half_dirty_events) {
                /* only print the first time */
                errno = 0;
                error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". "
                      "Metric data at risk of not being stored in the database, "
                      "please reduce disk load or use a faster disk.", ctx->dbfiles_path);
            }
            rrd_stat_atomic_add(&ctx->stats.pg_cache_over_half_dirty_events, 1);
            rrd_stat_atomic_add(&global_pg_cache_over_half_dirty_events, 1);
        }
    }

    pg_cache_put(ctx, descr);
}
  787. /* Gets a reference for the page */
  788. void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle)
  789. {
  790. struct rrdeng_page_descr *descr;
  791. struct page_cache_descr *pg_cache_descr;
  792. debug(D_RRDENGINE, "Reading existing page:");
  793. descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME);
  794. if (NULL == descr) {
  795. *handle = NULL;
  796. return NULL;
  797. }
  798. *handle = descr;
  799. pg_cache_descr = descr->pg_cache_descr;
  800. return pg_cache_descr->page;
  801. }
  802. /* Gets a reference for the page */
  803. void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time_ut, void **handle)
  804. {
  805. struct rrdeng_page_descr *descr;
  806. struct page_cache_descr *pg_cache_descr;
  807. debug(D_RRDENGINE, "Reading existing page:");
  808. descr = pg_cache_lookup(ctx, NULL, id, point_in_time_ut);
  809. if (NULL == descr) {
  810. *handle = NULL;
  811. return NULL;
  812. }
  813. *handle = descr;
  814. pg_cache_descr = descr->pg_cache_descr;
  815. return pg_cache_descr->page;
  816. }
/*
 * Gathers Database Engine statistics.
 * Careful when modifying this function.
 * You must not change the indices of the statistics or user code will break.
 * You must not exceed RRDENG_NR_STATS or it will crash.
 */
void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
{
    if (ctx == NULL)
        return;

    struct page_cache *pg_cache = &ctx->pg_cache;

    // [0..4] API activity and page cache population
    array[0] = (uint64_t)ctx->stats.metric_API_producers;
    array[1] = (uint64_t)ctx->stats.metric_API_consumers;
    array[2] = (uint64_t)pg_cache->page_descriptors;
    array[3] = (uint64_t)pg_cache->populated_pages;
    array[4] = (uint64_t)pg_cache->committed_page_index.nr_committed_pages;
    // [5..10] page cache operations
    array[5] = (uint64_t)ctx->stats.pg_cache_insertions;
    array[6] = (uint64_t)ctx->stats.pg_cache_deletions;
    array[7] = (uint64_t)ctx->stats.pg_cache_hits;
    array[8] = (uint64_t)ctx->stats.pg_cache_misses;
    array[9] = (uint64_t)ctx->stats.pg_cache_backfills;
    array[10] = (uint64_t)ctx->stats.pg_cache_evictions;
    // [11..14] compression / decompression volumes
    array[11] = (uint64_t)ctx->stats.before_compress_bytes;
    array[12] = (uint64_t)ctx->stats.after_compress_bytes;
    array[13] = (uint64_t)ctx->stats.before_decompress_bytes;
    array[14] = (uint64_t)ctx->stats.after_decompress_bytes;
    // [15..22] raw and extent-level I/O counters
    array[15] = (uint64_t)ctx->stats.io_write_bytes;
    array[16] = (uint64_t)ctx->stats.io_write_requests;
    array[17] = (uint64_t)ctx->stats.io_read_bytes;
    array[18] = (uint64_t)ctx->stats.io_read_requests;
    array[19] = (uint64_t)ctx->stats.io_write_extent_bytes;
    array[20] = (uint64_t)ctx->stats.io_write_extents;
    array[21] = (uint64_t)ctx->stats.io_read_extent_bytes;
    array[22] = (uint64_t)ctx->stats.io_read_extents;
    // [23..27] file lifecycle and descriptor accounting
    array[23] = (uint64_t)ctx->stats.datafile_creations;
    array[24] = (uint64_t)ctx->stats.datafile_deletions;
    array[25] = (uint64_t)ctx->stats.journalfile_creations;
    array[26] = (uint64_t)ctx->stats.journalfile_deletions;
    array[27] = (uint64_t)ctx->stats.page_cache_descriptors;
    // [28..32] errors (instance-local and global) and reserved fds
    array[28] = (uint64_t)ctx->stats.io_errors;
    array[29] = (uint64_t)ctx->stats.fs_errors;
    array[30] = (uint64_t)global_io_errors;
    array[31] = (uint64_t)global_fs_errors;
    array[32] = (uint64_t)rrdeng_reserved_file_descriptors;
    // [33..36] page cache pressure events
    array[33] = (uint64_t)ctx->stats.pg_cache_over_half_dirty_events;
    array[34] = (uint64_t)global_pg_cache_over_half_dirty_events;
    array[35] = (uint64_t)ctx->stats.flushing_pressure_page_deletions;
    array[36] = (uint64_t)global_flushing_pressure_page_deletions;
    fatal_assert(RRDENG_NR_STATS == 37);
}
  867. /* Releases reference to page */
  868. void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
  869. {
  870. (void)ctx;
  871. pg_cache_put(ctx, (struct rrdeng_page_descr *)handle);
  872. }
/*
 * Initializes a dbengine instance for the given tier.
 * When ctxp is NULL the shared multi-host instance for the tier is (re)used,
 * otherwise a new instance is allocated and returned through *ctxp.
 * Returns 0 on success, negative (libuv error code) on error.
 */
int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
                unsigned disk_space_mb, size_t tier) {
    struct rrdengine_instance *ctx;
    int error;
    uint32_t max_open_files;

    // cap dbengine's file descriptor usage to a quarter of the process limit
    max_open_files = rlimit_nofile.rlim_cur / 4;

    /* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */
    rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE);
    if (rrdeng_reserved_file_descriptors > max_open_files) {
        error(
            "Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.",
            (unsigned)rrdeng_reserved_file_descriptors,
            (unsigned)max_open_files);

        // undo the reservation before bailing out
        rrd_stat_atomic_add(&global_fs_errors, 1);
        rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
        return UV_EMFILE;
    }

    if (NULL == ctxp) {
        // shared (multi-host) instance for this tier - statically allocated
        ctx = multidb_ctx[tier];
        memset(ctx, 0, sizeof(*ctx));
    }
    else {
        *ctxp = ctx = callocz(1, sizeof(*ctx));
    }

    ctx->tier = tier;
    ctx->page_type = tier_page_type[tier];
    ctx->global_compress_alg = RRD_LZ4;

    // enforce the minimum page cache size, then convert MiB to pages
    if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
        page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
    ctx->max_cache_pages = page_cache_mb * (1048576LU / RRDENG_BLOCK_SIZE);

    /* try to keep 5% of the page cache free */
    ctx->cache_pages_low_watermark = (ctx->max_cache_pages * 95LLU) / 100;

    if (disk_space_mb < RRDENG_MIN_DISK_SPACE_MB)
        disk_space_mb = RRDENG_MIN_DISK_SPACE_MB;
    ctx->max_disk_space = disk_space_mb * 1048576LLU;

    strncpyz(ctx->dbfiles_path, dbfiles_path, sizeof(ctx->dbfiles_path) - 1);
    ctx->dbfiles_path[sizeof(ctx->dbfiles_path) - 1] = '\0';

    // a shared instance has no host - use this machine's guid instead
    if (NULL == host)
        strncpyz(ctx->machine_guid, registry_get_this_machine_guid(), GUID_LEN);
    else
        strncpyz(ctx->machine_guid, host->machine_guid, GUID_LEN);

    ctx->drop_metrics_under_page_cache_pressure = rrdeng_drop_metrics_under_page_cache_pressure;
    ctx->metric_API_max_producers = 0;
    ctx->quiesce = NO_QUIESCE;
    ctx->host = host;

    memset(&ctx->worker_config, 0, sizeof(ctx->worker_config));
    ctx->worker_config.ctx = ctx;
    init_page_cache(ctx);
    init_commit_log(ctx);
    error = init_rrd_files(ctx);
    if (error) {
        goto error_after_init_rrd_files;
    }

    // spawn the event loop thread and block until it signals readiness
    completion_init(&ctx->rrdengine_completion);
    fatal_assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config));
    /* wait for worker thread to initialize */
    completion_wait_for(&ctx->rrdengine_completion);
    completion_destroy(&ctx->rrdengine_completion);
    uv_thread_set_name_np(ctx->worker_config.thread, "LIBUV_WORKER");
    if (ctx->worker_config.error) {
        goto error_after_rrdeng_worker;
    }

    return 0;

// goto-based cleanup: unwind in reverse order of acquisition
error_after_rrdeng_worker:
    finalize_rrd_files(ctx);
error_after_init_rrd_files:
    free_page_cache(ctx);
    if (!is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) {
        freez(ctx);
        if (ctxp)
            *ctxp = NULL;
    }
    rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
    return UV_EIO;
}
/*
 * Shuts down a dbengine instance: stops the worker thread, finalizes the
 * data and journal files and frees the page cache.
 * Returns 0 on success, 1 on error (NULL ctx).
 */
int rrdeng_exit(struct rrdengine_instance *ctx)
{
    struct rrdeng_cmd cmd;

    if (NULL == ctx) {
        return 1;
    }

    /* TODO: add page to page cache */
    cmd.opcode = RRDENG_SHUTDOWN;
    rrdeng_enq_cmd(&ctx->worker_config, &cmd);

    // wait for the event loop thread to drain its queue and exit
    fatal_assert(0 == uv_thread_join(&ctx->worker_config.thread));

    finalize_rrd_files(ctx);
    //metalog_exit(ctx->metalog_ctx);
    free_page_cache(ctx);

    // a shared (multidb) instance is statically allocated - do not free it
    if (!is_storage_engine_shared((STORAGE_INSTANCE *)ctx))
        freez(ctx);

    // return the file descriptors reserved at rrdeng_init()
    rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
    return 0;
}
  977. void rrdeng_prepare_exit(struct rrdengine_instance *ctx)
  978. {
  979. struct rrdeng_cmd cmd;
  980. if (NULL == ctx) {
  981. return;
  982. }
  983. completion_init(&ctx->rrdengine_completion);
  984. cmd.opcode = RRDENG_QUIESCE;
  985. rrdeng_enq_cmd(&ctx->worker_config, &cmd);
  986. /* wait for dbengine to quiesce */
  987. completion_wait_for(&ctx->rrdengine_completion);
  988. completion_destroy(&ctx->rrdengine_completion);
  989. //metalog_prepare_exit(ctx->metalog_ctx);
  990. }
// Walks the page cache metric index and every extent of every datafile to
// compute aggregate size/retention statistics for the instance. Intended
// for reporting; it iterates everything, so it is not cheap.
RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) {
    RRDENG_SIZE_STATS stats = { 0 };

    // count metrics and their indexed pages
    for(struct pg_cache_page_index *page_index = ctx->pg_cache.metrics_index.last_page_index;
        page_index != NULL ;page_index = page_index->prev) {
        stats.metrics++;
        stats.metrics_pages += page_index->page_count;
    }

    // walk every page of every extent of every datafile on disk
    for(struct rrdengine_datafile *df = ctx->datafiles.first; df ;df = df->next) {
        stats.datafiles++;

        for(struct extent_info *ei = df->extents.first; ei ; ei = ei->next) {
            stats.extents++;
            stats.extents_compressed_bytes += ei->size;

            for(int p = 0; p < ei->number_of_pages ;p++) {
                struct rrdeng_page_descr *descr = ei->pages[p];

                usec_t update_every_usec;

                size_t points = descr->page_length / PAGE_POINT_SIZE_BYTES(descr);

                if(likely(points > 1))
                    // derive the collection interval from the page's own time span
                    update_every_usec = (descr->end_time_ut - descr->start_time_ut) / (points - 1);
                else {
                    // single-point page: fall back to the tier's default interval
                    update_every_usec = default_rrd_update_every * get_tier_grouping(ctx->tier) * USEC_PER_SEC;
                    stats.single_point_pages++;
                }

                time_t duration_secs = (time_t)((descr->end_time_ut - descr->start_time_ut + update_every_usec)/USEC_PER_SEC);

                stats.extents_pages++;
                stats.pages_uncompressed_bytes += descr->page_length;
                stats.pages_duration_secs += duration_secs;
                stats.points += points;

                // per page-type breakdown
                stats.page_types[descr->type].pages++;
                stats.page_types[descr->type].pages_uncompressed_bytes += descr->page_length;
                stats.page_types[descr->type].pages_duration_secs += duration_secs;
                stats.page_types[descr->type].points += points;

                // track the overall retention window across all pages
                if(!stats.first_t || (descr->start_time_ut - update_every_usec) < stats.first_t)
                    stats.first_t = (descr->start_time_ut - update_every_usec) / USEC_PER_SEC;

                if(!stats.last_t || descr->end_time_ut > stats.last_t)
                    stats.last_t = descr->end_time_ut / USEC_PER_SEC;
            }
        }
    }

    stats.currently_collected_metrics = ctx->stats.metric_API_producers;
    stats.max_concurrently_collected_metrics = ctx->metric_API_max_producers;

    internal_error(stats.metrics_pages != stats.extents_pages + stats.currently_collected_metrics,
                   "DBENGINE: metrics pages is %zu, but extents pages is %zu and API consumers is %zu",
                   stats.metrics_pages, stats.extents_pages, stats.currently_collected_metrics);

    stats.disk_space = ctx->disk_space;
    stats.max_disk_space = ctx->max_disk_space;

    stats.database_retention_secs = (time_t)(stats.last_t - stats.first_t);

    // derived averages - each guarded against division by zero
    if(stats.extents_pages)
        stats.average_page_size_bytes = (double)stats.pages_uncompressed_bytes / (double)stats.extents_pages;

    if(stats.pages_uncompressed_bytes > 0)
        stats.average_compression_savings = 100.0 - ((double)stats.extents_compressed_bytes * 100.0 / (double)stats.pages_uncompressed_bytes);

    if(stats.points)
        stats.average_point_duration_secs = (double)stats.pages_duration_secs / (double)stats.points;

    if(stats.metrics) {
        stats.average_metric_retention_secs = (double)stats.pages_duration_secs / (double)stats.metrics;

        if(stats.database_retention_secs) {
            double metric_coverage = stats.average_metric_retention_secs / (double)stats.database_retention_secs;
            double db_retention_days = (double)stats.database_retention_secs / 86400.0;

            stats.estimated_concurrently_collected_metrics = stats.metrics * metric_coverage;
            stats.ephemeral_metrics_per_day_percent = ((double)stats.metrics * 100.0 / (double)stats.estimated_concurrently_collected_metrics - 100.0) / (double)db_retention_days;
        }
    }

    // fixed per-object memory footprints
    stats.sizeof_metric = struct_natural_alignment(sizeof(struct pg_cache_page_index) + sizeof(struct pg_alignment));
    stats.sizeof_page = struct_natural_alignment(sizeof(struct rrdeng_page_descr));
    stats.sizeof_datafile = struct_natural_alignment(sizeof(struct rrdengine_datafile)) + struct_natural_alignment(sizeof(struct rrdengine_journalfile));
    stats.sizeof_page_in_cache = struct_natural_alignment(sizeof(struct page_cache_descr));
    stats.sizeof_point_data = page_type_size[ctx->page_type];
    stats.sizeof_page_data = RRDENG_BLOCK_SIZE;
    stats.pages_per_extent = rrdeng_pages_per_extent;
    stats.sizeof_extent = sizeof(struct extent_info);
    stats.sizeof_page_in_extent = sizeof(struct rrdeng_page_descr *);
    // NOTE(review): 40 and 24 look like hand-measured JudyHS/JudyL index
    // overheads per entry - confirm against the Judy array implementation
    stats.sizeof_metric_in_index = 40;
    stats.sizeof_page_in_index = 24;

    stats.default_granularity_secs = (size_t)default_rrd_update_every * get_tier_grouping(ctx->tier);

    return stats;
}