rrddim_mem.c 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrddim_mem.h"
  3. // ----------------------------------------------------------------------------
  4. // RRDDIM legacy data collection functions
  5. STORAGE_METRIC_HANDLE *rrddim_metric_init(RRDDIM *rd, STORAGE_INSTANCE *db_instance __maybe_unused) {
  6. return (STORAGE_METRIC_HANDLE *)rd;
  7. }
  8. void rrddim_metric_free(STORAGE_METRIC_HANDLE *db_metric_handle __maybe_unused) {
  9. ;
  10. }
  11. STORAGE_COLLECT_HANDLE *rrddim_collect_init(STORAGE_METRIC_HANDLE *db_metric_handle) {
  12. RRDDIM *rd = (RRDDIM *)db_metric_handle;
  13. rd->db[rd->rrdset->current_entry] = SN_EMPTY_SLOT;
  14. struct mem_collect_handle *ch = calloc(1, sizeof(struct mem_collect_handle));
  15. ch->rd = rd;
  16. return (STORAGE_COLLECT_HANDLE *)ch;
  17. }
  18. void rrddim_collect_store_metric(STORAGE_COLLECT_HANDLE *collection_handle, usec_t point_in_time, NETDATA_DOUBLE number,
  19. NETDATA_DOUBLE min_value,
  20. NETDATA_DOUBLE max_value,
  21. uint16_t count,
  22. uint16_t anomaly_count,
  23. SN_FLAGS flags)
  24. {
  25. UNUSED(point_in_time);
  26. UNUSED(min_value);
  27. UNUSED(max_value);
  28. UNUSED(count);
  29. UNUSED(anomaly_count);
  30. struct mem_collect_handle *ch = (struct mem_collect_handle *)collection_handle;
  31. RRDDIM *rd = ch->rd;
  32. rd->db[rd->rrdset->current_entry] = pack_storage_number(number, flags);
  33. }
  34. void rrddim_store_metric_flush(STORAGE_COLLECT_HANDLE *collection_handle) {
  35. struct mem_collect_handle *ch = (struct mem_collect_handle *)collection_handle;
  36. RRDDIM *rd = ch->rd;
  37. memset(rd->db, 0, rd->entries * sizeof(storage_number));
  38. }
  39. int rrddim_collect_finalize(STORAGE_COLLECT_HANDLE *collection_handle) {
  40. free(collection_handle);
  41. return 0;
  42. }
  43. // ----------------------------------------------------------------------------
  44. // get the total duration in seconds of the round robin database
  45. #define rrddim_duration(st) (( (time_t)(rd)->rrdset->counter >= (time_t)(rd)->rrdset->entries ? (time_t)(rd)->rrdset->entries : (time_t)(rd)->rrdset->counter ) * (time_t)(rd)->rrdset->update_every)
  46. // get the last slot updated in the round robin database
  47. #define rrddim_last_slot(rd) ((size_t)(((rd)->rrdset->current_entry == 0) ? (rd)->rrdset->entries - 1 : (rd)->rrdset->current_entry - 1))
  48. // return the slot that has the oldest value
  49. #define rrddim_first_slot(rd) ((size_t)((rd)->rrdset->counter >= (size_t)(rd)->rrdset->entries ? (rd)->rrdset->current_entry : 0))
  50. // get the slot of the round robin database, for the given timestamp (t)
  51. // it always returns a valid slot, although may not be for the time requested if the time is outside the round robin database
  52. // only valid when not using dbengine
  53. static inline size_t rrddim_time2slot(RRDDIM *rd, time_t t) {
  54. size_t ret = 0;
  55. time_t last_entry_t = rrddim_query_latest_time((STORAGE_METRIC_HANDLE *)rd);
  56. time_t first_entry_t = rrddim_query_oldest_time((STORAGE_METRIC_HANDLE *)rd);
  57. size_t entries = rd->rrdset->entries;
  58. size_t first_slot = rrddim_first_slot(rd);
  59. size_t last_slot = rrddim_last_slot(rd);
  60. size_t update_every = rd->rrdset->update_every;
  61. if(t >= last_entry_t) {
  62. // the requested time is after the last entry we have
  63. ret = last_slot;
  64. }
  65. else {
  66. if(t <= first_entry_t) {
  67. // the requested time is before the first entry we have
  68. ret = first_slot;
  69. }
  70. else {
  71. if(last_slot >= (size_t)((last_entry_t - t) / update_every))
  72. ret = last_slot - ((last_entry_t - t) / update_every);
  73. else
  74. ret = last_slot - ((last_entry_t - t) / update_every) + entries;
  75. }
  76. }
  77. if(unlikely(ret >= entries)) {
  78. error("INTERNAL ERROR: rrddim_time2slot() on %s returns values outside entries", rd->name);
  79. ret = entries - 1;
  80. }
  81. return ret;
  82. }
  83. // get the timestamp of a specific slot in the round robin database
  84. // only valid when not using dbengine
  85. static inline time_t rrddim_slot2time(RRDDIM *rd, size_t slot) {
  86. time_t ret;
  87. time_t last_entry_t = rrddim_query_latest_time((STORAGE_METRIC_HANDLE *)rd);
  88. time_t first_entry_t = rrddim_query_oldest_time((STORAGE_METRIC_HANDLE *)rd);
  89. size_t entries = rd->rrdset->entries;
  90. size_t last_slot = rrddim_last_slot(rd);
  91. size_t update_every = rd->rrdset->update_every;
  92. if(slot >= entries) {
  93. error("INTERNAL ERROR: caller of rrddim_slot2time() gives invalid slot %zu", slot);
  94. slot = entries - 1;
  95. }
  96. if(slot > last_slot)
  97. ret = last_entry_t - (time_t)(update_every * (last_slot - slot + entries));
  98. else
  99. ret = last_entry_t - (time_t)(update_every * (last_slot - slot));
  100. if(unlikely(ret < first_entry_t)) {
  101. error("INTERNAL ERROR: rrddim_slot2time() on %s returns time too far in the past", rd->name);
  102. ret = first_entry_t;
  103. }
  104. if(unlikely(ret > last_entry_t)) {
  105. error("INTERNAL ERROR: rrddim_slot2time() on %s returns time into the future", rd->name);
  106. ret = last_entry_t;
  107. }
  108. return ret;
  109. }
  110. // ----------------------------------------------------------------------------
  111. // RRDDIM legacy database query functions
  112. void rrddim_query_init(STORAGE_METRIC_HANDLE *db_metric_handle, struct rrddim_query_handle *handle, time_t start_time, time_t end_time, TIER_QUERY_FETCH tier_query_fetch_type) {
  113. UNUSED(tier_query_fetch_type);
  114. RRDDIM *rd = (RRDDIM *)db_metric_handle;
  115. handle->rd = rd;
  116. handle->start_time = start_time;
  117. handle->end_time = end_time;
  118. struct mem_query_handle* h = calloc(1, sizeof(struct mem_query_handle));
  119. h->slot = rrddim_time2slot(rd, start_time);
  120. h->last_slot = rrddim_time2slot(rd, end_time);
  121. h->dt = rd->rrdset->update_every;
  122. h->next_timestamp = start_time;
  123. h->slot_timestamp = rrddim_slot2time(rd, h->slot);
  124. h->last_timestamp = rrddim_slot2time(rd, h->last_slot);
  125. // info("RRDDIM QUERY INIT: start %ld, end %ld, next %ld, first %ld, last %ld, dt %ld", start_time, end_time, h->next_timestamp, h->slot_timestamp, h->last_timestamp, h->dt);
  126. handle->handle = (STORAGE_QUERY_HANDLE *)h;
  127. }
  128. // Returns the metric and sets its timestamp into current_time
  129. // IT IS REQUIRED TO **ALWAYS** SET ALL RETURN VALUES (current_time, end_time, flags)
  130. // IT IS REQUIRED TO **ALWAYS** KEEP TRACK OF TIME, EVEN OUTSIDE THE DATABASE BOUNDARIES
  131. STORAGE_POINT rrddim_query_next_metric(struct rrddim_query_handle *handle) {
  132. RRDDIM *rd = handle->rd;
  133. struct mem_query_handle* h = (struct mem_query_handle*)handle->handle;
  134. size_t entries = rd->rrdset->entries;
  135. size_t slot = h->slot;
  136. STORAGE_POINT sp;
  137. sp.count = 1;
  138. time_t this_timestamp = h->next_timestamp;
  139. h->next_timestamp += h->dt;
  140. // set this timestamp for our caller
  141. sp.start_time = this_timestamp - h->dt;
  142. sp.end_time = this_timestamp;
  143. if(unlikely(this_timestamp < h->slot_timestamp)) {
  144. storage_point_empty(sp, sp.start_time, sp.end_time);
  145. return sp;
  146. }
  147. if(unlikely(this_timestamp > h->last_timestamp)) {
  148. storage_point_empty(sp, sp.start_time, sp.end_time);
  149. return sp;
  150. }
  151. storage_number n = rd->db[slot++];
  152. if(unlikely(slot >= entries)) slot = 0;
  153. h->slot = slot;
  154. h->slot_timestamp += h->dt;
  155. sp.anomaly_count = (n & SN_ANOMALY_BIT) ? 0 : 1;
  156. sp.flags = (n & SN_ALL_FLAGS);
  157. sp.min = sp.max = sp.sum = unpack_storage_number(n);
  158. return sp;
  159. }
  160. int rrddim_query_is_finished(struct rrddim_query_handle *handle) {
  161. struct mem_query_handle* h = (struct mem_query_handle*)handle->handle;
  162. return (h->next_timestamp > handle->end_time);
  163. }
  164. void rrddim_query_finalize(struct rrddim_query_handle *handle) {
  165. #ifdef NETDATA_INTERNAL_CHECKS
  166. if(!rrddim_query_is_finished(handle))
  167. error("QUERY: query for chart '%s' dimension '%s' has been stopped unfinished", handle->rd->rrdset->id, handle->rd->name);
  168. #endif
  169. freez(handle->handle);
  170. }
  171. time_t rrddim_query_latest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
  172. RRDDIM *rd = (RRDDIM *)db_metric_handle;
  173. return rd->rrdset->last_updated.tv_sec;
  174. }
  175. time_t rrddim_query_oldest_time(STORAGE_METRIC_HANDLE *db_metric_handle) {
  176. RRDDIM *rd = (RRDDIM *)db_metric_handle;
  177. return (time_t)(rd->rrdset->last_updated.tv_sec - rrddim_duration(rd));
  178. }