// metric.c (29 KB)
  1. #include "metric.h"
  2. typedef int32_t REFCOUNT;
  3. #define REFCOUNT_DELETING (-100)
  4. typedef enum __attribute__ ((__packed__)) {
  5. METRIC_FLAG_HAS_RETENTION = (1 << 0),
  6. } METRIC_FLAGS;
  7. struct metric {
  8. uuid_t uuid; // never changes
  9. Word_t section; // never changes
  10. time_t first_time_s; //
  11. time_t latest_time_s_clean; // archived pages latest time
  12. time_t latest_time_s_hot; // latest time of the currently collected page
  13. uint32_t latest_update_every_s; //
  14. pid_t writer;
  15. METRIC_FLAGS flags;
  16. REFCOUNT refcount;
  17. SPINLOCK spinlock; // protects all variable members
  18. // THIS IS allocated with malloc()
  19. // YOU HAVE TO INITIALIZE IT YOURSELF !
  20. };
  21. static struct aral_statistics mrg_aral_statistics;
  22. struct mrg {
  23. ARAL *aral[MRG_PARTITIONS];
  24. struct pgc_index {
  25. netdata_rwlock_t rwlock;
  26. Pvoid_t uuid_judy; // each UUID has a JudyL of sections (tiers)
  27. } index[MRG_PARTITIONS];
  28. struct mrg_statistics stats;
  29. size_t entries_per_partition[MRG_PARTITIONS];
  30. };
  31. static inline void MRG_STATS_DUPLICATE_ADD(MRG *mrg) {
  32. __atomic_add_fetch(&mrg->stats.additions_duplicate, 1, __ATOMIC_RELAXED);
  33. }
  34. static inline void MRG_STATS_ADDED_METRIC(MRG *mrg, size_t partition) {
  35. __atomic_add_fetch(&mrg->stats.entries, 1, __ATOMIC_RELAXED);
  36. __atomic_add_fetch(&mrg->stats.additions, 1, __ATOMIC_RELAXED);
  37. __atomic_add_fetch(&mrg->stats.size, sizeof(METRIC), __ATOMIC_RELAXED);
  38. __atomic_add_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED);
  39. }
  40. static inline void MRG_STATS_DELETED_METRIC(MRG *mrg, size_t partition) {
  41. __atomic_sub_fetch(&mrg->stats.entries, 1, __ATOMIC_RELAXED);
  42. __atomic_sub_fetch(&mrg->stats.size, sizeof(METRIC), __ATOMIC_RELAXED);
  43. __atomic_add_fetch(&mrg->stats.deletions, 1, __ATOMIC_RELAXED);
  44. __atomic_sub_fetch(&mrg->entries_per_partition[partition], 1, __ATOMIC_RELAXED);
  45. }
  46. static inline void MRG_STATS_SEARCH_HIT(MRG *mrg) {
  47. __atomic_add_fetch(&mrg->stats.search_hits, 1, __ATOMIC_RELAXED);
  48. }
  49. static inline void MRG_STATS_SEARCH_MISS(MRG *mrg) {
  50. __atomic_add_fetch(&mrg->stats.search_misses, 1, __ATOMIC_RELAXED);
  51. }
  52. static inline void MRG_STATS_DELETE_MISS(MRG *mrg) {
  53. __atomic_add_fetch(&mrg->stats.delete_misses, 1, __ATOMIC_RELAXED);
  54. }
  55. static inline void mrg_index_read_lock(MRG *mrg, size_t partition) {
  56. netdata_rwlock_rdlock(&mrg->index[partition].rwlock);
  57. }
  58. static inline void mrg_index_read_unlock(MRG *mrg, size_t partition) {
  59. netdata_rwlock_unlock(&mrg->index[partition].rwlock);
  60. }
  61. static inline void mrg_index_write_lock(MRG *mrg, size_t partition) {
  62. netdata_rwlock_wrlock(&mrg->index[partition].rwlock);
  63. }
  64. static inline void mrg_index_write_unlock(MRG *mrg, size_t partition) {
  65. netdata_rwlock_unlock(&mrg->index[partition].rwlock);
  66. }
  67. static inline void mrg_stats_size_judyl_change(MRG *mrg, size_t mem_before_judyl, size_t mem_after_judyl) {
  68. if(mem_after_judyl > mem_before_judyl)
  69. __atomic_add_fetch(&mrg->stats.size, mem_after_judyl - mem_before_judyl, __ATOMIC_RELAXED);
  70. else if(mem_after_judyl < mem_before_judyl)
  71. __atomic_sub_fetch(&mrg->stats.size, mem_before_judyl - mem_after_judyl, __ATOMIC_RELAXED);
  72. }
  73. static inline void mrg_stats_size_judyhs_added_uuid(MRG *mrg) {
  74. __atomic_add_fetch(&mrg->stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
  75. }
  76. static inline void mrg_stats_size_judyhs_removed_uuid(MRG *mrg) {
  77. __atomic_sub_fetch(&mrg->stats.size, JUDYHS_INDEX_SIZE_ESTIMATE(sizeof(uuid_t)), __ATOMIC_RELAXED);
  78. }
  79. static inline size_t uuid_partition(MRG *mrg __maybe_unused, uuid_t *uuid) {
  80. uint8_t *u = (uint8_t *)uuid;
  81. return u[UUID_SZ - 1] % MRG_PARTITIONS;
  82. }
  83. static inline bool metric_has_retention_unsafe(MRG *mrg __maybe_unused, METRIC *metric) {
  84. bool has_retention = (metric->first_time_s || metric->latest_time_s_clean || metric->latest_time_s_hot);
  85. if(has_retention && !(metric->flags & METRIC_FLAG_HAS_RETENTION)) {
  86. metric->flags |= METRIC_FLAG_HAS_RETENTION;
  87. __atomic_add_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED);
  88. }
  89. else if(!has_retention && (metric->flags & METRIC_FLAG_HAS_RETENTION)) {
  90. metric->flags &= ~METRIC_FLAG_HAS_RETENTION;
  91. __atomic_sub_fetch(&mrg->stats.entries_with_retention, 1, __ATOMIC_RELAXED);
  92. }
  93. return has_retention;
  94. }
  95. static inline REFCOUNT metric_acquire(MRG *mrg __maybe_unused, METRIC *metric, bool having_spinlock) {
  96. REFCOUNT refcount;
  97. if(!having_spinlock)
  98. netdata_spinlock_lock(&metric->spinlock);
  99. if(unlikely(metric->refcount < 0))
  100. fatal("METRIC: refcount is %d (negative) during acquire", metric->refcount);
  101. refcount = ++metric->refcount;
  102. // update its retention flags
  103. metric_has_retention_unsafe(mrg, metric);
  104. if(!having_spinlock)
  105. netdata_spinlock_unlock(&metric->spinlock);
  106. if(refcount == 1)
  107. __atomic_add_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED);
  108. __atomic_add_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED);
  109. return refcount;
  110. }
  111. static inline bool metric_release_and_can_be_deleted(MRG *mrg __maybe_unused, METRIC *metric) {
  112. bool ret = true;
  113. REFCOUNT refcount;
  114. netdata_spinlock_lock(&metric->spinlock);
  115. if(unlikely(metric->refcount <= 0))
  116. fatal("METRIC: refcount is %d (zero or negative) during release", metric->refcount);
  117. refcount = --metric->refcount;
  118. if(likely(metric_has_retention_unsafe(mrg, metric) || refcount != 0))
  119. ret = false;
  120. netdata_spinlock_unlock(&metric->spinlock);
  121. if(unlikely(!refcount))
  122. __atomic_sub_fetch(&mrg->stats.entries_referenced, 1, __ATOMIC_RELAXED);
  123. __atomic_sub_fetch(&mrg->stats.current_references, 1, __ATOMIC_RELAXED);
  124. return ret;
  125. }
  126. static METRIC *metric_add_and_acquire(MRG *mrg, MRG_ENTRY *entry, bool *ret) {
  127. size_t partition = uuid_partition(mrg, &entry->uuid);
  128. METRIC *allocation = aral_mallocz(mrg->aral[partition]);
  129. mrg_index_write_lock(mrg, partition);
  130. size_t mem_before_judyl, mem_after_judyl;
  131. Pvoid_t *sections_judy_pptr = JudyHSIns(&mrg->index[partition].uuid_judy, &entry->uuid, sizeof(uuid_t), PJE0);
  132. if(unlikely(!sections_judy_pptr || sections_judy_pptr == PJERR))
  133. fatal("DBENGINE METRIC: corrupted UUIDs JudyHS array");
  134. if(unlikely(!*sections_judy_pptr))
  135. mrg_stats_size_judyhs_added_uuid(mrg);
  136. mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
  137. Pvoid_t *PValue = JudyLIns(sections_judy_pptr, entry->section, PJE0);
  138. mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
  139. mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl);
  140. if(unlikely(!PValue || PValue == PJERR))
  141. fatal("DBENGINE METRIC: corrupted section JudyL array");
  142. if(unlikely(*PValue != NULL)) {
  143. METRIC *metric = *PValue;
  144. metric_acquire(mrg, metric, false);
  145. mrg_index_write_unlock(mrg, partition);
  146. if(ret)
  147. *ret = false;
  148. aral_freez(mrg->aral[partition], allocation);
  149. MRG_STATS_DUPLICATE_ADD(mrg);
  150. return metric;
  151. }
  152. METRIC *metric = allocation;
  153. uuid_copy(metric->uuid, entry->uuid);
  154. metric->section = entry->section;
  155. metric->first_time_s = entry->first_time_s;
  156. metric->latest_time_s_clean = entry->last_time_s;
  157. metric->latest_time_s_hot = 0;
  158. metric->latest_update_every_s = entry->latest_update_every_s;
  159. metric->writer = 0;
  160. metric->refcount = 0;
  161. metric->flags = 0;
  162. netdata_spinlock_init(&metric->spinlock);
  163. metric_acquire(mrg, metric, true); // no spinlock use required here
  164. *PValue = metric;
  165. mrg_index_write_unlock(mrg, partition);
  166. if(ret)
  167. *ret = true;
  168. MRG_STATS_ADDED_METRIC(mrg, partition);
  169. return metric;
  170. }
  171. static METRIC *metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
  172. size_t partition = uuid_partition(mrg, uuid);
  173. mrg_index_read_lock(mrg, partition);
  174. Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, uuid, sizeof(uuid_t));
  175. if(unlikely(!sections_judy_pptr)) {
  176. mrg_index_read_unlock(mrg, partition);
  177. MRG_STATS_SEARCH_MISS(mrg);
  178. return NULL;
  179. }
  180. Pvoid_t *PValue = JudyLGet(*sections_judy_pptr, section, PJE0);
  181. if(unlikely(!PValue)) {
  182. mrg_index_read_unlock(mrg, partition);
  183. MRG_STATS_SEARCH_MISS(mrg);
  184. return NULL;
  185. }
  186. METRIC *metric = *PValue;
  187. metric_acquire(mrg, metric, false);
  188. mrg_index_read_unlock(mrg, partition);
  189. MRG_STATS_SEARCH_HIT(mrg);
  190. return metric;
  191. }
  192. static bool acquired_metric_del(MRG *mrg, METRIC *metric) {
  193. size_t partition = uuid_partition(mrg, &metric->uuid);
  194. size_t mem_before_judyl, mem_after_judyl;
  195. mrg_index_write_lock(mrg, partition);
  196. if(!metric_release_and_can_be_deleted(mrg, metric)) {
  197. mrg_index_write_unlock(mrg, partition);
  198. __atomic_add_fetch(&mrg->stats.delete_having_retention_or_referenced, 1, __ATOMIC_RELAXED);
  199. return false;
  200. }
  201. Pvoid_t *sections_judy_pptr = JudyHSGet(mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t));
  202. if(unlikely(!sections_judy_pptr || !*sections_judy_pptr)) {
  203. mrg_index_write_unlock(mrg, partition);
  204. MRG_STATS_DELETE_MISS(mrg);
  205. return false;
  206. }
  207. mem_before_judyl = JudyLMemUsed(*sections_judy_pptr);
  208. int rc = JudyLDel(sections_judy_pptr, metric->section, PJE0);
  209. mem_after_judyl = JudyLMemUsed(*sections_judy_pptr);
  210. mrg_stats_size_judyl_change(mrg, mem_before_judyl, mem_after_judyl);
  211. if(unlikely(!rc)) {
  212. mrg_index_write_unlock(mrg, partition);
  213. MRG_STATS_DELETE_MISS(mrg);
  214. return false;
  215. }
  216. if(!*sections_judy_pptr) {
  217. rc = JudyHSDel(&mrg->index[partition].uuid_judy, &metric->uuid, sizeof(uuid_t), PJE0);
  218. if(unlikely(!rc))
  219. fatal("DBENGINE METRIC: cannot delete UUID from JudyHS");
  220. mrg_stats_size_judyhs_removed_uuid(mrg);
  221. }
  222. mrg_index_write_unlock(mrg, partition);
  223. aral_freez(mrg->aral[partition], metric);
  224. MRG_STATS_DELETED_METRIC(mrg, partition);
  225. return true;
  226. }
// ----------------------------------------------------------------------------
// public API
  229. MRG *mrg_create(void) {
  230. MRG *mrg = callocz(1, sizeof(MRG));
  231. for(size_t i = 0; i < MRG_PARTITIONS ; i++) {
  232. netdata_rwlock_init(&mrg->index[i].rwlock);
  233. char buf[ARAL_MAX_NAME + 1];
  234. snprintfz(buf, ARAL_MAX_NAME, "mrg[%zu]", i);
  235. mrg->aral[i] = aral_create(buf,
  236. sizeof(METRIC),
  237. 0,
  238. 16384,
  239. &mrg_aral_statistics,
  240. NULL, NULL, false,
  241. false);
  242. }
  243. mrg->stats.size = sizeof(MRG);
  244. return mrg;
  245. }
  246. size_t mrg_aral_structures(void) {
  247. return aral_structures_from_stats(&mrg_aral_statistics);
  248. }
  249. size_t mrg_aral_overhead(void) {
  250. return aral_overhead_from_stats(&mrg_aral_statistics);
  251. }
  252. void mrg_destroy(MRG *mrg __maybe_unused) {
  253. // no destruction possible
  254. // we can't traverse the metrics list
  255. // to delete entries, the caller needs to keep pointers to them
  256. // and delete them one by one
  257. ;
  258. }
  259. METRIC *mrg_metric_add_and_acquire(MRG *mrg, MRG_ENTRY entry, bool *ret) {
  260. // internal_fatal(entry.latest_time_s > max_acceptable_collected_time(),
  261. // "DBENGINE METRIC: metric latest time is in the future");
  262. return metric_add_and_acquire(mrg, &entry, ret);
  263. }
  264. METRIC *mrg_metric_get_and_acquire(MRG *mrg, uuid_t *uuid, Word_t section) {
  265. return metric_get_and_acquire(mrg, uuid, section);
  266. }
  267. bool mrg_metric_release_and_delete(MRG *mrg, METRIC *metric) {
  268. return acquired_metric_del(mrg, metric);
  269. }
  270. METRIC *mrg_metric_dup(MRG *mrg, METRIC *metric) {
  271. metric_acquire(mrg, metric, false);
  272. return metric;
  273. }
  274. bool mrg_metric_release(MRG *mrg, METRIC *metric) {
  275. return metric_release_and_can_be_deleted(mrg, metric);
  276. }
  277. Word_t mrg_metric_id(MRG *mrg __maybe_unused, METRIC *metric) {
  278. return (Word_t)metric;
  279. }
  280. uuid_t *mrg_metric_uuid(MRG *mrg __maybe_unused, METRIC *metric) {
  281. return &metric->uuid;
  282. }
  283. Word_t mrg_metric_section(MRG *mrg __maybe_unused, METRIC *metric) {
  284. return metric->section;
  285. }
  286. bool mrg_metric_set_first_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
  287. netdata_spinlock_lock(&metric->spinlock);
  288. metric->first_time_s = first_time_s;
  289. metric_has_retention_unsafe(mrg, metric);
  290. netdata_spinlock_unlock(&metric->spinlock);
  291. return true;
  292. }
  293. void mrg_metric_expand_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s, time_t last_time_s, time_t update_every_s) {
  294. internal_fatal(first_time_s > max_acceptable_collected_time(),
  295. "DBENGINE METRIC: metric first time is in the future");
  296. internal_fatal(last_time_s > max_acceptable_collected_time(),
  297. "DBENGINE METRIC: metric last time is in the future");
  298. netdata_spinlock_lock(&metric->spinlock);
  299. if(unlikely(first_time_s && (!metric->first_time_s || first_time_s < metric->first_time_s)))
  300. metric->first_time_s = first_time_s;
  301. if(likely(last_time_s && (!metric->latest_time_s_clean || last_time_s > metric->latest_time_s_clean))) {
  302. metric->latest_time_s_clean = last_time_s;
  303. if(likely(update_every_s))
  304. metric->latest_update_every_s = update_every_s;
  305. }
  306. else if(unlikely(!metric->latest_update_every_s && update_every_s))
  307. metric->latest_update_every_s = update_every_s;
  308. metric_has_retention_unsafe(mrg, metric);
  309. netdata_spinlock_unlock(&metric->spinlock);
  310. }
  311. bool mrg_metric_set_first_time_s_if_bigger(MRG *mrg __maybe_unused, METRIC *metric, time_t first_time_s) {
  312. bool ret = false;
  313. netdata_spinlock_lock(&metric->spinlock);
  314. if(first_time_s > metric->first_time_s) {
  315. metric->first_time_s = first_time_s;
  316. ret = true;
  317. }
  318. metric_has_retention_unsafe(mrg, metric);
  319. netdata_spinlock_unlock(&metric->spinlock);
  320. return ret;
  321. }
  322. time_t mrg_metric_get_first_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
  323. time_t first_time_s;
  324. netdata_spinlock_lock(&metric->spinlock);
  325. if(unlikely(!metric->first_time_s)) {
  326. if(metric->latest_time_s_clean)
  327. metric->first_time_s = metric->latest_time_s_clean;
  328. else if(metric->latest_time_s_hot)
  329. metric->first_time_s = metric->latest_time_s_hot;
  330. }
  331. first_time_s = metric->first_time_s;
  332. netdata_spinlock_unlock(&metric->spinlock);
  333. return first_time_s;
  334. }
  335. void mrg_metric_get_retention(MRG *mrg __maybe_unused, METRIC *metric, time_t *first_time_s, time_t *last_time_s, time_t *update_every_s) {
  336. netdata_spinlock_lock(&metric->spinlock);
  337. if(unlikely(!metric->first_time_s)) {
  338. if(metric->latest_time_s_clean)
  339. metric->first_time_s = metric->latest_time_s_clean;
  340. else if(metric->latest_time_s_hot)
  341. metric->first_time_s = metric->latest_time_s_hot;
  342. }
  343. *first_time_s = metric->first_time_s;
  344. *last_time_s = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
  345. *update_every_s = metric->latest_update_every_s;
  346. netdata_spinlock_unlock(&metric->spinlock);
  347. }
  348. bool mrg_metric_set_clean_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
  349. netdata_spinlock_lock(&metric->spinlock);
  350. // internal_fatal(latest_time_s > max_acceptable_collected_time(),
  351. // "DBENGINE METRIC: metric latest time is in the future");
  352. // internal_fatal(metric->latest_time_s_clean > latest_time_s,
  353. // "DBENGINE METRIC: metric new clean latest time is older than the previous one");
  354. metric->latest_time_s_clean = latest_time_s;
  355. if(unlikely(!metric->first_time_s))
  356. metric->first_time_s = latest_time_s;
  357. // if(unlikely(metric->first_time_s > latest_time_s))
  358. // metric->first_time_s = latest_time_s;
  359. metric_has_retention_unsafe(mrg, metric);
  360. netdata_spinlock_unlock(&metric->spinlock);
  361. return true;
  362. }
  363. // returns true when metric still has retention
  364. bool mrg_metric_zero_disk_retention(MRG *mrg __maybe_unused, METRIC *metric) {
  365. Word_t section = mrg_metric_section(mrg, metric);
  366. bool do_again = false;
  367. size_t countdown = 5;
  368. bool ret = true;
  369. do {
  370. time_t min_first_time_s = LONG_MAX;
  371. time_t max_end_time_s = 0;
  372. PGC_PAGE *page;
  373. PGC_SEARCH method = PGC_SEARCH_FIRST;
  374. time_t page_first_time_s = 0;
  375. time_t page_end_time_s = 0;
  376. while ((page = pgc_page_get_and_acquire(main_cache, section, (Word_t)metric, page_first_time_s, method))) {
  377. method = PGC_SEARCH_NEXT;
  378. bool is_hot = pgc_is_page_hot(page);
  379. bool is_dirty = pgc_is_page_dirty(page);
  380. page_first_time_s = pgc_page_start_time_s(page);
  381. page_end_time_s = pgc_page_end_time_s(page);
  382. if ((is_hot || is_dirty) && page_first_time_s < min_first_time_s)
  383. min_first_time_s = page_first_time_s;
  384. if (is_dirty && page_end_time_s > max_end_time_s)
  385. max_end_time_s = page_end_time_s;
  386. pgc_page_release(main_cache, page);
  387. }
  388. if (min_first_time_s == LONG_MAX)
  389. min_first_time_s = 0;
  390. netdata_spinlock_lock(&metric->spinlock);
  391. if (--countdown && !min_first_time_s && metric->latest_time_s_hot)
  392. do_again = true;
  393. else {
  394. internal_error(!countdown, "METRIC: giving up on updating the retention of metric without disk retention");
  395. do_again = false;
  396. metric->first_time_s = min_first_time_s;
  397. metric->latest_time_s_clean = max_end_time_s;
  398. ret = metric_has_retention_unsafe(mrg, metric);
  399. }
  400. netdata_spinlock_unlock(&metric->spinlock);
  401. } while(do_again);
  402. return ret;
  403. }
  404. bool mrg_metric_set_hot_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric, time_t latest_time_s) {
  405. // internal_fatal(latest_time_s > max_acceptable_collected_time(),
  406. // "DBENGINE METRIC: metric latest time is in the future");
  407. netdata_spinlock_lock(&metric->spinlock);
  408. metric->latest_time_s_hot = latest_time_s;
  409. if(unlikely(!metric->first_time_s))
  410. metric->first_time_s = latest_time_s;
  411. // if(unlikely(metric->first_time_s > latest_time_s))
  412. // metric->first_time_s = latest_time_s;
  413. metric_has_retention_unsafe(mrg, metric);
  414. netdata_spinlock_unlock(&metric->spinlock);
  415. return true;
  416. }
  417. time_t mrg_metric_get_latest_time_s(MRG *mrg __maybe_unused, METRIC *metric) {
  418. time_t max;
  419. netdata_spinlock_lock(&metric->spinlock);
  420. max = MAX(metric->latest_time_s_clean, metric->latest_time_s_hot);
  421. netdata_spinlock_unlock(&metric->spinlock);
  422. return max;
  423. }
  424. bool mrg_metric_set_update_every(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
  425. if(!update_every_s)
  426. return false;
  427. netdata_spinlock_lock(&metric->spinlock);
  428. metric->latest_update_every_s = update_every_s;
  429. netdata_spinlock_unlock(&metric->spinlock);
  430. return true;
  431. }
  432. bool mrg_metric_set_update_every_s_if_zero(MRG *mrg __maybe_unused, METRIC *metric, time_t update_every_s) {
  433. if(!update_every_s)
  434. return false;
  435. netdata_spinlock_lock(&metric->spinlock);
  436. if(!metric->latest_update_every_s)
  437. metric->latest_update_every_s = update_every_s;
  438. netdata_spinlock_unlock(&metric->spinlock);
  439. return true;
  440. }
  441. time_t mrg_metric_get_update_every_s(MRG *mrg __maybe_unused, METRIC *metric) {
  442. time_t update_every_s;
  443. netdata_spinlock_lock(&metric->spinlock);
  444. update_every_s = metric->latest_update_every_s;
  445. netdata_spinlock_unlock(&metric->spinlock);
  446. return update_every_s;
  447. }
  448. bool mrg_metric_set_writer(MRG *mrg, METRIC *metric) {
  449. bool done = false;
  450. netdata_spinlock_lock(&metric->spinlock);
  451. if(!metric->writer) {
  452. metric->writer = gettid();
  453. __atomic_add_fetch(&mrg->stats.writers, 1, __ATOMIC_RELAXED);
  454. done = true;
  455. }
  456. else
  457. __atomic_add_fetch(&mrg->stats.writers_conflicts, 1, __ATOMIC_RELAXED);
  458. netdata_spinlock_unlock(&metric->spinlock);
  459. return done;
  460. }
  461. bool mrg_metric_clear_writer(MRG *mrg, METRIC *metric) {
  462. bool done = false;
  463. netdata_spinlock_lock(&metric->spinlock);
  464. if(metric->writer) {
  465. metric->writer = 0;
  466. __atomic_sub_fetch(&mrg->stats.writers, 1, __ATOMIC_RELAXED);
  467. done = true;
  468. }
  469. netdata_spinlock_unlock(&metric->spinlock);
  470. return done;
  471. }
  472. struct mrg_statistics mrg_get_statistics(MRG *mrg) {
  473. // FIXME - use atomics
  474. return mrg->stats;
  475. }
// ----------------------------------------------------------------------------
// unit test
  478. #ifdef MRG_STRESS_TEST
  479. static void mrg_stress(MRG *mrg, size_t entries, size_t sections) {
  480. bool ret;
  481. info("DBENGINE METRIC: stress testing %zu entries on %zu sections...", entries, sections);
  482. METRIC *array[entries][sections];
  483. for(size_t i = 0; i < entries ; i++) {
  484. MRG_ENTRY e = {
  485. .first_time_s = (time_t)(i + 1),
  486. .latest_time_s = (time_t)(i + 2),
  487. .latest_update_every_s = (time_t)(i + 3),
  488. };
  489. uuid_generate_random(e.uuid);
  490. for(size_t section = 0; section < sections ;section++) {
  491. e.section = section;
  492. array[i][section] = mrg_metric_add_and_acquire(mrg, e, &ret);
  493. if(!ret)
  494. fatal("DBENGINE METRIC: failed to add metric %zu, section %zu", i, section);
  495. if(mrg_metric_add_and_acquire(mrg, e, &ret) != array[i][section])
  496. fatal("DBENGINE METRIC: adding the same metric twice, returns a different metric");
  497. if(ret)
  498. fatal("DBENGINE METRIC: adding the same metric twice, returns success");
  499. if(mrg_metric_get_and_acquire(mrg, &e.uuid, e.section) != array[i][section])
  500. fatal("DBENGINE METRIC: cannot get back the same metric");
  501. if(uuid_compare(*mrg_metric_uuid(mrg, array[i][section]), e.uuid) != 0)
  502. fatal("DBENGINE METRIC: uuids do not match");
  503. }
  504. }
  505. for(size_t i = 0; i < entries ; i++) {
  506. for (size_t section = 0; section < sections; section++) {
  507. uuid_t uuid;
  508. uuid_generate_random(uuid);
  509. if(mrg_metric_get_and_acquire(mrg, &uuid, section))
  510. fatal("DBENGINE METRIC: found non-existing uuid");
  511. if(mrg_metric_id(mrg, array[i][section]) != (Word_t)array[i][section])
  512. fatal("DBENGINE METRIC: metric id does not match");
  513. if(mrg_metric_get_first_time_s(mrg, array[i][section]) != (time_t)(i + 1))
  514. fatal("DBENGINE METRIC: wrong first time returned");
  515. if(mrg_metric_get_latest_time_s(mrg, array[i][section]) != (time_t)(i + 2))
  516. fatal("DBENGINE METRIC: wrong latest time returned");
  517. if(mrg_metric_get_update_every_s(mrg, array[i][section]) != (time_t)(i + 3))
  518. fatal("DBENGINE METRIC: wrong latest time returned");
  519. if(!mrg_metric_set_first_time_s(mrg, array[i][section], (time_t)((i + 1) * 2)))
  520. fatal("DBENGINE METRIC: cannot set first time");
  521. if(!mrg_metric_set_clean_latest_time_s(mrg, array[i][section], (time_t) ((i + 1) * 3)))
  522. fatal("DBENGINE METRIC: cannot set latest time");
  523. if(!mrg_metric_set_update_every(mrg, array[i][section], (time_t)((i + 1) * 4)))
  524. fatal("DBENGINE METRIC: cannot set update every");
  525. if(mrg_metric_get_first_time_s(mrg, array[i][section]) != (time_t)((i + 1) * 2))
  526. fatal("DBENGINE METRIC: wrong first time returned");
  527. if(mrg_metric_get_latest_time_s(mrg, array[i][section]) != (time_t)((i + 1) * 3))
  528. fatal("DBENGINE METRIC: wrong latest time returned");
  529. if(mrg_metric_get_update_every_s(mrg, array[i][section]) != (time_t)((i + 1) * 4))
  530. fatal("DBENGINE METRIC: wrong latest time returned");
  531. }
  532. }
  533. for(size_t i = 0; i < entries ; i++) {
  534. for (size_t section = 0; section < sections; section++) {
  535. if(!mrg_metric_release_and_delete(mrg, array[i][section]))
  536. fatal("DBENGINE METRIC: failed to delete metric");
  537. }
  538. }
  539. }
  540. static void *mrg_stress_test_thread1(void *ptr) {
  541. MRG *mrg = ptr;
  542. for(int i = 0; i < 5 ; i++)
  543. mrg_stress(mrg, 10000, 5);
  544. return ptr;
  545. }
  546. static void *mrg_stress_test_thread2(void *ptr) {
  547. MRG *mrg = ptr;
  548. for(int i = 0; i < 10 ; i++)
  549. mrg_stress(mrg, 500, 50);
  550. return ptr;
  551. }
  552. static void *mrg_stress_test_thread3(void *ptr) {
  553. MRG *mrg = ptr;
  554. for(int i = 0; i < 50 ; i++)
  555. mrg_stress(mrg, 5000, 1);
  556. return ptr;
  557. }
  558. #endif
  559. int mrg_unittest(void) {
  560. MRG *mrg = mrg_create();
  561. METRIC *m1_t0, *m2_t0, *m3_t0, *m4_t0;
  562. METRIC *m1_t1, *m2_t1, *m3_t1, *m4_t1;
  563. bool ret;
  564. MRG_ENTRY entry = {
  565. .section = 0,
  566. .first_time_s = 2,
  567. .last_time_s = 3,
  568. .latest_update_every_s = 4,
  569. };
  570. uuid_generate(entry.uuid);
  571. m1_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
  572. if(!ret)
  573. fatal("DBENGINE METRIC: failed to add metric");
  574. // add the same metric again
  575. m2_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
  576. if(m2_t0 != m1_t0)
  577. fatal("DBENGINE METRIC: adding the same metric twice, does not return the same pointer");
  578. if(ret)
  579. fatal("DBENGINE METRIC: managed to add the same metric twice");
  580. m3_t0 = mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section);
  581. if(m3_t0 != m1_t0)
  582. fatal("DBENGINE METRIC: cannot find the metric added");
  583. // add the same metric again
  584. m4_t0 = mrg_metric_add_and_acquire(mrg, entry, &ret);
  585. if(m4_t0 != m1_t0)
  586. fatal("DBENGINE METRIC: adding the same metric twice, does not return the same pointer");
  587. if(ret)
  588. fatal("DBENGINE METRIC: managed to add the same metric twice");
  589. // add the same metric in another section
  590. entry.section = 1;
  591. m1_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret);
  592. if(!ret)
  593. fatal("DBENGINE METRIC: failed to add metric in section %zu", (size_t)entry.section);
  594. // add the same metric again
  595. m2_t1 = mrg_metric_add_and_acquire(mrg, entry, &ret);
  596. if(m2_t1 != m1_t1)
  597. fatal("DBENGINE METRIC: adding the same metric twice (section %zu), does not return the same pointer", (size_t)entry.section);
  598. if(ret)
  599. fatal("DBENGINE METRIC: managed to add the same metric twice in (section 0)");
  600. m3_t1 = mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section);
  601. if(m3_t1 != m1_t1)
  602. fatal("DBENGINE METRIC: cannot find the metric added (section %zu)", (size_t)entry.section);
  603. // delete the first metric
  604. mrg_metric_release(mrg, m2_t0);
  605. mrg_metric_release(mrg, m3_t0);
  606. mrg_metric_release(mrg, m4_t0);
  607. mrg_metric_set_first_time_s(mrg, m1_t0, 0);
  608. mrg_metric_set_clean_latest_time_s(mrg, m1_t0, 0);
  609. mrg_metric_set_hot_latest_time_s(mrg, m1_t0, 0);
  610. if(!mrg_metric_release_and_delete(mrg, m1_t0))
  611. fatal("DBENGINE METRIC: cannot delete the first metric");
  612. m4_t1 = mrg_metric_get_and_acquire(mrg, &entry.uuid, entry.section);
  613. if(m4_t1 != m1_t1)
  614. fatal("DBENGINE METRIC: cannot find the metric added (section %zu), after deleting the first one", (size_t)entry.section);
  615. // delete the second metric
  616. mrg_metric_release(mrg, m2_t1);
  617. mrg_metric_release(mrg, m3_t1);
  618. mrg_metric_release(mrg, m4_t1);
  619. mrg_metric_set_first_time_s(mrg, m1_t1, 0);
  620. mrg_metric_set_clean_latest_time_s(mrg, m1_t1, 0);
  621. mrg_metric_set_hot_latest_time_s(mrg, m1_t1, 0);
  622. if(!mrg_metric_release_and_delete(mrg, m1_t1))
  623. fatal("DBENGINE METRIC: cannot delete the second metric");
  624. if(mrg->stats.entries != 0)
  625. fatal("DBENGINE METRIC: invalid entries counter");
  626. #ifdef MRG_STRESS_TEST
  627. usec_t started_ut = now_monotonic_usec();
  628. pthread_t thread1;
  629. netdata_thread_create(&thread1, "TH1",
  630. NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
  631. mrg_stress_test_thread1, mrg);
  632. pthread_t thread2;
  633. netdata_thread_create(&thread2, "TH2",
  634. NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
  635. mrg_stress_test_thread2, mrg);
  636. pthread_t thread3;
  637. netdata_thread_create(&thread3, "TH3",
  638. NETDATA_THREAD_OPTION_JOINABLE | NETDATA_THREAD_OPTION_DONT_LOG,
  639. mrg_stress_test_thread3, mrg);
  640. sleep_usec(5 * USEC_PER_SEC);
  641. netdata_thread_cancel(thread1);
  642. netdata_thread_cancel(thread2);
  643. netdata_thread_cancel(thread3);
  644. netdata_thread_join(thread1, NULL);
  645. netdata_thread_join(thread2, NULL);
  646. netdata_thread_join(thread3, NULL);
  647. usec_t ended_ut = now_monotonic_usec();
  648. info("DBENGINE METRIC: did %zu additions, %zu duplicate additions, "
  649. "%zu deletions, %zu wrong deletions, "
  650. "%zu successful searches, %zu wrong searches, "
  651. "%zu successful pointer validations, %zu wrong pointer validations "
  652. "in %llu usecs",
  653. mrg->stats.additions, mrg->stats.additions_duplicate,
  654. mrg->stats.deletions, mrg->stats.delete_misses,
  655. mrg->stats.search_hits, mrg->stats.search_misses,
  656. mrg->stats.pointer_validation_hits, mrg->stats.pointer_validation_misses,
  657. ended_ut - started_ut);
  658. #endif
  659. mrg_destroy(mrg);
  660. info("DBENGINE METRIC: all tests passed!");
  661. return 0;
  662. }