rrddim_mem.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrddim_mem.h"
  3. #include "Judy.h"
  4. static Pvoid_t rrddim_JudyHS_array = NULL;
  5. static netdata_rwlock_t rrddim_JudyHS_rwlock = NETDATA_RWLOCK_INITIALIZER;
  6. // ----------------------------------------------------------------------------
  7. // metrics groups
  8. STORAGE_METRICS_GROUP *rrddim_metrics_group_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid __maybe_unused) {
  9. return NULL;
  10. }
  11. void rrddim_metrics_group_release(STORAGE_INSTANCE *db_instance __maybe_unused, STORAGE_METRICS_GROUP *smg __maybe_unused) {
  12. // if(!smg) return; // smg may be NULL
  13. ;
  14. }
  15. // ----------------------------------------------------------------------------
  16. // RRDDIM legacy data collection functions
  17. struct mem_metric_handle {
  18. RRDDIM *rd;
  19. size_t counter;
  20. size_t entries;
  21. size_t current_entry;
  22. time_t last_updated_s;
  23. time_t update_every_s;
  24. int32_t refcount;
  25. };
  26. static void update_metric_handle_from_rrddim(struct mem_metric_handle *mh, RRDDIM *rd) {
  27. mh->counter = rd->rrdset->counter;
  28. mh->entries = rd->rrdset->entries;
  29. mh->current_entry = rd->rrdset->current_entry;
  30. mh->last_updated_s = rd->rrdset->last_updated.tv_sec;
  31. mh->update_every_s = rd->rrdset->update_every;
  32. }
  33. static void check_metric_handle_from_rrddim(struct mem_metric_handle *mh) {
  34. RRDDIM *rd = mh->rd; (void)rd;
  35. internal_fatal(mh->entries != (size_t)rd->rrdset->entries, "RRDDIM: entries do not match");
  36. internal_fatal(mh->update_every_s != rd->rrdset->update_every, "RRDDIM: update every does not match");
  37. }
  38. STORAGE_METRIC_HANDLE *
  39. rrddim_metric_get_or_create(RRDDIM *rd, STORAGE_INSTANCE *db_instance __maybe_unused) {
  40. struct mem_metric_handle *mh = (struct mem_metric_handle *)rrddim_metric_get(db_instance, &rd->metric_uuid);
  41. while(!mh) {
  42. netdata_rwlock_wrlock(&rrddim_JudyHS_rwlock);
  43. Pvoid_t *PValue = JudyHSIns(&rrddim_JudyHS_array, &rd->metric_uuid, sizeof(uuid_t), PJE0);
  44. mh = *PValue;
  45. if(!mh) {
  46. mh = callocz(1, sizeof(struct mem_metric_handle));
  47. mh->rd = rd;
  48. mh->refcount = 1;
  49. update_metric_handle_from_rrddim(mh, rd);
  50. *PValue = mh;
  51. __atomic_add_fetch(&rrddim_db_memory_size, sizeof(struct mem_metric_handle) + JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
  52. }
  53. else {
  54. if(__atomic_add_fetch(&mh->refcount, 1, __ATOMIC_RELAXED) <= 0)
  55. mh = NULL;
  56. }
  57. netdata_rwlock_unlock(&rrddim_JudyHS_rwlock);
  58. }
  59. internal_fatal(mh->rd != rd, "RRDDIM_MEM: incorrect pointer returned from index.");
  60. return (STORAGE_METRIC_HANDLE *)mh;
  61. }
  62. STORAGE_METRIC_HANDLE *
  63. rrddim_metric_get(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid) {
  64. struct mem_metric_handle *mh = NULL;
  65. netdata_rwlock_rdlock(&rrddim_JudyHS_rwlock);
  66. Pvoid_t *PValue = JudyHSGet(rrddim_JudyHS_array, uuid, sizeof(uuid_t));
  67. if (likely(NULL != PValue)) {
  68. mh = *PValue;
  69. if(__atomic_add_fetch(&mh->refcount, 1, __ATOMIC_RELAXED) <= 0)
  70. mh = NULL;
  71. }
  72. netdata_rwlock_unlock(&rrddim_JudyHS_rwlock);
  73. return (STORAGE_METRIC_HANDLE *)mh;
  74. }
  75. STORAGE_METRIC_HANDLE *rrddim_metric_dup(STORAGE_METRIC_HANDLE *db_metric_handle) {
  76. struct mem_metric_handle *mh = (struct mem_metric_handle *)db_metric_handle;
  77. __atomic_add_fetch(&mh->refcount, 1, __ATOMIC_RELAXED);
  78. return db_metric_handle;
  79. }
  80. void rrddim_metric_release(STORAGE_METRIC_HANDLE *db_metric_handle __maybe_unused) {
  81. struct mem_metric_handle *mh = (struct mem_metric_handle *)db_metric_handle;
  82. if(__atomic_sub_fetch(&mh->refcount, 1, __ATOMIC_RELAXED) == 0) {
  83. // we are the last one holding this
  84. int32_t expected = 0;
  85. if(__atomic_compare_exchange_n(&mh->refcount, &expected, -99999, false, __ATOMIC_RELAXED, __ATOMIC_RELAXED)) {
  86. // we can delete it
  87. RRDDIM *rd = mh->rd;
  88. netdata_rwlock_wrlock(&rrddim_JudyHS_rwlock);
  89. JudyHSDel(&rrddim_JudyHS_array, &rd->metric_uuid, sizeof(uuid_t), PJE0);
  90. netdata_rwlock_unlock(&rrddim_JudyHS_rwlock);
  91. freez(mh);
  92. __atomic_sub_fetch(&rrddim_db_memory_size, sizeof(struct mem_metric_handle) + JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
  93. }
  94. }
  95. }
  96. bool rrddim_metric_retention_by_uuid(STORAGE_INSTANCE *db_instance __maybe_unused, uuid_t *uuid, time_t *first_entry_s, time_t *last_entry_s) {
  97. STORAGE_METRIC_HANDLE *db_metric_handle = rrddim_metric_get(db_instance, uuid);
  98. if(!db_metric_handle)
  99. return false;
  100. *first_entry_s = rrddim_query_oldest_time_s(db_metric_handle);
  101. *last_entry_s = rrddim_query_latest_time_s(db_metric_handle);
  102. return true;
  103. }
  104. void rrddim_store_metric_change_collection_frequency(STORAGE_COLLECT_HANDLE *collection_handle, int update_every) {
  105. struct mem_collect_handle *ch = (struct mem_collect_handle *)collection_handle;
  106. struct mem_metric_handle *mh = (struct mem_metric_handle *)ch->db_metric_handle;
  107. rrddim_store_metric_flush(collection_handle);
  108. mh->update_every_s = update_every;
  109. }
  110. STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle, uint32_t update_every __maybe_unused, STORAGE_METRICS_GROUP *smg __maybe_unused) {
  111. struct mem_metric_handle *mh = (struct mem_metric_handle *)db_metric_handle;
  112. RRDDIM *rd = mh->rd;
  113. update_metric_handle_from_rrddim(mh, rd);
  114. internal_fatal((uint32_t)mh->update_every_s != update_every, "RRDDIM: update requested does not match the dimension");
  115. struct mem_collect_handle *ch = callocz(1, sizeof(struct mem_collect_handle));
  116. ch->common.backend = STORAGE_ENGINE_BACKEND_RRDDIM;
  117. ch->rd = rd;
  118. ch->db_metric_handle = db_metric_handle;
  119. __atomic_add_fetch(&rrddim_db_memory_size, sizeof(struct mem_collect_handle), __ATOMIC_RELAXED);
  120. return (STORAGE_COLLECT_HANDLE *)ch;
  121. }
  122. void rrddim_store_metric_flush(STORAGE_COLLECT_HANDLE *collection_handle) {
  123. struct mem_collect_handle *ch = (struct mem_collect_handle *)collection_handle;
  124. struct mem_metric_handle *mh = (struct mem_metric_handle *)ch->db_metric_handle;
  125. RRDDIM *rd = mh->rd;
  126. size_t entries = mh->entries;
  127. storage_number empty = pack_storage_number(NAN, SN_FLAG_NONE);
  128. for(size_t i = 0; i < entries ;i++)
  129. rd->db[i] = empty;
  130. mh->counter = 0;
  131. mh->last_updated_s = 0;
  132. mh->current_entry = 0;
  133. }
  134. static inline void rrddim_fill_the_gap(STORAGE_COLLECT_HANDLE *collection_handle, time_t now_collect_s) {
  135. struct mem_collect_handle *ch = (struct mem_collect_handle *)collection_handle;
  136. struct mem_metric_handle *mh = (struct mem_metric_handle *)ch->db_metric_handle;
  137. RRDDIM *rd = mh->rd;
  138. internal_fatal(ch->rd != mh->rd, "RRDDIM: dimensions do not match");
  139. check_metric_handle_from_rrddim(mh);
  140. size_t entries = mh->entries;
  141. time_t update_every_s = mh->update_every_s;
  142. time_t last_stored_s = mh->last_updated_s;
  143. size_t gap_entries = (now_collect_s - last_stored_s) / update_every_s;
  144. if(gap_entries >= entries)
  145. rrddim_store_metric_flush(collection_handle);
  146. else {
  147. storage_number empty = pack_storage_number(NAN, SN_FLAG_NONE);
  148. size_t current_entry = mh->current_entry;
  149. time_t now_store_s = last_stored_s + update_every_s;
  150. // fill the dimension
  151. size_t c;
  152. for(c = 0; c < entries && now_store_s <= now_collect_s ; now_store_s += update_every_s, c++) {
  153. rd->db[current_entry++] = empty;
  154. if(unlikely(current_entry >= entries))
  155. current_entry = 0;
  156. }
  157. mh->counter += c;
  158. mh->current_entry = current_entry;
  159. mh->last_updated_s = now_store_s;
  160. }
  161. }
  162. void rrddim_collect_store_metric(STORAGE_COLLECT_HANDLE *collection_handle,
  163. usec_t point_in_time_ut,
  164. NETDATA_DOUBLE n,
  165. NETDATA_DOUBLE min_value __maybe_unused,
  166. NETDATA_DOUBLE max_value __maybe_unused,
  167. uint16_t count __maybe_unused,
  168. uint16_t anomaly_count __maybe_unused,
  169. SN_FLAGS flags)
  170. {
  171. struct mem_collect_handle *ch = (struct mem_collect_handle *)collection_handle;
  172. struct mem_metric_handle *mh = (struct mem_metric_handle *)ch->db_metric_handle;
  173. RRDDIM *rd = ch->rd;
  174. time_t point_in_time_s = (time_t)(point_in_time_ut / USEC_PER_SEC);
  175. internal_fatal(ch->rd != mh->rd, "RRDDIM: dimensions do not match");
  176. check_metric_handle_from_rrddim(mh);
  177. if(unlikely(point_in_time_s <= mh->last_updated_s))
  178. return;
  179. if(unlikely(mh->last_updated_s && point_in_time_s - mh->update_every_s > mh->last_updated_s))
  180. rrddim_fill_the_gap(collection_handle, point_in_time_s);
  181. rd->db[mh->current_entry] = pack_storage_number(n, flags);
  182. mh->counter++;
  183. mh->current_entry = (mh->current_entry + 1) >= mh->entries ? 0 : mh->current_entry + 1;
  184. mh->last_updated_s = point_in_time_s;
  185. }
  186. int rrddim_collect_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
  187. freez(collection_handle);
  188. __atomic_sub_fetch(&rrddim_db_memory_size, sizeof(struct mem_collect_handle), __ATOMIC_RELAXED);
  189. return 0;
  190. }
  191. // ----------------------------------------------------------------------------
  192. // get the total duration in seconds of the round-robin database
  193. #define metric_duration(mh) (( (time_t)(mh)->counter >= (time_t)(mh)->entries ? (time_t)(mh)->entries : (time_t)(mh)->counter ) * (time_t)(mh)->update_every_s)
  194. // get the last slot updated in the round-robin database
  195. #define rrddim_last_slot(mh) ((size_t)(((mh)->current_entry == 0) ? (mh)->entries - 1 : (mh)->current_entry - 1))
  196. // return the slot that has the oldest value
  197. #define rrddim_first_slot(mh) ((size_t)((mh)->counter >= (size_t)(mh)->entries ? (mh)->current_entry : 0))
  198. // get the slot of the round-robin database, for the given timestamp (t)
  199. // it always returns a valid slot, although it may not be for the time requested if the time is outside the round-robin database
  200. // only valid when not using dbengine
  201. static inline size_t rrddim_time2slot(STORAGE_METRIC_HANDLE *db_metric_handle, time_t t) {
  202. struct mem_metric_handle *mh = (struct mem_metric_handle *)db_metric_handle;
  203. RRDDIM *rd = mh->rd;
  204. size_t ret = 0;
  205. time_t last_entry_s = rrddim_query_latest_time_s(db_metric_handle);
  206. time_t first_entry_s = rrddim_query_oldest_time_s(db_metric_handle);
  207. size_t entries = mh->entries;
  208. size_t first_slot = rrddim_first_slot(mh);
  209. size_t last_slot = rrddim_last_slot(mh);
  210. size_t update_every = mh->update_every_s;
  211. if(t >= last_entry_s) {
  212. // the requested time is after the last entry we have
  213. ret = last_slot;
  214. }
  215. else {
  216. if(t <= first_entry_s) {
  217. // the requested time is before the first entry we have
  218. ret = first_slot;
  219. }
  220. else {
  221. if(last_slot >= (size_t)((last_entry_s - t) / update_every))
  222. ret = last_slot - ((last_entry_s - t) / update_every);
  223. else
  224. ret = last_slot - ((last_entry_s - t) / update_every) + entries;
  225. }
  226. }
  227. if(unlikely(ret >= entries)) {
  228. error("INTERNAL ERROR: rrddim_time2slot() on %s returns values outside entries", rrddim_name(rd));
  229. ret = entries - 1;
  230. }
  231. return ret;
  232. }
  233. // get the timestamp of a specific slot in the round-robin database
  234. // only valid when not using dbengine
  235. static inline time_t rrddim_slot2time(STORAGE_METRIC_HANDLE *db_metric_handle, size_t slot) {
  236. struct mem_metric_handle *mh = (struct mem_metric_handle *)db_metric_handle;
  237. RRDDIM *rd = mh->rd;
  238. time_t ret;
  239. time_t last_entry_s = rrddim_query_latest_time_s(db_metric_handle);
  240. time_t first_entry_s = rrddim_query_oldest_time_s(db_metric_handle);
  241. size_t entries = mh->entries;
  242. size_t last_slot = rrddim_last_slot(mh);
  243. size_t update_every = mh->update_every_s;
  244. if(slot >= entries) {
  245. error("INTERNAL ERROR: caller of rrddim_slot2time() gives invalid slot %zu", slot);
  246. slot = entries - 1;
  247. }
  248. if(slot > last_slot)
  249. ret = last_entry_s - (time_t)(update_every * (last_slot - slot + entries));
  250. else
  251. ret = last_entry_s - (time_t)(update_every * (last_slot - slot));
  252. if(unlikely(ret < first_entry_s)) {
  253. error("INTERNAL ERROR: rrddim_slot2time() on dimension '%s' of chart '%s' returned time (%ld) too far in the past (before first_entry_s %ld) for slot %zu",
  254. rrddim_name(rd), rrdset_id(rd->rrdset), ret, first_entry_s, slot);
  255. ret = first_entry_s;
  256. }
  257. if(unlikely(ret > last_entry_s)) {
  258. error("INTERNAL ERROR: rrddim_slot2time() on dimension '%s' of chart '%s' returned time (%ld) too far into the future (after last_entry_s %ld) for slot %zu",
  259. rrddim_name(rd), rrdset_id(rd->rrdset), ret, last_entry_s, slot);
  260. ret = last_entry_s;
  261. }
  262. return ret;
  263. }
  264. // ----------------------------------------------------------------------------
  265. // RRDDIM legacy database query functions
  266. void rrddim_query_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct storage_engine_query_handle *handle, time_t start_time_s, time_t end_time_s, STORAGE_PRIORITY priority __maybe_unused) {
  267. struct mem_metric_handle *mh = (struct mem_metric_handle *)db_metric_handle;
  268. check_metric_handle_from_rrddim(mh);
  269. handle->start_time_s = start_time_s;
  270. handle->end_time_s = end_time_s;
  271. handle->priority = priority;
  272. handle->backend = STORAGE_ENGINE_BACKEND_RRDDIM;
  273. struct mem_query_handle* h = mallocz(sizeof(struct mem_query_handle));
  274. h->db_metric_handle = db_metric_handle;
  275. h->slot = rrddim_time2slot(db_metric_handle, start_time_s);
  276. h->last_slot = rrddim_time2slot(db_metric_handle, end_time_s);
  277. h->dt = mh->update_every_s;
  278. h->next_timestamp = start_time_s;
  279. h->slot_timestamp = rrddim_slot2time(db_metric_handle, h->slot);
  280. h->last_timestamp = rrddim_slot2time(db_metric_handle, h->last_slot);
  281. // info("RRDDIM QUERY INIT: start %ld, end %ld, next %ld, first %ld, last %ld, dt %ld", start_time, end_time, h->next_timestamp, h->slot_timestamp, h->last_timestamp, h->dt);
  282. __atomic_add_fetch(&rrddim_db_memory_size, sizeof(struct mem_query_handle), __ATOMIC_RELAXED);
  283. handle->handle = (STORAGE_QUERY_HANDLE *)h;
  284. }
  285. // Returns the metric and sets its timestamp into current_time
  286. // IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags)
  287. // IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES
  288. STORAGE_POINT rrddim_query_next_metric(struct storage_engine_query_handle *handle) {
  289. struct mem_query_handle* h = (struct mem_query_handle*)handle->handle;
  290. struct mem_metric_handle *mh = (struct mem_metric_handle *)h->db_metric_handle;
  291. RRDDIM *rd = mh->rd;
  292. size_t entries = mh->entries;
  293. size_t slot = h->slot;
  294. STORAGE_POINT sp;
  295. sp.count = 1;
  296. time_t this_timestamp = h->next_timestamp;
  297. h->next_timestamp += h->dt;
  298. // set this timestamp for our caller
  299. sp.start_time_s = this_timestamp - h->dt;
  300. sp.end_time_s = this_timestamp;
  301. if(unlikely(this_timestamp < h->slot_timestamp)) {
  302. storage_point_empty(sp, sp.start_time_s, sp.end_time_s);
  303. return sp;
  304. }
  305. if(unlikely(this_timestamp > h->last_timestamp)) {
  306. storage_point_empty(sp, sp.start_time_s, sp.end_time_s);
  307. return sp;
  308. }
  309. storage_number n = rd->db[slot++];
  310. if(unlikely(slot >= entries)) slot = 0;
  311. h->slot = slot;
  312. h->slot_timestamp += h->dt;
  313. sp.anomaly_count = is_storage_number_anomalous(n) ? 1 : 0;
  314. sp.flags = (n & SN_USER_FLAGS);
  315. sp.min = sp.max = sp.sum = unpack_storage_number(n);
  316. return sp;
  317. }
  318. int rrddim_query_is_finished(struct storage_engine_query_handle *handle) {
  319. struct mem_query_handle *h = (struct mem_query_handle*)handle->handle;
  320. return (h->next_timestamp > handle->end_time_s);
  321. }
  322. void rrddim_query_finalize(struct storage_engine_query_handle *handle) {
  323. #ifdef NETDATA_INTERNAL_CHECKS
  324. struct mem_query_handle *h = (struct mem_query_handle*)handle->handle;
  325. struct mem_metric_handle *mh = (struct mem_metric_handle *)h->db_metric_handle;
  326. internal_error(!rrddim_query_is_finished(handle),
  327. "QUERY: query for chart '%s' dimension '%s' has been stopped unfinished",
  328. rrdset_id(mh->rd->rrdset), rrddim_name(mh->rd));
  329. #endif
  330. freez(handle->handle);
  331. __atomic_sub_fetch(&rrddim_db_memory_size, sizeof(struct mem_query_handle), __ATOMIC_RELAXED);
  332. }
  333. time_t rrddim_query_align_to_optimal_before(struct storage_engine_query_handle *rrddim_handle) {
  334. return rrddim_handle->end_time_s;
  335. }
  336. time_t rrddim_query_latest_time_s(STORAGE_METRIC_HANDLE *db_metric_handle) {
  337. struct mem_metric_handle *mh = (struct mem_metric_handle *)db_metric_handle;
  338. return mh->last_updated_s;
  339. }
  340. time_t rrddim_query_oldest_time_s(STORAGE_METRIC_HANDLE *db_metric_handle) {
  341. struct mem_metric_handle *mh = (struct mem_metric_handle *)db_metric_handle;
  342. return (time_t)(mh->last_updated_s - metric_duration(mh));
  343. }