context.c 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "internal.h"
  3. inline const char *rrdcontext_acquired_id(RRDCONTEXT_ACQUIRED *rca) {
  4. RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
  5. return string2str(rc->id);
  6. }
  7. inline bool rrdcontext_acquired_belongs_to_host(RRDCONTEXT_ACQUIRED *rca, RRDHOST *host) {
  8. RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
  9. return rc->rrdhost == host;
  10. }
  11. // ----------------------------------------------------------------------------
  12. // RRDCONTEXT
  13. static void rrdcontext_freez(RRDCONTEXT *rc) {
  14. string_freez(rc->id);
  15. string_freez(rc->title);
  16. string_freez(rc->units);
  17. string_freez(rc->family);
  18. }
  19. static void rrdcontext_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdhost) {
  20. RRDHOST *host = (RRDHOST *)rrdhost;
  21. RRDCONTEXT *rc = (RRDCONTEXT *)value;
  22. rc->rrdhost = host;
  23. rc->flags = rc->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS; // no need for atomics at constructor
  24. if(rc->hub.version) {
  25. // we are loading data from the SQL database
  26. if(rc->version)
  27. netdata_log_error("RRDCONTEXT: context '%s' is already initialized with version %"PRIu64", but it is loaded again from SQL with version %"PRIu64"", string2str(rc->id), rc->version, rc->hub.version);
  28. // IMPORTANT
  29. // replace all string pointers in rc->hub with our own versions
  30. // the originals are coming from a tmp allocation of sqlite
  31. string_freez(rc->id);
  32. rc->id = string_strdupz(rc->hub.id);
  33. rc->hub.id = string2str(rc->id);
  34. string_freez(rc->title);
  35. rc->title = string_strdupz(rc->hub.title);
  36. rc->hub.title = string2str(rc->title);
  37. string_freez(rc->units);
  38. rc->units = string_strdupz(rc->hub.units);
  39. rc->hub.units = string2str(rc->units);
  40. string_freez(rc->family);
  41. rc->family = string_strdupz(rc->hub.family);
  42. rc->hub.family = string2str(rc->family);
  43. rc->chart_type = rrdset_type_id(rc->hub.chart_type);
  44. rc->hub.chart_type = rrdset_type_name(rc->chart_type);
  45. rc->version = rc->hub.version;
  46. rc->priority = rc->hub.priority;
  47. rc->first_time_s = (time_t)rc->hub.first_time_s;
  48. rc->last_time_s = (time_t)rc->hub.last_time_s;
  49. if(rc->hub.deleted || !rc->hub.first_time_s)
  50. rrd_flag_set_deleted(rc, RRD_FLAG_NONE);
  51. else {
  52. if (rc->last_time_s == 0)
  53. rrd_flag_set_collected(rc);
  54. else
  55. rrd_flag_set_archived(rc);
  56. }
  57. rc->flags |= RRD_FLAG_UPDATE_REASON_LOAD_SQL; // no need for atomics at constructor
  58. }
  59. else {
  60. // we are adding this context now for the first time
  61. rc->version = now_realtime_sec();
  62. }
  63. rrdinstances_create_in_rrdcontext(rc);
  64. spinlock_init(&rc->spinlock);
  65. // signal the react callback to do the job
  66. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_NEW_OBJECT);
  67. }
  68. static void rrdcontext_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdhost __maybe_unused) {
  69. RRDCONTEXT *rc = (RRDCONTEXT *)value;
  70. rrdinstances_destroy_from_rrdcontext(rc);
  71. rrdcontext_freez(rc);
  72. }
  // Outcome of comparing the existing metadata of a context with incoming metadata.
  typedef enum __attribute__((packed)) {
      OLDNEW_KEEP_OLD = 0, // leave the existing metadata untouched
      OLDNEW_USE_NEW  = 1, // take the incoming metadata
      OLDNEW_MERGE    = 2, // combine the existing and the incoming metadata
  } OLDNEW;
  78. static inline OLDNEW oldnew_decide(bool archived, bool new_archived) {
  79. if(archived && !new_archived)
  80. return OLDNEW_USE_NEW;
  81. if(!archived && new_archived)
  82. return OLDNEW_KEEP_OLD;
  83. return OLDNEW_MERGE;
  84. }
  85. static inline void string_replace(STRING **stringpp, STRING *new_string) {
  86. STRING *old = *stringpp;
  87. *stringpp = string_dup(new_string);
  88. string_freez(old);
  89. }
  90. static inline void string_merge(STRING **stringpp, STRING *new_string) {
  91. STRING *old = *stringpp;
  92. *stringpp = string_2way_merge(*stringpp, new_string);
  93. string_freez(old);
  94. }
  95. static void rrdcontext_merge_with(RRDCONTEXT *rc, bool archived, STRING *title, STRING *family, STRING *units, RRDSET_TYPE chart_type, uint32_t priority) {
  96. OLDNEW oldnew = oldnew_decide(rrd_flag_is_archived(rc), archived);
  97. switch(oldnew) {
  98. case OLDNEW_KEEP_OLD:
  99. break;
  100. case OLDNEW_USE_NEW:
  101. if(rc->title != title) {
  102. string_replace(&rc->title, title);
  103. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
  104. }
  105. if(rc->family != family) {
  106. string_replace(&rc->family, family);
  107. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
  108. }
  109. break;
  110. case OLDNEW_MERGE:
  111. if(rc->title != title) {
  112. string_merge(&rc->title, title);
  113. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
  114. }
  115. if(rc->family != family) {
  116. string_merge(&rc->family, family);
  117. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
  118. }
  119. break;
  120. }
  121. switch(oldnew) {
  122. case OLDNEW_KEEP_OLD:
  123. break;
  124. case OLDNEW_USE_NEW:
  125. case OLDNEW_MERGE:
  126. if(rc->units != units) {
  127. string_replace(&rc->units, units);
  128. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
  129. }
  130. if(rc->chart_type != chart_type) {
  131. rc->chart_type = chart_type;
  132. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
  133. }
  134. if(rc->priority != priority) {
  135. rc->priority = priority;
  136. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_METADATA);
  137. }
  138. break;
  139. }
  140. }
  141. void rrdcontext_update_from_collected_rrdinstance(RRDINSTANCE *ri) {
  142. rrdcontext_merge_with(ri->rc, rrd_flag_is_archived(ri),
  143. ri->title, ri->family, ri->units, ri->chart_type, ri->priority);
  144. }
  145. static bool rrdcontext_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *rrdhost __maybe_unused) {
  146. RRDCONTEXT *rc = (RRDCONTEXT *)old_value;
  147. RRDCONTEXT *rc_new = (RRDCONTEXT *)new_value;
  148. //current rc is not archived, new_rc is archived, don't merge
  149. if (!rrd_flag_is_archived(rc) && rrd_flag_is_archived(rc_new)) {
  150. rrdcontext_freez(rc_new);
  151. return false;
  152. }
  153. rrdcontext_lock(rc);
  154. rrdcontext_merge_with(rc, rrd_flag_is_archived(rc_new),
  155. rc_new->title, rc_new->family, rc_new->units, rc_new->chart_type, rc_new->priority);
  156. rrd_flag_set(rc, rc_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS); // no need for atomics on rc_new
  157. if(rrd_flag_is_collected(rc) && rrd_flag_is_archived(rc))
  158. rrd_flag_set_collected(rc);
  159. if(rrd_flag_is_updated(rc))
  160. rrd_flag_set(rc, RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT);
  161. rrdcontext_unlock(rc);
  162. // free the resources of the new one
  163. rrdcontext_freez(rc_new);
  164. // the react callback will continue from here
  165. return rrd_flag_is_updated(rc);
  166. }
  167. static void rrdcontext_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *rrdhost __maybe_unused) {
  168. RRDCONTEXT *rc = (RRDCONTEXT *)value;
  169. rrdcontext_trigger_updates(rc, __FUNCTION__ );
  170. }
  171. void rrdcontext_trigger_updates(RRDCONTEXT *rc, const char *function) {
  172. if(rrd_flag_is_updated(rc) || !rrd_flag_check(rc, RRD_FLAG_LIVE_RETENTION))
  173. rrdcontext_queue_for_post_processing(rc, function, rc->flags);
  174. }
  175. static void rrdcontext_hub_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
  176. RRDCONTEXT *rc = context;
  177. rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB);
  178. rc->queue.queued_ut = now_realtime_usec();
  179. rc->queue.queued_flags = rrd_flags_get(rc);
  180. }
  181. static void rrdcontext_hub_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
  182. RRDCONTEXT *rc = context;
  183. rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_HUB);
  184. }
  185. static bool rrdcontext_hub_queue_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *new_context __maybe_unused, void *nothing __maybe_unused) {
  186. // context and new_context are the same
  187. // we just need to update the timings
  188. RRDCONTEXT *rc = context;
  189. rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_HUB);
  190. rc->queue.queued_ut = now_realtime_usec();
  191. rc->queue.queued_flags |= rrd_flags_get(rc);
  192. return true;
  193. }
  194. static void rrdcontext_post_processing_queue_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
  195. RRDCONTEXT *rc = context;
  196. rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP);
  197. rc->pp.queued_flags = rc->flags;
  198. rc->pp.queued_ut = now_realtime_usec();
  199. }
  200. static void rrdcontext_post_processing_queue_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *nothing __maybe_unused) {
  201. RRDCONTEXT *rc = context;
  202. rrd_flag_clear(rc, RRD_FLAG_QUEUED_FOR_PP);
  203. rc->pp.dequeued_ut = now_realtime_usec();
  204. }
  205. static bool rrdcontext_post_processing_queue_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *context, void *new_context __maybe_unused, void *nothing __maybe_unused) {
  206. RRDCONTEXT *rc = context;
  207. bool changed = false;
  208. if(!(rc->flags & RRD_FLAG_QUEUED_FOR_PP)) {
  209. rrd_flag_set(rc, RRD_FLAG_QUEUED_FOR_PP);
  210. changed = true;
  211. }
  212. if(rc->pp.queued_flags != rc->flags) {
  213. rc->pp.queued_flags |= rc->flags;
  214. changed = true;
  215. }
  216. return changed;
  217. }
  218. void rrdhost_create_rrdcontexts(RRDHOST *host) {
  219. if(unlikely(!host)) return;
  220. if(likely(host->rrdctx.contexts)) return;
  221. host->rrdctx.contexts = dictionary_create_advanced(
  222. DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
  223. &dictionary_stats_category_rrdcontext, sizeof(RRDCONTEXT));
  224. dictionary_register_insert_callback(host->rrdctx.contexts, rrdcontext_insert_callback, host);
  225. dictionary_register_delete_callback(host->rrdctx.contexts, rrdcontext_delete_callback, host);
  226. dictionary_register_conflict_callback(host->rrdctx.contexts, rrdcontext_conflict_callback, host);
  227. dictionary_register_react_callback(host->rrdctx.contexts, rrdcontext_react_callback, host);
  228. host->rrdctx.hub_queue = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0);
  229. dictionary_register_insert_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_insert_callback, NULL);
  230. dictionary_register_delete_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_delete_callback, NULL);
  231. dictionary_register_conflict_callback(host->rrdctx.hub_queue, rrdcontext_hub_queue_conflict_callback, NULL);
  232. host->rrdctx.pp_queue = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_VALUE_LINK_DONT_CLONE, &dictionary_stats_category_rrdcontext, 0);
  233. dictionary_register_insert_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_insert_callback, NULL);
  234. dictionary_register_delete_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_delete_callback, NULL);
  235. dictionary_register_conflict_callback(host->rrdctx.pp_queue, rrdcontext_post_processing_queue_conflict_callback, NULL);
  236. }
  237. void rrdhost_destroy_rrdcontexts(RRDHOST *host) {
  238. if(unlikely(!host)) return;
  239. if(unlikely(!host->rrdctx.contexts)) return;
  240. DICTIONARY *old;
  241. if(host->rrdctx.hub_queue) {
  242. old = host->rrdctx.hub_queue;
  243. host->rrdctx.hub_queue = NULL;
  244. RRDCONTEXT *rc;
  245. dfe_start_write(old, rc) {
  246. dictionary_del(old, string2str(rc->id));
  247. }
  248. dfe_done(rc);
  249. dictionary_destroy(old);
  250. }
  251. if(host->rrdctx.pp_queue) {
  252. old = host->rrdctx.pp_queue;
  253. host->rrdctx.pp_queue = NULL;
  254. RRDCONTEXT *rc;
  255. dfe_start_write(old, rc) {
  256. dictionary_del(old, string2str(rc->id));
  257. }
  258. dfe_done(rc);
  259. dictionary_destroy(old);
  260. }
  261. old = host->rrdctx.contexts;
  262. host->rrdctx.contexts = NULL;
  263. dictionary_destroy(old);
  264. }