rrdengineapi.c 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdengine.h"
  3. /* Default global database instance */
/* One statically-allocated dbengine instance per storage tier. */
struct rrdengine_instance multidb_ctx_storage_tier0;
struct rrdengine_instance multidb_ctx_storage_tier1;
struct rrdengine_instance multidb_ctx_storage_tier2;
struct rrdengine_instance multidb_ctx_storage_tier3;
struct rrdengine_instance multidb_ctx_storage_tier4;

/* Compile-time guard: the five allocations above must match the tier count. */
#if RRD_STORAGE_TIERS != 5
#error RRD_STORAGE_TIERS is not 5 - you need to add allocations here
#endif

/* Per-tier instance pointers, populated by initialize_multidb_ctx() at startup. */
struct rrdengine_instance *multidb_ctx[RRD_STORAGE_TIERS];

/* Page format per tier: tier 0 keeps raw samples (PAGE_METRICS),
 * higher tiers keep aggregated points (PAGE_TIER). */
uint8_t tier_page_type[RRD_STORAGE_TIERS] = {PAGE_METRICS, PAGE_TIER, PAGE_TIER, PAGE_TIER, PAGE_TIER};

/* Compile-time guard for the page-size table below. */
#if PAGE_TYPE_MAX != 1
#error PAGE_TYPE_MAX is not 1 - you need to add allocations here
#endif

/* On-disk size of one point, indexed by page type id; unknown ids are zero. */
size_t page_type_size[256] = {sizeof(storage_number), sizeof(storage_number_tier1_t)};
  18. __attribute__((constructor)) void initialize_multidb_ctx(void) {
  19. multidb_ctx[0] = &multidb_ctx_storage_tier0;
  20. multidb_ctx[1] = &multidb_ctx_storage_tier1;
  21. multidb_ctx[2] = &multidb_ctx_storage_tier2;
  22. multidb_ctx[3] = &multidb_ctx_storage_tier3;
  23. multidb_ctx[4] = &multidb_ctx_storage_tier4;
  24. }
/* Use plain malloc for page allocations instead of the custom allocator. */
int db_engine_use_malloc = 0;
/* Seconds to wait for a page fetch before giving up. */
int default_rrdeng_page_fetch_timeout = 3;
/* Number of times a failed page fetch is retried. */
int default_rrdeng_page_fetch_retries = 3;
/* Default page cache size, in MiB. */
int default_rrdeng_page_cache_mb = 32;
/* Default per-instance disk quota, in MiB. */
int default_rrdeng_disk_quota_mb = 256;
/* Default disk quota for the shared multi-host database, in MiB. */
int default_multidb_disk_quota_mb = 256;
/* Default behaviour is to unblock data collection if the page cache is full of dirty pages by dropping metrics */
uint8_t rrdeng_drop_metrics_under_page_cache_pressure = 1;
  33. static inline struct rrdengine_instance *get_rrdeng_ctx_from_host(RRDHOST *host, int tier) {
  34. if(tier < 0 || tier >= RRD_STORAGE_TIERS) tier = 0;
  35. if(!host->storage_instance[tier]) tier = 0;
  36. return (struct rrdengine_instance *)host->storage_instance[tier];
  37. }
  38. /* This UUID is not unique across hosts */
  39. void rrdeng_generate_legacy_uuid(const char *dim_id, char *chart_id, uuid_t *ret_uuid)
  40. {
  41. EVP_MD_CTX *evpctx;
  42. unsigned char hash_value[EVP_MAX_MD_SIZE];
  43. unsigned int hash_len;
  44. evpctx = EVP_MD_CTX_create();
  45. EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL);
  46. EVP_DigestUpdate(evpctx, dim_id, strlen(dim_id));
  47. EVP_DigestUpdate(evpctx, chart_id, strlen(chart_id));
  48. EVP_DigestFinal_ex(evpctx, hash_value, &hash_len);
  49. EVP_MD_CTX_destroy(evpctx);
  50. fatal_assert(hash_len > sizeof(uuid_t));
  51. memcpy(ret_uuid, hash_value, sizeof(uuid_t));
  52. }
  53. /* Transform legacy UUID to be unique across hosts deterministically */
  54. void rrdeng_convert_legacy_uuid_to_multihost(char machine_guid[GUID_LEN + 1], uuid_t *legacy_uuid, uuid_t *ret_uuid)
  55. {
  56. EVP_MD_CTX *evpctx;
  57. unsigned char hash_value[EVP_MAX_MD_SIZE];
  58. unsigned int hash_len;
  59. evpctx = EVP_MD_CTX_create();
  60. EVP_DigestInit_ex(evpctx, EVP_sha256(), NULL);
  61. EVP_DigestUpdate(evpctx, machine_guid, GUID_LEN);
  62. EVP_DigestUpdate(evpctx, *legacy_uuid, sizeof(uuid_t));
  63. EVP_DigestFinal_ex(evpctx, hash_value, &hash_len);
  64. EVP_MD_CTX_destroy(evpctx);
  65. fatal_assert(hash_len > sizeof(uuid_t));
  66. memcpy(ret_uuid, hash_value, sizeof(uuid_t));
  67. }
/* Per-dimension state handed out by rrdeng_metric_init() as an opaque
 * STORAGE_METRIC_HANDLE and freed by rrdeng_metric_free(). */
struct rrdeng_metric_handle {
    RRDDIM *rd;                             // dimension this handle belongs to
    struct rrdengine_instance *ctx;         // dbengine instance (tier) storing the metric
    uuid_t *rrdeng_uuid; // database engine metric UUID
    struct pg_cache_page_index *page_index; // page index tracking this metric's pages
};
/* Releases a metric handle obtained from rrdeng_metric_init().
 * Only the handle itself is freed; the page index it points to stays alive. */
void rrdeng_metric_free(STORAGE_METRIC_HANDLE *db_metric_handle) {
    freez(db_metric_handle);
}
/*
 * Resolves (or creates) the page index for a dimension and returns an opaque
 * metric handle wrapping it. Handles the migration from legacy (per-host)
 * UUIDs to multi-host UUIDs. Free the returned handle with rrdeng_metric_free().
 */
STORAGE_METRIC_HANDLE *rrdeng_metric_init(RRDDIM *rd, STORAGE_INSTANCE *db_instance) {
    struct rrdengine_instance *ctx = (struct rrdengine_instance *)db_instance;
    struct page_cache *pg_cache;
    uuid_t legacy_uuid;
    uuid_t multihost_legacy_uuid;
    Pvoid_t *PValue;
    struct pg_cache_page_index *page_index = NULL;
    int is_multihost_child = 0;
    RRDHOST *host = rd->rrdset->rrdhost;

    pg_cache = &ctx->pg_cache;

    // legacy UUID is SHA-256(dim id + chart id) - NOT unique across hosts
    rrdeng_generate_legacy_uuid(rd->id, rd->rrdset->id, &legacy_uuid);

    if (host != localhost && is_storage_engine_shared((STORAGE_INSTANCE *)ctx))
        is_multihost_child = 1;

    // first, look for an existing index under the legacy UUID
    uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
    PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &legacy_uuid, sizeof(uuid_t));
    if (likely(NULL != PValue)) {
        page_index = *PValue;
    }
    uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);

    if (is_multihost_child || NULL == PValue) {
        /* First time we see the legacy UUID or metric belongs to child host in multi-host DB.
         * Drop legacy support, normal path */
        // retry the lookup with the dimension's real (multi-host) UUID
        uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
        PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, &rd->metric_uuid, sizeof(uuid_t));
        if (likely(NULL != PValue)) {
            page_index = *PValue;
        }
        uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);

        if (NULL == PValue) {
            // not indexed yet: create and insert a fresh page index.
            // NOTE(review): the read lookup above and this insert are not one
            // atomic step; the fatal_assert below would trip if two threads
            // raced to insert the same UUID - confirm callers serialize this.
            uv_rwlock_wrlock(&pg_cache->metrics_index.lock);
            PValue = JudyHSIns(&pg_cache->metrics_index.JudyHS_array, &rd->metric_uuid, sizeof(uuid_t), PJE0);
            fatal_assert(NULL == *PValue); /* TODO: figure out concurrency model */
            *PValue = page_index = create_page_index(&rd->metric_uuid);
            // link the new index into the metrics-index list
            page_index->prev = pg_cache->metrics_index.last_page_index;
            pg_cache->metrics_index.last_page_index = page_index;
            uv_rwlock_wrunlock(&pg_cache->metrics_index.lock);
        }
    } else {
        /* There are legacy UUIDs in the database, implement backward compatibility */
        rrdeng_convert_legacy_uuid_to_multihost(rd->rrdset->rrdhost->machine_guid, &legacy_uuid,
                                                &multihost_legacy_uuid);

        // non-zero when the dimension's stored UUID differs from the derived one
        int need_to_store = uuid_compare(rd->metric_uuid, multihost_legacy_uuid);

        uuid_copy(rd->metric_uuid, multihost_legacy_uuid);

        // persist the updated UUID once; tier 0 owns the metadata store
        if (unlikely(need_to_store && !ctx->tier))
            (void)sql_store_dimension(&rd->metric_uuid, rd->rrdset->chart_uuid, rd->id, rd->name, rd->multiplier, rd->divisor,
                                      rd->algorithm);
    }

    struct rrdeng_metric_handle *mh = mallocz(sizeof(struct rrdeng_metric_handle));
    mh->rd = rd;
    mh->ctx = ctx;
    mh->rrdeng_uuid = &page_index->id;
    mh->page_index = page_index;
    return (STORAGE_METRIC_HANDLE *)mh;
}
  131. /*
  132. * Gets a handle for storing metrics to the database.
  133. * The handle must be released with rrdeng_store_metric_final().
  134. */
  135. STORAGE_COLLECT_HANDLE *rrdeng_store_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle) {
  136. struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle;
  137. struct rrdeng_collect_handle *handle;
  138. struct pg_cache_page_index *page_index;
  139. handle = callocz(1, sizeof(struct rrdeng_collect_handle));
  140. handle->metric_handle = metric_handle;
  141. handle->ctx = metric_handle->ctx;
  142. handle->descr = NULL;
  143. handle->unaligned_page = 0;
  144. page_index = metric_handle->page_index;
  145. uv_rwlock_wrlock(&page_index->lock);
  146. ++page_index->writers;
  147. uv_rwlock_wrunlock(&page_index->lock);
  148. return (STORAGE_COLLECT_HANDLE *)handle;
  149. }
  150. /* The page must be populated and referenced */
  151. static int page_has_only_empty_metrics(struct rrdeng_page_descr *descr)
  152. {
  153. switch(descr->type) {
  154. case PAGE_METRICS: {
  155. size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr);
  156. storage_number *array = (storage_number *)descr->pg_cache_descr->page;
  157. for (size_t i = 0 ; i < slots; ++i) {
  158. if(does_storage_number_exist(array[i]))
  159. return 0;
  160. }
  161. }
  162. break;
  163. case PAGE_TIER: {
  164. size_t slots = descr->page_length / PAGE_POINT_SIZE_BYTES(descr);
  165. storage_number_tier1_t *array = (storage_number_tier1_t *)descr->pg_cache_descr->page;
  166. for (size_t i = 0 ; i < slots; ++i) {
  167. if(fpclassify(array[i].sum_value) != FP_NAN)
  168. return 0;
  169. }
  170. }
  171. break;
  172. default: {
  173. static bool logged = false;
  174. if(!logged) {
  175. error("DBENGINE: cannot check page for nulls on unknown page type id %d", descr->type);
  176. logged = true;
  177. }
  178. return 0;
  179. }
  180. }
  181. return 1;
  182. }
/*
 * Finishes the page the collector is currently filling. A page that holds
 * only empty points is punched out of the cache; a page with real data is
 * committed for flushing to disk; a never-written page is freed outright.
 * After this call handle->descr is NULL.
 */
void rrdeng_store_metric_flush_current_page(STORAGE_COLLECT_HANDLE *collection_handle) {
    struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
    // struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle;
    struct rrdengine_instance *ctx = handle->ctx;
    struct rrdeng_page_descr *descr = handle->descr;

    if (unlikely(!ctx)) return;
    if (unlikely(!descr)) return;  // nothing collected yet

    if (likely(descr->page_length)) {
        int page_is_empty;

        // this producer no longer holds an open page
        rrd_stat_atomic_add(&ctx->stats.metric_API_producers, -1);

        page_is_empty = page_has_only_empty_metrics(descr);
        if (page_is_empty) {
            debug(D_RRDENGINE, "Page has empty metrics only, deleting:");
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
            // drop our reference and remove the page from cache and index
            pg_cache_put(ctx, descr);
            pg_cache_punch_hole(ctx, descr, 1, 0, NULL);
        } else
            // queue the page for flushing to disk
            rrdeng_commit_page(ctx, descr, handle->page_correlation_id);
    } else {
        // page was allocated but never written: free page, cache descr, descr
        dbengine_page_free(descr->pg_cache_descr->page);
        rrdeng_destroy_pg_cache_descr(ctx, descr->pg_cache_descr);
        rrdeng_page_descr_freez(descr);
    }
    handle->descr = NULL;
}
/*
 * Appends one collected point to the metric's current page.
 * Allocates a new page when there is none yet, when the current page is full,
 * or when chart-alignment heuristics decide the page must be flushed early.
 *
 * point_in_time   timestamp of the point, in usec
 * n               the value (tier 0) or the sum (higher tiers)
 * min/max_value   aggregates stored only on PAGE_TIER pages
 * count           number of raw points aggregated into this point
 * anomaly_count   how many of them were anomalous
 * flags           storage-number flags (tier 0 only)
 */
void rrdeng_store_metric_next(STORAGE_COLLECT_HANDLE *collection_handle,
                              usec_t point_in_time,
                              NETDATA_DOUBLE n,
                              NETDATA_DOUBLE min_value,
                              NETDATA_DOUBLE max_value,
                              uint16_t count,
                              uint16_t anomaly_count,
                              SN_FLAGS flags)
{
    struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
    struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle;
    struct rrdengine_instance *ctx = handle->ctx;
    struct page_cache *pg_cache = &ctx->pg_cache;
    struct rrdeng_page_descr *descr = handle->descr;
    RRDDIM *rd = metric_handle->rd;

    void *page;
    uint8_t must_flush_unaligned_page = 0, perfect_page_alignment = 0;

    if (descr) {
        /* Make alignment decisions */

        if (descr->page_length == rd->rrdset->rrddim_page_alignment) {
            /* this is the leading dimension that defines chart alignment */
            perfect_page_alignment = 1;
        }
        /* is the metric far enough out of alignment with the others? */
        if (unlikely(descr->page_length + PAGE_POINT_SIZE_BYTES(descr) < rd->rrdset->rrddim_page_alignment)) {
            handle->unaligned_page = 1;
            debug(D_RRDENGINE, "Metric page is not aligned with chart:");
            if (unlikely(debug_flags & D_RRDENGINE))
                print_page_cache_descr(descr);
        }
        if (unlikely(handle->unaligned_page &&
                     /* did the other metrics change page? */
                     rd->rrdset->rrddim_page_alignment <= PAGE_POINT_SIZE_BYTES(descr))) {
            debug(D_RRDENGINE, "Flushing unaligned metric page.");
            must_flush_unaligned_page = 1;
            handle->unaligned_page = 0;
        }
    }

    // start a new page when there is none, the current one is full,
    // or the alignment heuristic above demanded a flush
    if (unlikely(NULL == descr ||
                 descr->page_length + PAGE_POINT_SIZE_BYTES(descr) > RRDENG_BLOCK_SIZE ||
                 must_flush_unaligned_page)) {
        rrdeng_store_metric_flush_current_page(collection_handle);

        page = rrdeng_create_page(ctx, &metric_handle->page_index->id, &descr);
        fatal_assert(page);

        handle->descr = descr;

        // correlation id ties this page to its commit batch
        handle->page_correlation_id = rrd_atomic_fetch_add(&pg_cache->committed_page_index.latest_corr_id, 1);

        if (0 == rd->rrdset->rrddim_page_alignment) {
            /* this is the leading dimension that defines chart alignment */
            perfect_page_alignment = 1;
        }
    }

    page = descr->pg_cache_descr->page;

    // store the point in the next free slot, encoded per page type
    switch (descr->type) {
        case PAGE_METRICS: {
            ((storage_number *)page)[descr->page_length / PAGE_POINT_SIZE_BYTES(descr)] = pack_storage_number(n, flags);
        }
        break;

        case PAGE_TIER: {
            storage_number_tier1_t number_tier1;
            number_tier1.sum_value = (float)n;
            number_tier1.min_value = (float)min_value;
            number_tier1.max_value = (float)max_value;
            number_tier1.anomaly_count = anomaly_count;
            number_tier1.count = count;
            ((storage_number_tier1_t *)page)[descr->page_length / PAGE_POINT_SIZE_BYTES(descr)] = number_tier1;
        }
        break;

        default: {
            static bool logged = false;
            if(!logged) {
                error("DBENGINE: cannot store metric on unknown page type id %d", descr->type);
                logged = true;
            }
        }
        break;
    }

    // publish the new end time and the grown page length atomically
    pg_cache_atomic_set_pg_info(descr, point_in_time, descr->page_length + PAGE_POINT_SIZE_BYTES(descr));

    if (perfect_page_alignment)
        rd->rrdset->rrddim_page_alignment = descr->page_length;

    if (unlikely(INVALID_TIME == descr->start_time)) {
        // first point of the page: set start time, account the producer,
        // and insert the page into the cache/index
        unsigned long new_metric_API_producers, old_metric_API_max_producers, ret_metric_API_max_producers;
        descr->start_time = point_in_time;

        new_metric_API_producers = rrd_atomic_add_fetch(&ctx->stats.metric_API_producers, 1);

        // lock-free maximum update via compare-and-swap
        while (unlikely(new_metric_API_producers > (old_metric_API_max_producers = ctx->metric_API_max_producers))) {
            /* Increase ctx->metric_API_max_producers */
            ret_metric_API_max_producers = ulong_compare_and_swap(&ctx->metric_API_max_producers,
                                                                  old_metric_API_max_producers,
                                                                  new_metric_API_producers);
            if (old_metric_API_max_producers == ret_metric_API_max_producers) {
                /* success */
                break;
            }
        }

        pg_cache_insert(ctx, metric_handle->page_index, descr);
    } else {
        // page already indexed: just extend its time range in the index
        pg_cache_add_new_metric_time(metric_handle->page_index, descr);
    }
}
  307. /*
  308. * Releases the database reference from the handle for storing metrics.
  309. * Returns 1 if it's safe to delete the dimension.
  310. */
  311. int rrdeng_store_metric_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
  312. struct rrdeng_collect_handle *handle = (struct rrdeng_collect_handle *)collection_handle;
  313. struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)handle->metric_handle;
  314. struct pg_cache_page_index *page_index = metric_handle->page_index;
  315. uint8_t can_delete_metric = 0;
  316. rrdeng_store_metric_flush_current_page(collection_handle);
  317. uv_rwlock_wrlock(&page_index->lock);
  318. if (!--page_index->writers && !page_index->page_count) {
  319. can_delete_metric = 1;
  320. }
  321. uv_rwlock_wrunlock(&page_index->lock);
  322. freez(handle);
  323. return can_delete_metric;
  324. }
  325. //static inline uint32_t *pginfo_to_dt(struct rrdeng_page_info *page_info)
  326. //{
  327. // return (uint32_t *)&page_info->scratch[0];
  328. //}
  329. //
  330. //static inline uint32_t *pginfo_to_points(struct rrdeng_page_info *page_info)
  331. //{
  332. // return (uint32_t *)&page_info->scratch[sizeof(uint32_t)];
  333. //}
  334. //
/*
 * Gets a handle for loading metrics from the database.
 * The handle must be released with rrdeng_load_metric_final().
 */
void rrdeng_load_metric_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct rrddim_query_handle *rrdimm_handle, time_t start_time, time_t end_time, TIER_QUERY_FETCH tier_query_fetch_type)
{
    struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle;
    struct rrdengine_instance *ctx = metric_handle->ctx;
    RRDDIM *rd = metric_handle->rd;

    // fprintf(stderr, "%s: %s/%s start time %ld, end time %ld\n", __FUNCTION__ , rd->rrdset->name, rd->name, start_time, end_time);

    struct rrdeng_query_handle *handle;
    unsigned pages_nr;

    rrdimm_handle->start_time = start_time;
    rrdimm_handle->end_time = end_time;

    handle = callocz(1, sizeof(struct rrdeng_query_handle));
    handle->next_page_time = start_time;  // where the next page lookup begins
    handle->now = start_time;
    handle->tier_query_fetch_type = tier_query_fetch_type;
    // TODO we should store the dt of each page in each page
    // this will produce wrong values for dt in case the user changes
    // the update every of the charts or the tier grouping iterations
    handle->dt_sec = get_tier_grouping(ctx->tier) * (time_t)rd->update_every;
    handle->dt = handle->dt_sec * USEC_PER_SEC;
    handle->position = 0;
    handle->ctx = ctx;
    handle->metric_handle = metric_handle;
    handle->descr = NULL;  // no page loaded yet
    rrdimm_handle->handle = (STORAGE_QUERY_HANDLE *)handle;

    // warm up the page cache for the requested window and fetch the page index
    pages_nr = pg_cache_preload(ctx, metric_handle->rrdeng_uuid, start_time * USEC_PER_SEC, end_time * USEC_PER_SEC,
                                NULL, &handle->page_index);
    if (unlikely(NULL == handle->page_index || 0 == pages_nr))
        // there are no metrics to load
        handle->next_page_time = INVALID_TIME;
}
/*
 * Advances the query to the next page covering handle->next_page_time.
 * Releases the previously held page (if any), looks up the next one and
 * positions the cursor inside it. Returns 0 on success, 1 when no further
 * page is available within the query window.
 */
static int rrdeng_load_page_next(struct rrddim_query_handle *rrdimm_handle) {
    struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;

    struct rrdengine_instance *ctx = handle->ctx;
    struct rrdeng_page_descr *descr = handle->descr;

    uint32_t page_length;
    usec_t page_end_time;
    unsigned position;

    if (likely(descr)) {
        // Drop old page's reference

#ifdef NETDATA_INTERNAL_CHECKS
        rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
#endif

        pg_cache_put(ctx, descr);
        handle->descr = NULL;
        // continue one second past the old page's end
        handle->next_page_time = (handle->page_end_time / USEC_PER_SEC) + 1;

        if (unlikely(handle->next_page_time > rrdimm_handle->end_time))
            return 1;
    }

    usec_t next_page_time = handle->next_page_time * USEC_PER_SEC;
    descr = pg_cache_lookup_next(ctx, handle->page_index, &handle->page_index->id, next_page_time, rrdimm_handle->end_time * USEC_PER_SEC);
    if (NULL == descr)
        return 1;

#ifdef NETDATA_INTERNAL_CHECKS
    rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, 1);
#endif

    handle->descr = descr;
    pg_cache_atomic_get_pg_info(descr, &page_end_time, &page_length);
    if (unlikely(INVALID_TIME == descr->start_time || INVALID_TIME == page_end_time))
        return 1;

    if (unlikely(descr->start_time != page_end_time && next_page_time > descr->start_time)) {
        // we're in the middle of the page somewhere
        // interpolate the slot proportionally to the time offset into the page
        unsigned entries = page_length / PAGE_POINT_SIZE_BYTES(descr);
        position = ((uint64_t)(next_page_time - descr->start_time)) * (entries - 1) /
                   (page_end_time - descr->start_time);
    }
    else
        position = 0;

    handle->page_end_time = page_end_time;
    handle->page_length = page_length;
    handle->page = descr->pg_cache_descr->page;
    usec_t entries = handle->entries = page_length / PAGE_POINT_SIZE_BYTES(descr);
    if (likely(entries > 1))
        // derive dt from the page's own time span
        handle->dt = (page_end_time - descr->start_time) / (entries - 1);
    else {
        // TODO we should store the dt of each page in each page
        // now we keep the dt of whatever was before
        ;
    }

    handle->dt_sec = (time_t)(handle->dt / USEC_PER_SEC);
    handle->position = position;

    return 0;
}
// Returns the metric and sets its timestamp into current_time
// IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags)
// IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES
STORAGE_POINT rrdeng_load_metric_next(struct rrddim_query_handle *rrdimm_handle) {
    struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
    // struct rrdeng_metric_handle *metric_handle = handle->metric_handle;

    STORAGE_POINT sp;
    struct rrdeng_page_descr *descr = handle->descr;
    unsigned position = handle->position + 1;   // tentative next slot
    time_t now = handle->now + handle->dt_sec;  // tentative next timestamp
    storage_number_tier1_t tier1_value;

    // query already exhausted: keep returning empty points while advancing time
    if (unlikely(INVALID_TIME == handle->next_page_time)) {
        handle->next_page_time = INVALID_TIME;
        handle->now = now;
        storage_point_empty(sp, now - handle->dt_sec, now);
        return sp;
    }

    if (unlikely(!descr || position >= handle->entries)) {
        // We need to get a new page
        if(rrdeng_load_page_next(rrdimm_handle)) {
            // next calls will not load any more metrics
            handle->next_page_time = INVALID_TIME;
            handle->now = now;
            storage_point_empty(sp, now - handle->dt_sec, now);
            return sp;
        }

        descr = handle->descr;
        position = handle->position;
        // re-anchor 'now' on the new page's own timeline
        now = (time_t)((descr->start_time + position * handle->dt) / USEC_PER_SEC);
    }

    sp.start_time = now - handle->dt_sec;
    sp.end_time = now;

    handle->position = position;
    handle->now = now;

    // decode the point according to the page's type
    switch(descr->type) {
        case PAGE_METRICS: {
            storage_number n = handle->page[position];
            sp.min = sp.max = sp.sum = unpack_storage_number(n);
            sp.flags = n & SN_USER_FLAGS;
            sp.count = 1;
            sp.anomaly_count = is_storage_number_anomalous(n) ? 1 : 0;
        }
        break;

        case PAGE_TIER: {
            tier1_value = ((storage_number_tier1_t *)handle->page)[position];
            sp.flags = tier1_value.anomaly_count ? SN_FLAG_NONE : SN_FLAG_NOT_ANOMALOUS;
            sp.count = tier1_value.count;
            sp.anomaly_count = tier1_value.anomaly_count;
            sp.min = tier1_value.min_value;
            sp.max = tier1_value.max_value;
            sp.sum = tier1_value.sum_value;
        }
        break;

        // we don't know this page type
        default: {
            static bool logged = false;
            if(!logged) {
                error("DBENGINE: unknown page type %d found. Cannot decode it. Ignoring its metrics.", descr->type);
                logged = true;
            }
            storage_point_empty(sp, sp.start_time, sp.end_time);
        }
        break;
    }

    if (unlikely(now >= rrdimm_handle->end_time)) {
        // next calls will not load any more metrics
        handle->next_page_time = INVALID_TIME;
    }

    return sp;
}
  491. int rrdeng_load_metric_is_finished(struct rrddim_query_handle *rrdimm_handle)
  492. {
  493. struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
  494. return (INVALID_TIME == handle->next_page_time);
  495. }
  496. /*
  497. * Releases the database reference from the handle for loading metrics.
  498. */
  499. void rrdeng_load_metric_finalize(struct rrddim_query_handle *rrdimm_handle)
  500. {
  501. struct rrdeng_query_handle *handle = (struct rrdeng_query_handle *)rrdimm_handle->handle;
  502. struct rrdengine_instance *ctx = handle->ctx;
  503. struct rrdeng_page_descr *descr = handle->descr;
  504. if (descr) {
  505. #ifdef NETDATA_INTERNAL_CHECKS
  506. rrd_stat_atomic_add(&ctx->stats.metric_API_consumers, -1);
  507. #endif
  508. pg_cache_put(ctx, descr);
  509. }
  510. // whatever is allocated at rrdeng_load_metric_init() should be freed here
  511. freez(handle);
  512. rrdimm_handle->handle = NULL;
  513. }
  514. time_t rrdeng_metric_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
  515. struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle;
  516. struct pg_cache_page_index *page_index = metric_handle->page_index;
  517. return page_index->latest_time / USEC_PER_SEC;
  518. }
  519. time_t rrdeng_metric_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
  520. struct rrdeng_metric_handle *metric_handle = (struct rrdeng_metric_handle *)db_metric_handle;
  521. struct pg_cache_page_index *page_index = metric_handle->page_index;
  522. return page_index->oldest_time / USEC_PER_SEC;
  523. }
  524. int rrdeng_metric_latest_time_by_uuid(uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t, int tier)
  525. {
  526. struct page_cache *pg_cache;
  527. struct rrdengine_instance *ctx;
  528. Pvoid_t *PValue;
  529. struct pg_cache_page_index *page_index = NULL;
  530. ctx = get_rrdeng_ctx_from_host(localhost, tier);
  531. if (unlikely(!ctx)) {
  532. error("Failed to fetch multidb context");
  533. return 1;
  534. }
  535. pg_cache = &ctx->pg_cache;
  536. uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
  537. PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, dim_uuid, sizeof(uuid_t));
  538. if (likely(NULL != PValue)) {
  539. page_index = *PValue;
  540. }
  541. uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
  542. if (likely(page_index)) {
  543. *first_entry_t = page_index->oldest_time / USEC_PER_SEC;
  544. *last_entry_t = page_index->latest_time / USEC_PER_SEC;
  545. return 0;
  546. }
  547. return 1;
  548. }
  549. int rrdeng_metric_retention_by_uuid(STORAGE_INSTANCE *si, uuid_t *dim_uuid, time_t *first_entry_t, time_t *last_entry_t)
  550. {
  551. struct page_cache *pg_cache;
  552. struct rrdengine_instance *ctx;
  553. Pvoid_t *PValue;
  554. struct pg_cache_page_index *page_index = NULL;
  555. ctx = (struct rrdengine_instance *)si;
  556. if (unlikely(!ctx)) {
  557. error("DBENGINE: invalid STORAGE INSTANCE to %s()", __FUNCTION__);
  558. return 1;
  559. }
  560. pg_cache = &ctx->pg_cache;
  561. uv_rwlock_rdlock(&pg_cache->metrics_index.lock);
  562. PValue = JudyHSGet(pg_cache->metrics_index.JudyHS_array, dim_uuid, sizeof(uuid_t));
  563. if (likely(NULL != PValue)) {
  564. page_index = *PValue;
  565. }
  566. uv_rwlock_rdunlock(&pg_cache->metrics_index.lock);
  567. if (likely(page_index)) {
  568. *first_entry_t = page_index->oldest_time / USEC_PER_SEC;
  569. *last_entry_t = page_index->latest_time / USEC_PER_SEC;
  570. return 0;
  571. }
  572. return 1;
  573. }
/* Also gets a reference for the page.
 * Allocates a fresh page plus its descriptor, marks it dirty and populated,
 * and returns the page buffer; *ret_descr receives the descriptor. */
void *rrdeng_create_page(struct rrdengine_instance *ctx, uuid_t *id, struct rrdeng_page_descr **ret_descr)
{
    struct rrdeng_page_descr *descr;
    struct page_cache_descr *pg_cache_descr;
    void *page;

    /* TODO: check maximum number of pages in page cache limit */

    descr = pg_cache_create_descr();
    descr->id = id; /* TODO: add page type: metric, log, something? */
    descr->type = ctx->page_type;  // the tier's point format
    page = dbengine_page_alloc(); /*TODO: add page size */

    rrdeng_page_descr_mutex_lock(ctx, descr);
    pg_cache_descr = descr->pg_cache_descr;
    pg_cache_descr->page = page;
    pg_cache_descr->flags = RRD_PAGE_DIRTY /*| RRD_PAGE_LOCKED */ | RRD_PAGE_POPULATED /* | BEING_COLLECTED */;
    pg_cache_descr->refcnt = 1;  // the caller's reference

    debug(D_RRDENGINE, "Created new page:");
    if (unlikely(debug_flags & D_RRDENGINE))
        print_page_cache_descr(descr);
    rrdeng_page_descr_mutex_unlock(ctx, descr);

    *ret_descr = descr;
    return page;
}
/* The page must not be empty.
 * Registers the page in the committed-page index (queueing it for flushing),
 * applies back-pressure when too many pages are dirty, then releases the
 * caller's reference. */
void rrdeng_commit_page(struct rrdengine_instance *ctx, struct rrdeng_page_descr *descr,
                        Word_t page_correlation_id)
{
    struct page_cache *pg_cache = &ctx->pg_cache;
    Pvoid_t *PValue;
    unsigned nr_committed_pages;

    if (unlikely(NULL == descr)) {
        debug(D_RRDENGINE, "%s: page descriptor is NULL, page has already been force-committed.", __func__);
        return;
    }
    fatal_assert(descr->page_length);

    // index the page under its correlation id
    uv_rwlock_wrlock(&pg_cache->committed_page_index.lock);
    PValue = JudyLIns(&pg_cache->committed_page_index.JudyL_array, page_correlation_id, PJE0);
    *PValue = descr;
    nr_committed_pages = ++pg_cache->committed_page_index.nr_committed_pages;
    uv_rwlock_wrunlock(&pg_cache->committed_page_index.lock);

    if (nr_committed_pages >= pg_cache_hard_limit(ctx) / 2) {
        /* over 50% of pages have not been committed yet */

        if (ctx->drop_metrics_under_page_cache_pressure &&
            nr_committed_pages >= pg_cache_committed_hard_limit(ctx)) {
            /* 100% of pages are dirty */
            // ask the event loop to evict the oldest dirty page
            struct rrdeng_cmd cmd;

            cmd.opcode = RRDENG_INVALIDATE_OLDEST_MEMORY_PAGE;
            rrdeng_enq_cmd(&ctx->worker_config, &cmd);
        } else {
            if (0 == (unsigned long) ctx->stats.pg_cache_over_half_dirty_events) {
                /* only print the first time */
                errno = 0;
                error("Failed to flush dirty buffers quickly enough in dbengine instance \"%s\". "
                      "Metric data at risk of not being stored in the database, "
                      "please reduce disk load or use a faster disk.", ctx->dbfiles_path);
            }
            rrd_stat_atomic_add(&ctx->stats.pg_cache_over_half_dirty_events, 1);
            rrd_stat_atomic_add(&global_pg_cache_over_half_dirty_events, 1);
        }
    }

    // drop the collector's reference; the flusher holds its own via the index
    pg_cache_put(ctx, descr);
}
  636. /* Gets a reference for the page */
  637. void *rrdeng_get_latest_page(struct rrdengine_instance *ctx, uuid_t *id, void **handle)
  638. {
  639. struct rrdeng_page_descr *descr;
  640. struct page_cache_descr *pg_cache_descr;
  641. debug(D_RRDENGINE, "Reading existing page:");
  642. descr = pg_cache_lookup(ctx, NULL, id, INVALID_TIME);
  643. if (NULL == descr) {
  644. *handle = NULL;
  645. return NULL;
  646. }
  647. *handle = descr;
  648. pg_cache_descr = descr->pg_cache_descr;
  649. return pg_cache_descr->page;
  650. }
  651. /* Gets a reference for the page */
  652. void *rrdeng_get_page(struct rrdengine_instance *ctx, uuid_t *id, usec_t point_in_time, void **handle)
  653. {
  654. struct rrdeng_page_descr *descr;
  655. struct page_cache_descr *pg_cache_descr;
  656. debug(D_RRDENGINE, "Reading existing page:");
  657. descr = pg_cache_lookup(ctx, NULL, id, point_in_time);
  658. if (NULL == descr) {
  659. *handle = NULL;
  660. return NULL;
  661. }
  662. *handle = descr;
  663. pg_cache_descr = descr->pg_cache_descr;
  664. return pg_cache_descr->page;
  665. }
  666. /*
  667. * Gathers Database Engine statistics.
  668. * Careful when modifying this function.
  669. * You must not change the indices of the statistics or user code will break.
  670. * You must not exceed RRDENG_NR_STATS or it will crash.
  671. */
  672. void rrdeng_get_37_statistics(struct rrdengine_instance *ctx, unsigned long long *array)
  673. {
  674. if (ctx == NULL)
  675. return;
  676. struct page_cache *pg_cache = &ctx->pg_cache;
  677. array[0] = (uint64_t)ctx->stats.metric_API_producers;
  678. array[1] = (uint64_t)ctx->stats.metric_API_consumers;
  679. array[2] = (uint64_t)pg_cache->page_descriptors;
  680. array[3] = (uint64_t)pg_cache->populated_pages;
  681. array[4] = (uint64_t)pg_cache->committed_page_index.nr_committed_pages;
  682. array[5] = (uint64_t)ctx->stats.pg_cache_insertions;
  683. array[6] = (uint64_t)ctx->stats.pg_cache_deletions;
  684. array[7] = (uint64_t)ctx->stats.pg_cache_hits;
  685. array[8] = (uint64_t)ctx->stats.pg_cache_misses;
  686. array[9] = (uint64_t)ctx->stats.pg_cache_backfills;
  687. array[10] = (uint64_t)ctx->stats.pg_cache_evictions;
  688. array[11] = (uint64_t)ctx->stats.before_compress_bytes;
  689. array[12] = (uint64_t)ctx->stats.after_compress_bytes;
  690. array[13] = (uint64_t)ctx->stats.before_decompress_bytes;
  691. array[14] = (uint64_t)ctx->stats.after_decompress_bytes;
  692. array[15] = (uint64_t)ctx->stats.io_write_bytes;
  693. array[16] = (uint64_t)ctx->stats.io_write_requests;
  694. array[17] = (uint64_t)ctx->stats.io_read_bytes;
  695. array[18] = (uint64_t)ctx->stats.io_read_requests;
  696. array[19] = (uint64_t)ctx->stats.io_write_extent_bytes;
  697. array[20] = (uint64_t)ctx->stats.io_write_extents;
  698. array[21] = (uint64_t)ctx->stats.io_read_extent_bytes;
  699. array[22] = (uint64_t)ctx->stats.io_read_extents;
  700. array[23] = (uint64_t)ctx->stats.datafile_creations;
  701. array[24] = (uint64_t)ctx->stats.datafile_deletions;
  702. array[25] = (uint64_t)ctx->stats.journalfile_creations;
  703. array[26] = (uint64_t)ctx->stats.journalfile_deletions;
  704. array[27] = (uint64_t)ctx->stats.page_cache_descriptors;
  705. array[28] = (uint64_t)ctx->stats.io_errors;
  706. array[29] = (uint64_t)ctx->stats.fs_errors;
  707. array[30] = (uint64_t)global_io_errors;
  708. array[31] = (uint64_t)global_fs_errors;
  709. array[32] = (uint64_t)rrdeng_reserved_file_descriptors;
  710. array[33] = (uint64_t)ctx->stats.pg_cache_over_half_dirty_events;
  711. array[34] = (uint64_t)global_pg_cache_over_half_dirty_events;
  712. array[35] = (uint64_t)ctx->stats.flushing_pressure_page_deletions;
  713. array[36] = (uint64_t)global_flushing_pressure_page_deletions;
  714. fatal_assert(RRDENG_NR_STATS == 37);
  715. }
  716. /* Releases reference to page */
  717. void rrdeng_put_page(struct rrdengine_instance *ctx, void *handle)
  718. {
  719. (void)ctx;
  720. pg_cache_put(ctx, (struct rrdeng_page_descr *)handle);
  721. }
/*
 * Returns 0 on success, negative on error
 */
/*
 * Initializes a dbengine instance for one storage tier:
 *  - reserves a file-descriptor budget for the instance,
 *  - obtains the context (ctxp == NULL selects the shared multidb_ctx[tier],
 *    otherwise a fresh context is allocated and returned through *ctxp),
 *  - sizes the page cache and disk quota (clamped to engine minimums),
 *  - opens/creates the data files and spawns the libuv worker thread.
 *
 * host may be NULL; the registry machine GUID is used instead.
 * Returns 0 on success, UV_EMFILE when the fd budget is exhausted,
 * UV_EIO on any later initialization failure (after cleanup).
 */
int rrdeng_init(RRDHOST *host, struct rrdengine_instance **ctxp, char *dbfiles_path, unsigned page_cache_mb,
                unsigned disk_space_mb, int tier) {
    struct rrdengine_instance *ctx;
    int error;
    uint32_t max_open_files;

    /* never let dbengine instances claim more than 1/4 of the process fd limit */
    max_open_files = rlimit_nofile.rlim_cur / 4;

    /* reserve RRDENG_FD_BUDGET_PER_INSTANCE file descriptors for this instance */
    rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, RRDENG_FD_BUDGET_PER_INSTANCE);
    if (rrdeng_reserved_file_descriptors > max_open_files) {
        error(
            "Exceeded the budget of available file descriptors (%u/%u), cannot create new dbengine instance.",
            (unsigned)rrdeng_reserved_file_descriptors,
            (unsigned)max_open_files);

        rrd_stat_atomic_add(&global_fs_errors, 1);
        /* give the reservation back before bailing out */
        rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
        return UV_EMFILE;
    }

    /* shared multidb instances are statically owned; private ones are heap-allocated */
    if(NULL == ctxp) {
        ctx = multidb_ctx[tier];
        memset(ctx, 0, sizeof(*ctx));
    }
    else {
        *ctxp = ctx = callocz(1, sizeof(*ctx));
    }

    ctx->tier = tier;
    ctx->page_type = tier_page_type[tier];
    ctx->global_compress_alg = RRD_LZ4;

    /* clamp the cache size and express it in pages */
    if (page_cache_mb < RRDENG_MIN_PAGE_CACHE_SIZE_MB)
        page_cache_mb = RRDENG_MIN_PAGE_CACHE_SIZE_MB;
    ctx->max_cache_pages = page_cache_mb * (1048576LU / RRDENG_BLOCK_SIZE);
    /* try to keep 5% of the page cache free */
    ctx->cache_pages_low_watermark = (ctx->max_cache_pages * 95LLU) / 100;

    if (disk_space_mb < RRDENG_MIN_DISK_SPACE_MB)
        disk_space_mb = RRDENG_MIN_DISK_SPACE_MB;
    ctx->max_disk_space = disk_space_mb * 1048576LLU;

    strncpyz(ctx->dbfiles_path, dbfiles_path, sizeof(ctx->dbfiles_path) - 1);
    ctx->dbfiles_path[sizeof(ctx->dbfiles_path) - 1] = '\0';

    if (NULL == host)
        strncpyz(ctx->machine_guid, registry_get_this_machine_guid(), GUID_LEN);
    else
        strncpyz(ctx->machine_guid, host->machine_guid, GUID_LEN);

    ctx->drop_metrics_under_page_cache_pressure = rrdeng_drop_metrics_under_page_cache_pressure;
    ctx->metric_API_max_producers = 0;
    ctx->quiesce = NO_QUIESCE;
    ctx->metalog_ctx = NULL; /* only set this after the metadata log has finished initializing */
    ctx->host = host;

    memset(&ctx->worker_config, 0, sizeof(ctx->worker_config));
    ctx->worker_config.ctx = ctx;
    init_page_cache(ctx);
    init_commit_log(ctx);
    error = init_rrd_files(ctx);
    if (error) {
        goto error_after_init_rrd_files;
    }

    completion_init(&ctx->rrdengine_completion);
    fatal_assert(0 == uv_thread_create(&ctx->worker_config.thread, rrdeng_worker, &ctx->worker_config));
    /* wait for worker thread to initialize */
    completion_wait_for(&ctx->rrdengine_completion);
    completion_destroy(&ctx->rrdengine_completion);
    uv_thread_set_name_np(ctx->worker_config.thread, "LIBUV_WORKER");
    if (ctx->worker_config.error) {
        goto error_after_rrdeng_worker;
    }
    error = metalog_init(ctx);
    if (error) {
        error("Failed to initialize metadata log file event loop.");
        goto error_after_rrdeng_worker;
    }

    return 0;

/* goto-based cleanup: unwind only what was initialized before the failure */
error_after_rrdeng_worker:
    finalize_rrd_files(ctx);
error_after_init_rrd_files:
    free_page_cache(ctx);
    /* shared multidb contexts are static -- never freed */
    if (!is_storage_engine_shared((STORAGE_INSTANCE *)ctx)) {
        freez(ctx);
        if (ctxp)
            *ctxp = NULL;
    }
    rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
    return UV_EIO;
}
  806. /*
  807. * Returns 0 on success, 1 on error
  808. */
  809. int rrdeng_exit(struct rrdengine_instance *ctx)
  810. {
  811. struct rrdeng_cmd cmd;
  812. if (NULL == ctx) {
  813. return 1;
  814. }
  815. /* TODO: add page to page cache */
  816. cmd.opcode = RRDENG_SHUTDOWN;
  817. rrdeng_enq_cmd(&ctx->worker_config, &cmd);
  818. fatal_assert(0 == uv_thread_join(&ctx->worker_config.thread));
  819. finalize_rrd_files(ctx);
  820. //metalog_exit(ctx->metalog_ctx);
  821. free_page_cache(ctx);
  822. if(!is_storage_engine_shared((STORAGE_INSTANCE *)ctx))
  823. freez(ctx);
  824. rrd_stat_atomic_add(&rrdeng_reserved_file_descriptors, -RRDENG_FD_BUDGET_PER_INSTANCE);
  825. return 0;
  826. }
  827. void rrdeng_prepare_exit(struct rrdengine_instance *ctx)
  828. {
  829. struct rrdeng_cmd cmd;
  830. if (NULL == ctx) {
  831. return;
  832. }
  833. completion_init(&ctx->rrdengine_completion);
  834. cmd.opcode = RRDENG_QUIESCE;
  835. rrdeng_enq_cmd(&ctx->worker_config, &cmd);
  836. /* wait for dbengine to quiesce */
  837. completion_wait_for(&ctx->rrdengine_completion);
  838. completion_destroy(&ctx->rrdengine_completion);
  839. //metalog_prepare_exit(ctx->metalog_ctx);
  840. }
/*
 * Walks the whole instance -- metric index, datafiles, extents and page
 * descriptors -- and aggregates size/retention statistics into an
 * RRDENG_SIZE_STATS returned by value. Read-mostly; intended for reporting.
 * NOTE(review): iterates live structures without visible locking here --
 * presumably the caller guarantees safety; confirm before reuse.
 */
RRDENG_SIZE_STATS rrdeng_size_statistics(struct rrdengine_instance *ctx) {
    RRDENG_SIZE_STATS stats = { 0 };

    /* count metrics and their pages from the per-metric page indices */
    for(struct pg_cache_page_index *page_index = ctx->pg_cache.metrics_index.last_page_index;
        page_index != NULL ;page_index = page_index->prev) {
        stats.metrics++;
        stats.metrics_pages += page_index->page_count;
    }

    /* walk every datafile -> extent -> page to size the on-disk data */
    for(struct rrdengine_datafile *df = ctx->datafiles.first; df ;df = df->next) {
        stats.datafiles++;

        for(struct extent_info *ei = df->extents.first; ei ; ei = ei->next) {
            stats.extents++;
            stats.extents_compressed_bytes += ei->size;

            for(int p = 0; p < ei->number_of_pages ;p++) {
                struct rrdeng_page_descr *descr = ei->pages[p];

                usec_t update_every_usec;

                size_t points = descr->page_length / PAGE_POINT_SIZE_BYTES(descr);

                if(likely(points > 1))
                    /* derive the collection interval from the page's time span */
                    update_every_usec = (descr->end_time - descr->start_time) / (points - 1);
                else {
                    /* single-point page: fall back to the tier's default granularity */
                    update_every_usec = default_rrd_update_every * get_tier_grouping(ctx->tier) * USEC_PER_SEC;
                    stats.single_point_pages++;
                }

                /* page duration includes one extra interval to cover the last point */
                time_t duration_secs = (time_t)((descr->end_time - descr->start_time + update_every_usec)/USEC_PER_SEC);

                stats.extents_pages++;
                stats.pages_uncompressed_bytes += descr->page_length;
                stats.pages_duration_secs += duration_secs;
                stats.points += points;

                stats.page_types[descr->type].pages++;
                stats.page_types[descr->type].pages_uncompressed_bytes += descr->page_length;
                stats.page_types[descr->type].pages_duration_secs += duration_secs;
                stats.page_types[descr->type].points += points;

                /* track overall database time range (first_t backs off one interval) */
                if(!stats.first_t || (descr->start_time - update_every_usec) < stats.first_t)
                    stats.first_t = (descr->start_time - update_every_usec) / USEC_PER_SEC;

                if(!stats.last_t || descr->end_time > stats.last_t)
                    stats.last_t = descr->end_time / USEC_PER_SEC;
            }
        }
    }

    stats.currently_collected_metrics = ctx->stats.metric_API_producers;
    stats.max_concurrently_collected_metrics = ctx->metric_API_max_producers;

    /* sanity check: every indexed page is either on disk or being collected
     * NOTE(review): the message says "API consumers" but the value printed is
     * the producers counter -- likely a wording slip; confirm before relying
     * on the log text. */
    internal_error(stats.metrics_pages != stats.extents_pages + stats.currently_collected_metrics,
                   "DBENGINE: metrics pages is %zu, but extents pages is %zu and API consumers is %zu",
                   stats.metrics_pages, stats.extents_pages, stats.currently_collected_metrics);

    stats.disk_space = ctx->disk_space;
    stats.max_disk_space = ctx->max_disk_space;

    stats.database_retention_secs = (time_t)(stats.last_t - stats.first_t);

    /* derived averages; each guarded against a zero denominator */
    if(stats.extents_pages)
        stats.average_page_size_bytes = (double)stats.pages_uncompressed_bytes / (double)stats.extents_pages;

    if(stats.pages_uncompressed_bytes > 0)
        stats.average_compression_savings = 100.0 - ((double)stats.extents_compressed_bytes * 100.0 / (double)stats.pages_uncompressed_bytes);

    if(stats.points)
        stats.average_point_duration_secs = (double)stats.pages_duration_secs / (double)stats.points;

    if(stats.metrics) {
        stats.average_metric_retention_secs = (double)stats.pages_duration_secs / (double)stats.metrics;

        if(stats.database_retention_secs) {
            double metric_coverage = stats.average_metric_retention_secs / (double)stats.database_retention_secs;
            double db_retention_days = (double)stats.database_retention_secs / 86400.0;

            stats.estimated_concurrently_collected_metrics = stats.metrics * metric_coverage;

            stats.ephemeral_metrics_per_day_percent = ((double)stats.metrics * 100.0 / (double)stats.estimated_concurrently_collected_metrics - 100.0) / (double)db_retention_days;
        }
    }

    /* per-object memory footprints for capacity estimation */
    stats.sizeof_metric = struct_natural_alignment(sizeof(struct pg_cache_page_index));
    stats.sizeof_page = struct_natural_alignment(sizeof(struct rrdeng_page_descr));
    stats.sizeof_datafile = struct_natural_alignment(sizeof(struct rrdengine_datafile)) + struct_natural_alignment(sizeof(struct rrdengine_journalfile));
    stats.sizeof_page_in_cache = struct_natural_alignment(sizeof(struct page_cache_descr));
    stats.sizeof_point_data = page_type_size[ctx->page_type];
    stats.sizeof_page_data = RRDENG_BLOCK_SIZE;
    stats.pages_per_extent = rrdeng_pages_per_extent;
    stats.sizeof_extent = sizeof(struct extent_info);
    stats.sizeof_page_in_extent = sizeof(struct rrdeng_page_descr *);

    /* NOTE(review): hard-coded index entry sizes -- presumably the JudyL
     * per-entry overhead; confirm against the page cache implementation */
    stats.sizeof_metric_in_index = 40;
    stats.sizeof_page_in_index = 24;

    stats.default_granularity_secs = (size_t)default_rrd_update_every * get_tier_grouping(ctx->tier);

    return stats;
}