rrdcontext.c 103 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "rrdcontext.h"
  3. #include "sqlite/sqlite_context.h"
  4. #include "aclk/schema-wrappers/context.h"
  5. #include "aclk/aclk_contexts_api.h"
  6. #include "aclk/aclk_api.h"
  7. int rrdcontext_enabled = CONFIG_BOOLEAN_YES;
  8. #define MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST 5000
  9. #define FULL_RETENTION_SCAN_DELAY_AFTER_DB_ROTATION_SECS 120
  10. #define RRDCONTEXT_WORKER_THREAD_HEARTBEAT_SECS 1
  11. #define RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY 10
  12. // #define LOG_TRANSITIONS 1
  13. // #define LOG_RRDINSTANCES 1
  // Flag bits stored in the 'flags' member of RRDMETRIC, RRDINSTANCE and RRDCONTEXT.
  // Bits 0-8 describe the state of the object; bits 10-30 are "update reasons"
  // that record why RRD_FLAG_UPDATED was raised. Bit 9 is not assigned.
  typedef enum {
      RRD_FLAG_NONE           = 0,

      // ---- object state ----
      RRD_FLAG_DELETED        = 1 << 0, // the object (metric, instance or context) is deleted
      RRD_FLAG_COLLECTED      = 1 << 1, // the object is currently being collected
      RRD_FLAG_UPDATED        = 1 << 2, // the object has updates that must be propagated
      RRD_FLAG_ARCHIVED       = 1 << 3, // the object is not currently being collected
      RRD_FLAG_OWN_LABELS     = 1 << 4, // the instance owns its labels (they are not borrowed from an RRDSET)
      RRD_FLAG_LIVE_RETENTION = 1 << 5, // retention has been read from the database
      RRD_FLAG_QUEUED         = 1 << 6, // the context is queued to be dispatched to the hub
      RRD_FLAG_DONT_PROCESS   = 1 << 7, // updates of this object must not be processed
      RRD_FLAG_HIDDEN         = 1 << 8, // do not expose the object to the hub or the API

      // ---- update reasons ----
      RRD_FLAG_UPDATE_REASON_LOAD_SQL                = 1 << 10, // just loaded from SQL
      RRD_FLAG_UPDATE_REASON_NEW_OBJECT              = 1 << 11, // just created
      RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT          = 1 << 12, // an update was received for the object
      RRD_FLAG_UPDATE_REASON_CHANGED_LINKING         = 1 << 13, // an instance or metric switched RRDSET or RRDDIM
      RRD_FLAG_UPDATE_REASON_CHANGED_UUID            = 1 << 14, // an instance or metric changed UUID
      RRD_FLAG_UPDATE_REASON_CHANGED_NAME            = 1 << 15, // an instance or metric changed name
      RRD_FLAG_UPDATE_REASON_CHANGED_UNITS           = 1 << 16, // a context or instance changed units
      RRD_FLAG_UPDATE_REASON_CHANGED_TITLE           = 1 << 17, // a context or instance changed title
      RRD_FLAG_UPDATE_REASON_CHANGED_FAMILY          = 1 << 18, // a context or instance changed family
      RRD_FLAG_UPDATE_REASON_CHANGED_CHART_TYPE      = 1 << 19, // a context or instance changed chart type
      RRD_FLAG_UPDATE_REASON_CHANGED_PRIORITY        = 1 << 20, // a context or instance changed priority
      RRD_FLAG_UPDATE_REASON_CHANGED_UPDATE_EVERY    = 1 << 21, // an instance or metric changed update frequency
      RRD_FLAG_UPDATE_REASON_ZERO_RETENTION          = 1 << 22, // the object has no retention
      RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T    = 1 << 23, // the oldest time in the db changed
      RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T     = 1 << 24, // the latest time in the db changed
      RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED = 1 << 25, // the object stopped being collected
      RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED = 1 << 26, // the object started being collected
      RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD      = 1 << 27, // the owning host just disconnected
      RRD_FLAG_UPDATE_REASON_DB_ROTATION             = 1 << 28, // changed because of a db rotation
      RRD_FLAG_UPDATE_REASON_UNUSED                  = 1 << 29, // the context is not used anymore
      RRD_FLAG_UPDATE_REASON_CHANGED_FLAGS           = 1 << 30, // the flags of the object changed
  } RRD_FLAGS;
  // Mask with every RRD_FLAG_UPDATE_REASON_* bit set.
  #define RRD_FLAG_ALL_UPDATE_REASONS (                    \
        RRD_FLAG_UPDATE_REASON_LOAD_SQL                  | \
        RRD_FLAG_UPDATE_REASON_NEW_OBJECT                | \
        RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT            | \
        RRD_FLAG_UPDATE_REASON_CHANGED_LINKING           | \
        RRD_FLAG_UPDATE_REASON_CHANGED_UUID              | \
        RRD_FLAG_UPDATE_REASON_CHANGED_NAME              | \
        RRD_FLAG_UPDATE_REASON_CHANGED_UNITS             | \
        RRD_FLAG_UPDATE_REASON_CHANGED_TITLE             | \
        RRD_FLAG_UPDATE_REASON_CHANGED_FAMILY            | \
        RRD_FLAG_UPDATE_REASON_CHANGED_CHART_TYPE        | \
        RRD_FLAG_UPDATE_REASON_CHANGED_PRIORITY          | \
        RRD_FLAG_UPDATE_REASON_CHANGED_UPDATE_EVERY      | \
        RRD_FLAG_UPDATE_REASON_ZERO_RETENTION            | \
        RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T      | \
        RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T       | \
        RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED   | \
        RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED   | \
        RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD        | \
        RRD_FLAG_UPDATE_REASON_DB_ROTATION               | \
        RRD_FLAG_UPDATE_REASON_UNUSED                    | \
        RRD_FLAG_UPDATE_REASON_CHANGED_FLAGS               \
  )
  // Flags that survive the masking done by the dictionary insert callbacks;
  // the conflict callback of rrdmetric also uses it to filter incoming flags.
  #define RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS (    \
        RRD_FLAG_ARCHIVED                                | \
        RRD_FLAG_DONT_PROCESS                            | \
        RRD_FLAG_HIDDEN                                  | \
        RRD_FLAG_ALL_UPDATE_REASONS                        \
  )
  // While any of these flags is set, rrdmetric_should_be_deleted() answers "no".
  #define RRD_FLAGS_PREVENTING_DELETIONS (                 \
        RRD_FLAG_QUEUED                                  | \
        RRD_FLAG_COLLECTED                               | \
        RRD_FLAG_UPDATE_REASON_LOAD_SQL                  | \
        RRD_FLAG_UPDATE_REASON_NEW_OBJECT                | \
        RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT            | \
        RRD_FLAG_UPDATE_REASON_CHANGED_LINKING             \
  )
  // Raise RRD_FLAG_UPDATED on obj together with the given update reason(s).
  // The whole expansion is parenthesized so the macro behaves as a single
  // expression even when it is combined with other operators.
  #define rrd_flag_set_updated(obj, reason) ((obj)->flags |= (RRD_FLAG_UPDATED | (reason)))

  // Clear RRD_FLAG_UPDATED and every update reason from obj.
  #define rrd_flag_unset_updated(obj) ((obj)->flags &= ~(RRD_FLAG_UPDATED | RRD_FLAG_ALL_UPDATE_REASONS))
  // Mark obj as collected:
  //  - if it was not collected, set COLLECTED, the STARTED_BEING_COLLECTED reason and UPDATED;
  //  - drop ARCHIVED / STOPPED_BEING_COLLECTED, DELETED / ZERO_RETENTION and DONT_PROCESS.
  // obj is evaluated exactly once, so an argument with side effects is safe.
  // Each bit group is tested before it is cleared, so flags are written only when they change.
  #define rrd_flag_set_collected(obj) do { \
      RRD_FLAGS *rrd_collected_flags_ = &(obj)->flags; \
      if(likely( !(*rrd_collected_flags_ & RRD_FLAG_COLLECTED))) \
          *rrd_collected_flags_ |= (RRD_FLAG_COLLECTED | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED | RRD_FLAG_UPDATED); \
      if(likely( (*rrd_collected_flags_ & (RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED)))) \
          *rrd_collected_flags_ &= ~(RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED); \
      if(unlikely((*rrd_collected_flags_ & (RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION)))) \
          *rrd_collected_flags_ &= ~(RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); \
      if(unlikely((*rrd_collected_flags_ & RRD_FLAG_DONT_PROCESS))) \
          *rrd_collected_flags_ &= ~RRD_FLAG_DONT_PROCESS; \
  } while(0)
  // Mark obj as archived (not collected any more):
  //  - if it was not archived, set ARCHIVED, the STOPPED_BEING_COLLECTED reason and UPDATED;
  //  - drop COLLECTED / STARTED_BEING_COLLECTED and DELETED / ZERO_RETENTION.
  // obj is evaluated exactly once, so an argument with side effects is safe.
  #define rrd_flag_set_archived(obj) do { \
      RRD_FLAGS *rrd_archived_flags_ = &(obj)->flags; \
      if(likely( !(*rrd_archived_flags_ & RRD_FLAG_ARCHIVED))) \
          *rrd_archived_flags_ |= (RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED | RRD_FLAG_UPDATED); \
      if(likely( (*rrd_archived_flags_ & (RRD_FLAG_COLLECTED | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED)))) \
          *rrd_archived_flags_ &= ~(RRD_FLAG_COLLECTED | RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED); \
      if(unlikely((*rrd_archived_flags_ & (RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION)))) \
          *rrd_archived_flags_ &= ~(RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION); \
  } while(0)
  // Mark obj as deleted:
  //  - if it was not deleted, set DELETED, the ZERO_RETENTION reason, UPDATED and 'reason';
  //  - drop ARCHIVED and COLLECTED.
  // obj is evaluated exactly once; 'reason' is evaluated only when the object
  // was not already deleted (as before).
  #define rrd_flag_set_deleted(obj, reason) do { \
      RRD_FLAGS *rrd_deleted_flags_ = &(obj)->flags; \
      if(likely( !(*rrd_deleted_flags_ & RRD_FLAG_DELETED))) \
          *rrd_deleted_flags_ |= (RRD_FLAG_DELETED | RRD_FLAG_UPDATE_REASON_ZERO_RETENTION | RRD_FLAG_UPDATED | (reason)); \
      if(unlikely((*rrd_deleted_flags_ & RRD_FLAG_ARCHIVED))) \
          *rrd_deleted_flags_ &= ~RRD_FLAG_ARCHIVED; \
      if(likely( (*rrd_deleted_flags_ & RRD_FLAG_COLLECTED))) \
          *rrd_deleted_flags_ &= ~RRD_FLAG_COLLECTED; \
  } while(0)
  // Flag tests: each one yields the raw masked bit (non-zero when set), not 0/1.
  #define rrd_flag_is_archived(obj)  ((obj)->flags & RRD_FLAG_ARCHIVED)
  #define rrd_flag_is_collected(obj) ((obj)->flags & RRD_FLAG_COLLECTED)
  114. static struct rrdcontext_reason {
  115. RRD_FLAGS flag;
  116. const char *name;
  117. usec_t delay_ut;
  118. } rrdcontext_reasons[] = {
  119. // context related
  120. { RRD_FLAG_UPDATE_REASON_NEW_OBJECT, "object created", 60 * USEC_PER_SEC },
  121. { RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT, "object updated", 60 * USEC_PER_SEC },
  122. { RRD_FLAG_UPDATE_REASON_LOAD_SQL, "loaded from sql", 60 * USEC_PER_SEC },
  123. { RRD_FLAG_UPDATE_REASON_CHANGED_TITLE, "changed title", 30 * USEC_PER_SEC },
  124. { RRD_FLAG_UPDATE_REASON_CHANGED_UNITS, "changed units", 30 * USEC_PER_SEC },
  125. { RRD_FLAG_UPDATE_REASON_CHANGED_FAMILY, "changed family", 30 * USEC_PER_SEC },
  126. { RRD_FLAG_UPDATE_REASON_CHANGED_PRIORITY, "changed priority", 30 * USEC_PER_SEC },
  127. { RRD_FLAG_UPDATE_REASON_ZERO_RETENTION, "has no retention", 60 * USEC_PER_SEC },
  128. { RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T, "updated first_time_t", 30 * USEC_PER_SEC },
  129. { RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T, "updated last_time_t", 60 * USEC_PER_SEC },
  130. { RRD_FLAG_UPDATE_REASON_CHANGED_CHART_TYPE, "changed chart type", 30 * USEC_PER_SEC },
  131. { RRD_FLAG_UPDATE_REASON_STOPPED_BEING_COLLECTED, "stopped collected", 60 * USEC_PER_SEC },
  132. { RRD_FLAG_UPDATE_REASON_STARTED_BEING_COLLECTED, "started collected", 0 * USEC_PER_SEC },
  133. { RRD_FLAG_UPDATE_REASON_UNUSED, "unused", 0 * USEC_PER_SEC },
  134. // not context related
  135. { RRD_FLAG_UPDATE_REASON_CHANGED_UUID, "changed uuid", 60 * USEC_PER_SEC },
  136. { RRD_FLAG_UPDATE_REASON_CHANGED_UPDATE_EVERY, "changed updated every",60 * USEC_PER_SEC },
  137. { RRD_FLAG_UPDATE_REASON_CHANGED_LINKING, "changed rrd link", 60 * USEC_PER_SEC },
  138. { RRD_FLAG_UPDATE_REASON_CHANGED_NAME, "changed name", 60 * USEC_PER_SEC },
  139. { RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD, "child disconnected", 30 * USEC_PER_SEC },
  140. { RRD_FLAG_UPDATE_REASON_DB_ROTATION, "db rotation", 60 * USEC_PER_SEC },
  141. { RRD_FLAG_UPDATE_REASON_CHANGED_FLAGS, "changed flags", 60 * USEC_PER_SEC },
  142. // terminator
  143. { 0, NULL, 0 },
  144. };
  145. typedef struct rrdmetric {
  146. uuid_t uuid;
  147. STRING *id;
  148. STRING *name;
  149. RRDDIM *rrddim;
  150. time_t first_time_t;
  151. time_t last_time_t;
  152. RRD_FLAGS flags;
  153. struct rrdinstance *ri;
  154. usec_t created_ut; // the time this object was created
  155. } RRDMETRIC;
  156. typedef struct rrdinstance {
  157. uuid_t uuid;
  158. STRING *id;
  159. STRING *name;
  160. STRING *title;
  161. STRING *units;
  162. STRING *family;
  163. uint32_t priority;
  164. RRDSET_TYPE chart_type;
  165. RRD_FLAGS flags; // flags related to this instance
  166. time_t first_time_t;
  167. time_t last_time_t;
  168. int update_every; // data collection frequency
  169. RRDSET *rrdset; // pointer to RRDSET when collected, or NULL
  170. DICTIONARY *rrdlabels; // linked to RRDSET->state->chart_labels or own version
  171. struct rrdcontext *rc;
  172. DICTIONARY *rrdmetrics;
  173. } RRDINSTANCE;
  174. typedef struct rrdcontext {
  175. uint64_t version;
  176. STRING *id;
  177. STRING *title;
  178. STRING *units;
  179. STRING *family;
  180. uint32_t priority;
  181. RRDSET_TYPE chart_type;
  182. RRD_FLAGS flags;
  183. time_t first_time_t;
  184. time_t last_time_t;
  185. VERSIONED_CONTEXT_DATA hub;
  186. DICTIONARY *rrdinstances;
  187. RRDHOST *rrdhost;
  188. struct {
  189. RRD_FLAGS queued_flags; // the last flags that triggered the queueing
  190. usec_t queued_ut; // the last time this was queued
  191. usec_t delay_calc_ut; // the last time we calculated the scheduled_dispatched_ut
  192. usec_t scheduled_dispatch_ut; // the time it was/is scheduled to be sent
  193. usec_t dequeued_ut; // the last time we sent (or deduped) this context
  194. } queue;
  195. netdata_mutex_t mutex;
  196. } RRDCONTEXT;
  197. // ----------------------------------------------------------------------------
  198. // helper one-liners for RRDMETRIC
  199. static inline RRDMETRIC *rrdmetric_acquired_value(RRDMETRIC_ACQUIRED *rma) {
  200. return dictionary_acquired_item_value((DICTIONARY_ITEM *)rma);
  201. }
  202. static inline void rrdmetric_release(RRDMETRIC_ACQUIRED *rma) {
  203. RRDMETRIC *rm = rrdmetric_acquired_value(rma);
  204. dictionary_acquired_item_release(rm->ri->rrdmetrics, (DICTIONARY_ITEM *)rma);
  205. }
  206. // ----------------------------------------------------------------------------
  207. // helper one-liners for RRDINSTANCE
  208. static inline RRDINSTANCE_ACQUIRED *rrdinstance_dup(RRDINSTANCE_ACQUIRED *ria) {
  209. return (RRDINSTANCE_ACQUIRED *)dictionary_acquired_item_dup((DICTIONARY_ITEM *)ria);
  210. }
  211. static inline RRDINSTANCE *rrdinstance_acquired_value(RRDINSTANCE_ACQUIRED *ria) {
  212. return dictionary_acquired_item_value((DICTIONARY_ITEM *)ria);
  213. }
  214. static inline const char *rrdinstance_acquired_name(RRDINSTANCE_ACQUIRED *ria) {
  215. return dictionary_acquired_item_name((DICTIONARY_ITEM *)ria);
  216. }
  217. static inline void rrdinstance_release(RRDINSTANCE_ACQUIRED *ria) {
  218. RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
  219. dictionary_acquired_item_release(ri->rc->rrdinstances, (DICTIONARY_ITEM *)ria);
  220. }
  221. // ----------------------------------------------------------------------------
  222. // helper one-liners for RRDCONTEXT
  223. static inline RRDCONTEXT_ACQUIRED *rrdcontext_dup(RRDCONTEXT_ACQUIRED *rca) {
  224. return (RRDCONTEXT_ACQUIRED *)dictionary_acquired_item_dup((DICTIONARY_ITEM *)rca);
  225. }
  226. static inline const char *rrdcontext_acquired_name(RRDCONTEXT_ACQUIRED *rca) {
  227. return dictionary_acquired_item_name((DICTIONARY_ITEM *)rca);
  228. }
  229. static inline RRDCONTEXT *rrdcontext_acquired_value(RRDCONTEXT_ACQUIRED *rca) {
  230. return dictionary_acquired_item_value((DICTIONARY_ITEM *)rca);
  231. }
  232. static inline RRDCONTEXT_ACQUIRED *rrdcontext_acquire(RRDHOST *host, const char *name) {
  233. return (RRDCONTEXT_ACQUIRED *)dictionary_get_and_acquire_item((DICTIONARY *)host->rrdctx, name);
  234. }
  235. static inline void rrdcontext_release(RRDCONTEXT_ACQUIRED *rca) {
  236. RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
  237. dictionary_acquired_item_release((DICTIONARY *)rc->rrdhost->rrdctx, (DICTIONARY_ITEM *)rca);
  238. }
  239. static void rrdcontext_recalculate_context_retention(RRDCONTEXT *rc, RRD_FLAGS reason, int job_id);
  240. static void rrdcontext_recalculate_host_retention(RRDHOST *host, RRD_FLAGS reason, int job_id);
  241. #define rrdcontext_version_hash(host) rrdcontext_version_hash_with_callback(host, NULL, false, NULL)
  242. static uint64_t rrdcontext_version_hash_with_callback(RRDHOST *host, void (*callback)(RRDCONTEXT *, bool, void *), bool snapshot, void *bundle);
  243. void rrdcontext_delete_from_sql_unsafe(RRDCONTEXT *rc);
  244. #define rrdcontext_lock(rc) netdata_mutex_lock(&((rc)->mutex))
  245. #define rrdcontext_unlock(rc) netdata_mutex_unlock(&((rc)->mutex))
  246. // ----------------------------------------------------------------------------
  247. // Updates triggers
  248. static void rrdmetric_trigger_updates(RRDMETRIC *rm, bool force, bool escalate);
  249. static void rrdinstance_trigger_updates(RRDINSTANCE *ri, bool force, bool escalate);
  250. static void rrdcontext_trigger_updates(RRDCONTEXT *rc, bool force);
  251. // ----------------------------------------------------------------------------
  252. // visualizing flags
  253. static void rrd_flags_to_buffer(RRD_FLAGS flags, BUFFER *wb) {
  254. if(flags & RRD_FLAG_QUEUED)
  255. buffer_strcat(wb, "QUEUED ");
  256. if(flags & RRD_FLAG_DELETED)
  257. buffer_strcat(wb, "DELETED ");
  258. if(flags & RRD_FLAG_COLLECTED)
  259. buffer_strcat(wb, "COLLECTED ");
  260. if(flags & RRD_FLAG_UPDATED)
  261. buffer_strcat(wb, "UPDATED ");
  262. if(flags & RRD_FLAG_ARCHIVED)
  263. buffer_strcat(wb, "ARCHIVED ");
  264. if(flags & RRD_FLAG_OWN_LABELS)
  265. buffer_strcat(wb, "OWN_LABELS ");
  266. if(flags & RRD_FLAG_LIVE_RETENTION)
  267. buffer_strcat(wb, "LIVE_RETENTION ");
  268. if(flags & RRD_FLAG_DONT_PROCESS)
  269. buffer_strcat(wb, "DONT_PROCESS ");
  270. if(flags & RRD_FLAG_HIDDEN)
  271. buffer_strcat(wb, "HIDDEN ");
  272. }
  273. static void rrd_reasons_to_buffer(RRD_FLAGS flags, BUFFER *wb) {
  274. for(int i = 0, added = 0; rrdcontext_reasons[i].name ; i++) {
  275. if (flags & rrdcontext_reasons[i].flag) {
  276. if (added)
  277. buffer_strcat(wb, ", ");
  278. buffer_strcat(wb, rrdcontext_reasons[i].name);
  279. added++;
  280. }
  281. }
  282. }
  283. // ----------------------------------------------------------------------------
  284. // logging of all data collected
  #ifdef LOG_TRANSITIONS
  // Debug logger: emits one internal_error() line naming the context (and the
  // instance / metric when given), who triggered the transition (msg), and the
  // state flags and update reasons found in 'flags'.
  static void log_transition(STRING *metric, STRING *instance, STRING *context, RRD_FLAGS flags, const char *msg) {
      BUFFER *line = buffer_create(1000);

      buffer_sprintf(line, "RRD TRANSITION: context '%s'", string2str(context));

      if(instance) {
          buffer_sprintf(line, ", instance '%s'", string2str(instance));
      }

      if(metric) {
          buffer_sprintf(line, ", metric '%s'", string2str(metric));
      }

      buffer_sprintf(line, ", triggered by %s: ", msg);
      rrd_flags_to_buffer(flags, line);

      buffer_strcat(line, ", reasons: ");
      rrd_reasons_to_buffer(flags, line);

      internal_error(true, "%s", buffer_tostring(line));
      buffer_free(line);
  }
  #else
  // Logging of transitions is compiled out.
  #define log_transition(metric, instance, context, flags, msg) debug_dummy()
  #endif
  #ifdef LOG_RRDINSTANCES
  // Debug logger: emits one internal_error() line describing an RRDINSTANCE -
  // its identity, presentation fields, a selection of its flags, retention,
  // update reasons, labels and the names of its metrics. 'msg' is printed first
  // and tells which event is being logged (e.g. "INSERT", "DELETE").
  static void rrdinstance_log(RRDINSTANCE *ri, const char *msg) {
      char uuid[UUID_STR_LEN];
      uuid_unparse(ri->uuid, uuid);

      BUFFER *wb = buffer_create(1000);

      // priority is uint32_t and the retention times are time_t: cast them to
      // the exact types the conversion specifiers expect.
      buffer_sprintf(wb,
          "RRDINSTANCE: %s id '%s' (host '%s'), uuid '%s', name '%s', context '%s', title '%s', units '%s', family '%s', priority %u, chart type '%s', update every %d, rrdset '%s', flags %s%s%s%s%s%s%s%s, first_time_t %ld, last_time_t %ld",
          msg,
          string2str(ri->id),
          ri->rc->rrdhost->hostname,
          uuid,
          string2str(ri->name),
          string2str(ri->rc->id),
          string2str(ri->title),
          string2str(ri->units),
          string2str(ri->family),
          (unsigned int)ri->priority,
          rrdset_type_name(ri->chart_type),
          ri->update_every,
          ri->rrdset?ri->rrdset->id:"NONE",
          ri->flags & RRD_FLAG_DELETED ?"DELETED ":"",
          ri->flags & RRD_FLAG_UPDATED ?"UPDATED ":"",
          rrd_flag_is_collected(ri) ?"COLLECTED ":"",
          rrd_flag_is_archived(ri) ?"ARCHIVED ":"",
          ri->flags & RRD_FLAG_OWN_LABELS ?"OWNLABELS ":"",
          ri->flags & RRD_FLAG_LIVE_RETENTION ?"LIVE ":"",
          ri->flags & RRD_FLAG_QUEUED ?"QUEUED ":"",
          ri->flags & RRD_FLAG_DONT_PROCESS ?"BLOCKED ":"",
          (long)ri->first_time_t,
          (long)ri->last_time_t
      );

      // update reasons, comma separated
      buffer_strcat(wb, ", update reasons: { ");
      rrd_reasons_to_buffer(ri->flags, wb);
      buffer_strcat(wb, " }");

      // labels: NONE when there is no dictionary, EMPTY when nothing was printed
      buffer_strcat(wb, ", labels: { ");
      if(ri->rrdlabels) {
          if(!rrdlabels_to_buffer(ri->rrdlabels, wb, "", "=", "'", ", ", NULL, NULL, NULL, NULL))
              buffer_strcat(wb, "EMPTY }");
          else
              buffer_strcat(wb, " }");
      }
      else
          buffer_strcat(wb, "NONE }");

      // metrics: NONE when there is no dictionary, EMPTY when it has no items
      buffer_strcat(wb, ", metrics: { ");
      if(ri->rrdmetrics) {
          RRDMETRIC *v;
          int i = 0;
          dfe_start_read((DICTIONARY *)ri->rrdmetrics, v) {
              buffer_sprintf(wb, "%s%s", i?",":"", v_name);
              i++;
          }
          dfe_done(v);

          if(!i)
              buffer_strcat(wb, "EMPTY }");
          else
              buffer_strcat(wb, " }");
      }
      else
          buffer_strcat(wb, "NONE }");

      internal_error(true, "%s", buffer_tostring(wb));
      buffer_free(wb);
  }
  #else
  // Logging of instances is compiled out.
  #define rrdinstance_log(ir, msg) debug_dummy()
  #endif
  373. // ----------------------------------------------------------------------------
  374. // RRDMETRIC
  375. static void rrdmetric_free(RRDMETRIC *rm) {
  376. string_freez(rm->id);
  377. string_freez(rm->name);
  378. rm->id = NULL;
  379. rm->name = NULL;
  380. rm->ri = NULL;
  381. }
  382. static void rrdmetric_update_retention(RRDMETRIC *rm) {
  383. time_t min_first_time_t = LONG_MAX, max_last_time_t = 0;
  384. if(rm->rrddim) {
  385. min_first_time_t = rrddim_first_entry_t(rm->rrddim);
  386. max_last_time_t = rrddim_last_entry_t(rm->rrddim);
  387. }
  388. #ifdef ENABLE_DBENGINE
  389. else {
  390. RRDHOST *rrdhost = rm->ri->rc->rrdhost;
  391. for (int tier = 0; tier < storage_tiers; tier++) {
  392. if(!rrdhost->storage_instance[tier]) continue;
  393. time_t first_time_t, last_time_t;
  394. if (rrdeng_metric_retention_by_uuid(rrdhost->storage_instance[tier], &rm->uuid, &first_time_t, &last_time_t) == 0) {
  395. if (first_time_t < min_first_time_t)
  396. min_first_time_t = first_time_t;
  397. if (last_time_t > max_last_time_t)
  398. max_last_time_t = last_time_t;
  399. }
  400. }
  401. }
  402. #endif
  403. if(min_first_time_t == LONG_MAX)
  404. min_first_time_t = 0;
  405. if(min_first_time_t > max_last_time_t) {
  406. internal_error(true, "RRDMETRIC: retention of '%s' is flipped", string2str(rm->id));
  407. time_t tmp = min_first_time_t;
  408. min_first_time_t = max_last_time_t;
  409. max_last_time_t = tmp;
  410. }
  411. // check if retention changed
  412. if (min_first_time_t != rm->first_time_t) {
  413. rm->first_time_t = min_first_time_t;
  414. rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
  415. }
  416. if (max_last_time_t != rm->last_time_t) {
  417. rm->last_time_t = max_last_time_t;
  418. rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
  419. }
  420. if(unlikely(!rm->first_time_t && !rm->last_time_t))
  421. rrd_flag_set_deleted(rm, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
  422. rm->flags |= RRD_FLAG_LIVE_RETENTION;
  423. }
  424. // called when this rrdmetric is inserted to the rrdmetrics dictionary of a rrdinstance
  425. static void rrdmetric_insert_callback(const char *id __maybe_unused, void *value, void *data) {
  426. RRDMETRIC *rm = value;
  427. // link it to its parent
  428. rm->ri = data;
  429. // remove flags that we need to figure out at runtime
  430. rm->flags = rm->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS;
  431. rm->created_ut = now_realtime_usec();
  432. // signal the react callback to do the job
  433. rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_NEW_OBJECT);
  434. }
  435. // called when this rrdmetric is deleted from the rrdmetrics dictionary of a rrdinstance
  436. static void rrdmetric_delete_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) {
  437. RRDMETRIC *rm = value;
  438. internal_error(rm->rrddim, "RRDMETRIC: '%s' is freed but there is a RRDDIM linked to it.", string2str(rm->id));
  439. // free the resources
  440. rrdmetric_free(rm);
  441. }
  442. // called when the same rrdmetric is inserted again to the rrdmetrics dictionary of a rrdinstance
  443. static void rrdmetric_conflict_callback(const char *id __maybe_unused, void *oldv, void *newv, void *data __maybe_unused) {
  444. RRDMETRIC *rm = oldv;
  445. RRDMETRIC *rm_new = newv;
  446. internal_error(rm->id != rm_new->id,
  447. "RRDMETRIC: '%s' cannot change id to '%s'",
  448. string2str(rm->id), string2str(rm_new->id));
  449. if(uuid_compare(rm->uuid, rm_new->uuid) != 0) {
  450. char uuid1[UUID_STR_LEN], uuid2[UUID_STR_LEN];
  451. uuid_unparse(rm->uuid, uuid1);
  452. uuid_unparse(rm_new->uuid, uuid2);
  453. internal_error(true, "RRDMETRIC: '%s' of instance '%s' changed uuid from '%s' to '%s'", string2str(rm->id), string2str(rm->ri->id), uuid1, uuid2);
  454. uuid_copy(rm->uuid, rm_new->uuid);
  455. rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_UUID);
  456. }
  457. if(rm->rrddim && rm_new->rrddim && rm->rrddim != rm_new->rrddim) {
  458. rm->rrddim = rm_new->rrddim;
  459. rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LINKING);
  460. }
  461. if(rm->rrddim && uuid_compare(rm->uuid, rm->rrddim->metric_uuid) != 0) {
  462. char uuid1[UUID_STR_LEN], uuid2[UUID_STR_LEN];
  463. uuid_unparse(rm->uuid, uuid1);
  464. uuid_unparse(rm_new->uuid, uuid2);
  465. internal_error(true, "RRDMETRIC: '%s' is linked to RRDDIM '%s' but they have different UUIDs. RRDMETRIC has '%s', RRDDIM has '%s'", string2str(rm->id), rm->rrddim->id, uuid1, uuid2);
  466. }
  467. if(rm->rrddim != rm_new->rrddim)
  468. rm->rrddim = rm_new->rrddim;
  469. if(rm->name != rm_new->name) {
  470. STRING *old = rm->name;
  471. rm->name = string_dup(rm_new->name);
  472. string_freez(old);
  473. rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_NAME);
  474. }
  475. if(!rm->first_time_t || (rm_new->first_time_t && rm_new->first_time_t < rm->first_time_t)) {
  476. rm->first_time_t = rm_new->first_time_t;
  477. rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
  478. }
  479. if(!rm->last_time_t || (rm_new->last_time_t && rm_new->last_time_t > rm->last_time_t)) {
  480. rm->last_time_t = rm_new->last_time_t;
  481. rrd_flag_set_updated(rm, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
  482. }
  483. rm->flags |= (rm_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS);
  484. if(rrd_flag_is_collected(rm) && rrd_flag_is_archived(rm))
  485. rrd_flag_set_collected(rm);
  486. if(rm->flags & RRD_FLAG_UPDATED)
  487. rm->flags |= RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT;
  488. rrdmetric_free(rm_new);
  489. // the react callback will continue from here
  490. }
  491. static void rrdmetric_react_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) {
  492. RRDMETRIC *rm = value;
  493. rrdmetric_trigger_updates(rm, false, true);
  494. }
  495. static void rrdmetrics_create(RRDINSTANCE *ri) {
  496. if(unlikely(!ri)) return;
  497. if(likely(ri->rrdmetrics)) return;
  498. ri->rrdmetrics = dictionary_create(DICTIONARY_FLAG_DONT_OVERWRITE_VALUE);
  499. dictionary_register_insert_callback(ri->rrdmetrics, rrdmetric_insert_callback, (void *)ri);
  500. dictionary_register_delete_callback(ri->rrdmetrics, rrdmetric_delete_callback, (void *)ri);
  501. dictionary_register_conflict_callback(ri->rrdmetrics, rrdmetric_conflict_callback, (void *)ri);
  502. dictionary_register_react_callback(ri->rrdmetrics, rrdmetric_react_callback, (void *)ri);
  503. }
  504. static void rrdmetrics_destroy(RRDINSTANCE *ri) {
  505. if(unlikely(!ri || !ri->rrdmetrics)) return;
  506. dictionary_destroy(ri->rrdmetrics);
  507. ri->rrdmetrics = NULL;
  508. }
  509. static inline bool rrdmetric_should_be_deleted(RRDMETRIC *rm) {
  510. if(likely(!(rm->flags & RRD_FLAG_DELETED)))
  511. return false;
  512. if(likely(!(rm->flags & RRD_FLAG_LIVE_RETENTION)))
  513. return false;
  514. if(unlikely(rm->flags & RRD_FLAGS_PREVENTING_DELETIONS))
  515. return false;
  516. if(likely(rm->rrddim))
  517. return false;
  518. if((now_realtime_usec() - rm->created_ut) < 600 * USEC_PER_SEC)
  519. return false;
  520. rrdmetric_update_retention(rm);
  521. if(rm->first_time_t || rm->last_time_t)
  522. return false;
  523. return true;
  524. }
  525. static void rrdmetric_trigger_updates(RRDMETRIC *rm, bool force, bool escalate) {
  526. if(likely(!force && !(rm->flags & RRD_FLAG_UPDATED))) return;
  527. if(unlikely(rrd_flag_is_collected(rm) && !rm->rrddim))
  528. rrd_flag_set_archived(rm);
  529. if(unlikely((rm->flags & RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD) && rrd_flag_is_collected(rm)))
  530. rrd_flag_set_archived(rm);
  531. rrdmetric_update_retention(rm);
  532. if(unlikely(escalate && rm->flags & RRD_FLAG_UPDATED && !(rm->ri->flags & RRD_FLAG_DONT_PROCESS))) {
  533. log_transition(rm->id, rm->ri->id, rm->ri->rc->id, rm->flags, "RRDMETRIC");
  534. rrdinstance_trigger_updates(rm->ri, true, true);
  535. }
  536. }
  537. static inline void rrdmetric_from_rrddim(RRDDIM *rd) {
  538. if(unlikely(!rd->rrdset))
  539. fatal("RRDMETRIC: rrddim '%s' does not have a rrdset.", rd->id);
  540. if(unlikely(!rd->rrdset->rrdhost))
  541. fatal("RRDMETRIC: rrdset '%s' does not have a rrdhost", rd->rrdset->id);
  542. if(unlikely(!rd->rrdset->rrdinstance))
  543. fatal("RRDMETRIC: rrdset '%s' does not have a rrdinstance", rd->rrdset->id);
  544. RRDINSTANCE *ri = rrdinstance_acquired_value(rd->rrdset->rrdinstance);
  545. RRDMETRIC trm = {
  546. .id = string_strdupz(rd->id),
  547. .name = string_strdupz(rd->name),
  548. .flags = RRD_FLAG_NONE,
  549. .rrddim = rd,
  550. };
  551. uuid_copy(trm.uuid, rd->metric_uuid);
  552. RRDMETRIC_ACQUIRED *rma = (RRDMETRIC_ACQUIRED *)dictionary_set_and_acquire_item(ri->rrdmetrics, string2str(trm.id), &trm, sizeof(trm));
  553. if(rd->rrdmetric)
  554. rrdmetric_release(rd->rrdmetric);
  555. rd->rrdmetric = rma;
  556. }
  557. #define rrddim_get_rrdmetric(rd) rrddim_get_rrdmetric_with_trace(rd, __FUNCTION__)
  558. static inline RRDMETRIC *rrddim_get_rrdmetric_with_trace(RRDDIM *rd, const char *function) {
  559. if(unlikely(!rd->rrdmetric)) {
  560. error("RRDMETRIC: RRDDIM '%s' is not linked to an RRDMETRIC at %s()", rd->id, function);
  561. return NULL;
  562. }
  563. RRDMETRIC *rm = rrdmetric_acquired_value(rd->rrdmetric);
  564. if(unlikely(rm->rrddim != rd))
  565. fatal("RRDMETRIC: '%s' is not linked to RRDDIM '%s' at %s()", string2str(rm->id), rd->id, function);
  566. return rm;
  567. }
  568. static inline void rrdmetric_rrddim_is_freed(RRDDIM *rd) {
  569. RRDMETRIC *rm = rrddim_get_rrdmetric(rd);
  570. if(unlikely(!rm)) return;
  571. if(unlikely(rrd_flag_is_collected(rm)))
  572. rrd_flag_set_archived(rm);
  573. rm->rrddim = NULL;
  574. rrdmetric_trigger_updates(rm, false, true);
  575. rrdmetric_release(rd->rrdmetric);
  576. rd->rrdmetric = NULL;
  577. }
  578. static inline void rrdmetric_updated_rrddim_flags(RRDDIM *rd) {
  579. RRDMETRIC *rm = rrddim_get_rrdmetric(rd);
  580. if(unlikely(!rm)) return;
  581. if(unlikely(rd->flags & (RRDDIM_FLAG_ARCHIVED | RRDDIM_FLAG_OBSOLETE))) {
  582. if(unlikely(rrd_flag_is_collected(rm)))
  583. rrd_flag_set_archived(rm);
  584. }
  585. rrdmetric_trigger_updates(rm, false, true);
  586. }
  587. static inline void rrdmetric_collected_rrddim(RRDDIM *rd) {
  588. RRDMETRIC *rm = rrddim_get_rrdmetric(rd);
  589. if(unlikely(!rm)) return;
  590. if(unlikely(!rrd_flag_is_collected(rm)))
  591. rrd_flag_set_collected(rm);
  592. rrdmetric_trigger_updates(rm, false, true);
  593. }
  594. // ----------------------------------------------------------------------------
  595. // RRDINSTANCE
  596. static void rrdinstance_free(RRDINSTANCE *ri) {
  597. if(ri->flags & RRD_FLAG_OWN_LABELS)
  598. dictionary_destroy(ri->rrdlabels);
  599. rrdmetrics_destroy(ri);
  600. string_freez(ri->id);
  601. string_freez(ri->name);
  602. string_freez(ri->title);
  603. string_freez(ri->units);
  604. string_freez(ri->family);
  605. ri->id = NULL;
  606. ri->name = NULL;
  607. ri->title = NULL;
  608. ri->units = NULL;
  609. ri->family = NULL;
  610. ri->rc = NULL;
  611. ri->rrdlabels = NULL;
  612. ri->rrdmetrics = NULL;
  613. ri->rrdset = NULL;
  614. }
  615. static void rrdinstance_insert_callback(const char *id __maybe_unused, void *value, void *data) {
  616. static STRING *ml_anomaly_rates_id = NULL;
  617. if(unlikely(!ml_anomaly_rates_id))
  618. ml_anomaly_rates_id = string_strdupz(ML_ANOMALY_RATES_CHART_ID);
  619. RRDINSTANCE *ri = value;
  620. // link it to its parent
  621. ri->rc = data;
  622. ri->flags = ri->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS;
  623. if(!ri->name)
  624. ri->name = string_dup(ri->id);
  625. if(ri->rrdset && ri->rrdset->state) {
  626. ri->rrdlabels = ri->rrdset->state->chart_labels;
  627. if(ri->flags & RRD_FLAG_OWN_LABELS)
  628. ri->flags &= ~RRD_FLAG_OWN_LABELS;
  629. }
  630. else {
  631. ri->rrdlabels = rrdlabels_create();
  632. ri->flags |= RRD_FLAG_OWN_LABELS;
  633. }
  634. if(ri->rrdset) {
  635. if(unlikely((ri->rrdset->flags & RRDSET_FLAG_HIDDEN) || (ri->rrdset->state && ri->rrdset->state->is_ar_chart)))
  636. ri->flags |= RRD_FLAG_HIDDEN;
  637. else
  638. ri->flags &= ~RRD_FLAG_HIDDEN;
  639. }
  640. // we need this when loading from SQL
  641. if(unlikely(ri->id == ml_anomaly_rates_id))
  642. ri->flags |= RRD_FLAG_HIDDEN;
  643. rrdmetrics_create(ri);
  644. rrdinstance_log(ri, "INSERT");
  645. // signal the react callback to do the job
  646. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_NEW_OBJECT);
  647. }
  648. static void rrdinstance_delete_callback(const char *id, void *value, void *data) {
  649. (void)id;
  650. RRDCONTEXT *rc = data; (void)rc;
  651. RRDINSTANCE *ri = (RRDINSTANCE *)value;
  652. rrdinstance_log(ri, "DELETE");
  653. internal_error(ri->rrdset, "RRDINSTANCE: '%s' is freed but there is a RRDSET linked to it.", string2str(ri->id));
  654. rrdinstance_free(ri);
  655. }
  656. static void rrdinstance_conflict_callback(const char *id __maybe_unused, void *oldv, void *newv, void *data __maybe_unused) {
  657. RRDINSTANCE *ri = (RRDINSTANCE *)oldv;
  658. RRDINSTANCE *ri_new = (RRDINSTANCE *)newv;
  659. internal_error(ri->id != ri_new->id,
  660. "RRDINSTANCE: '%s' cannot change id to '%s'",
  661. string2str(ri->id), string2str(ri_new->id));
  662. if(uuid_compare(ri->uuid, ri_new->uuid) != 0) {
  663. uuid_copy(ri->uuid, ri_new->uuid);
  664. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_UUID);
  665. }
  666. if(ri->rrdset && ri_new->rrdset && ri->rrdset != ri_new->rrdset) {
  667. ri->rrdset = ri_new->rrdset;
  668. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LINKING);
  669. }
  670. if(ri->rrdset && ri->rrdset->chart_uuid && uuid_compare(ri->uuid, *ri->rrdset->chart_uuid) != 0) {
  671. char uuid1[UUID_STR_LEN], uuid2[UUID_STR_LEN];
  672. uuid_unparse(ri->uuid, uuid1);
  673. uuid_unparse(*ri->rrdset->chart_uuid, uuid2);
  674. internal_error(true, "RRDINSTANCE: '%s' is linked to RRDSET '%s' but they have different UUIDs. RRDINSTANCE has '%s', RRDSET has '%s'", string2str(ri->id), ri->rrdset->id, uuid1, uuid2);
  675. }
  676. if(ri->name != ri_new->name) {
  677. STRING *old = ri->name;
  678. ri->name = string_dup(ri_new->name);
  679. string_freez(old);
  680. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_NAME);
  681. }
  682. if(ri->title != ri_new->title) {
  683. STRING *old = ri->title;
  684. ri->title = string_dup(ri_new->title);
  685. string_freez(old);
  686. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_TITLE);
  687. }
  688. if(ri->units != ri_new->units) {
  689. STRING *old = ri->units;
  690. ri->units = string_dup(ri_new->units);
  691. string_freez(old);
  692. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_UNITS);
  693. }
  694. if(ri->family != ri_new->family) {
  695. STRING *old = ri->family;
  696. ri->family = string_dup(ri_new->family);
  697. string_freez(old);
  698. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FAMILY);
  699. }
  700. if(ri->chart_type != ri_new->chart_type) {
  701. ri->chart_type = ri_new->chart_type;
  702. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_CHART_TYPE);
  703. }
  704. if(ri->priority != ri_new->priority) {
  705. ri->priority = ri_new->priority;
  706. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_PRIORITY);
  707. }
  708. if(ri->update_every != ri_new->update_every) {
  709. ri->update_every = ri_new->update_every;
  710. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_UPDATE_EVERY);
  711. }
  712. if(ri->rrdset != ri_new->rrdset) {
  713. ri->rrdset = ri_new->rrdset;
  714. if(ri->rrdset && (ri->flags & RRD_FLAG_OWN_LABELS)) {
  715. DICTIONARY *old = ri->rrdlabels;
  716. ri->rrdlabels = ri->rrdset->state->chart_labels;
  717. ri->flags &= ~RRD_FLAG_OWN_LABELS;
  718. rrdlabels_destroy(old);
  719. }
  720. else if(!ri->rrdset && !(ri->flags & RRD_FLAG_OWN_LABELS)) {
  721. ri->rrdlabels = rrdlabels_create();
  722. ri->flags |= RRD_FLAG_OWN_LABELS;
  723. }
  724. }
  725. if(ri->rrdset) {
  726. if(unlikely((ri->rrdset->flags & RRDSET_FLAG_HIDDEN) || (ri->rrdset->state && ri->rrdset->state->is_ar_chart)))
  727. ri->flags |= RRD_FLAG_HIDDEN;
  728. else
  729. ri->flags &= ~RRD_FLAG_HIDDEN;
  730. }
  731. ri->flags |= (ri_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS);
  732. if(rrd_flag_is_collected(ri) && rrd_flag_is_archived(ri))
  733. rrd_flag_set_collected(ri);
  734. if(ri->flags & RRD_FLAG_UPDATED)
  735. ri->flags |= RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT;
  736. rrdinstance_log(ri, "CONFLICT");
  737. // free the new one
  738. rrdinstance_free(ri_new);
  739. // the react callback will continue from here
  740. }
  741. static void rrdinstance_react_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) {
  742. RRDINSTANCE *ri = value;
  743. rrdinstance_trigger_updates(ri, false, true);
  744. }
  745. void rrdinstances_create(RRDCONTEXT *rc) {
  746. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  747. return;
  748. if(unlikely(!rc || rc->rrdinstances)) return;
  749. rc->rrdinstances = dictionary_create(DICTIONARY_FLAG_DONT_OVERWRITE_VALUE);
  750. dictionary_register_insert_callback(rc->rrdinstances, rrdinstance_insert_callback, (void *)rc);
  751. dictionary_register_delete_callback(rc->rrdinstances, rrdinstance_delete_callback, (void *)rc);
  752. dictionary_register_conflict_callback(rc->rrdinstances, rrdinstance_conflict_callback, (void *)rc);
  753. dictionary_register_react_callback(rc->rrdinstances, rrdinstance_react_callback, (void *)rc);
  754. }
  755. void rrdinstances_destroy(RRDCONTEXT *rc) {
  756. if(unlikely(!rc || !rc->rrdinstances)) return;
  757. dictionary_destroy(rc->rrdinstances);
  758. rc->rrdinstances = NULL;
  759. }
  760. static inline bool rrdinstance_should_be_deleted(RRDINSTANCE *ri) {
  761. if(likely(!(ri->flags & RRD_FLAG_DELETED)))
  762. return false;
  763. if(likely(!(ri->flags & RRD_FLAG_LIVE_RETENTION)))
  764. return false;
  765. if(unlikely(ri->flags & RRD_FLAGS_PREVENTING_DELETIONS))
  766. return false;
  767. if(likely(ri->rrdset))
  768. return false;
  769. if(unlikely(dictionary_stats_referenced_items(ri->rrdmetrics) != 0))
  770. return false;
  771. if(unlikely(dictionary_stats_entries(ri->rrdmetrics) != 0))
  772. return false;
  773. if(ri->first_time_t || ri->last_time_t)
  774. return false;
  775. return true;
  776. }
  777. static void rrdinstance_trigger_updates(RRDINSTANCE *ri, bool force, bool escalate) {
  778. if(unlikely(ri->flags & RRD_FLAG_DONT_PROCESS)) return;
  779. if(unlikely(!force && !(ri->flags & RRD_FLAG_UPDATED))) return;
  780. if(likely(ri->rrdset)) {
  781. if(unlikely(ri->rrdset->priority != ri->priority)) {
  782. ri->priority = ri->rrdset->priority;
  783. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_PRIORITY);
  784. }
  785. if(unlikely(ri->rrdset->update_every != ri->update_every)) {
  786. ri->update_every = ri->rrdset->update_every;
  787. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_UPDATE_EVERY);
  788. }
  789. }
  790. else if(unlikely(rrd_flag_is_collected(ri))) {
  791. rrd_flag_set_archived(ri);
  792. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LINKING);
  793. }
  794. time_t min_first_time_t = LONG_MAX, max_last_time_t = 0;
  795. size_t metrics_active = 0, metrics_deleted = 0;
  796. bool live_retention = true, currently_collected = false;
  797. {
  798. RRDMETRIC *rm;
  799. dfe_start_read((DICTIONARY *)ri->rrdmetrics, rm) {
  800. if(!(rm->flags & RRD_FLAG_LIVE_RETENTION))
  801. live_retention = false;
  802. if (unlikely((rrdmetric_should_be_deleted(rm)))) {
  803. metrics_deleted++;
  804. rrd_flag_unset_updated(rm);
  805. continue;
  806. }
  807. if(rm->flags & RRD_FLAG_COLLECTED)
  808. currently_collected = true;
  809. metrics_active++;
  810. if (rm->first_time_t && rm->first_time_t < min_first_time_t)
  811. min_first_time_t = rm->first_time_t;
  812. if (rm->last_time_t && rm->last_time_t > max_last_time_t)
  813. max_last_time_t = rm->last_time_t;
  814. rrd_flag_unset_updated(rm);
  815. }
  816. dfe_done(rm);
  817. }
  818. if(live_retention && !(ri->flags & RRD_FLAG_LIVE_RETENTION))
  819. ri->flags |= RRD_FLAG_LIVE_RETENTION;
  820. else if(!live_retention && (ri->flags & RRD_FLAG_LIVE_RETENTION))
  821. ri->flags &= ~RRD_FLAG_LIVE_RETENTION;
  822. if(unlikely(!metrics_active)) {
  823. // no metrics available
  824. if(ri->first_time_t) {
  825. ri->first_time_t = 0;
  826. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
  827. }
  828. if(ri->last_time_t) {
  829. ri->last_time_t = 0;
  830. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
  831. }
  832. rrd_flag_set_deleted(ri, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
  833. }
  834. else {
  835. // we have active metrics...
  836. if (unlikely(min_first_time_t == LONG_MAX))
  837. min_first_time_t = 0;
  838. if (unlikely(min_first_time_t == 0 || max_last_time_t == 0)) {
  839. if(ri->first_time_t) {
  840. ri->first_time_t = 0;
  841. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
  842. }
  843. if(ri->last_time_t) {
  844. ri->last_time_t = 0;
  845. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
  846. }
  847. if(unlikely(live_retention))
  848. rrd_flag_set_deleted(ri, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
  849. }
  850. else {
  851. ri->flags &= ~RRD_FLAG_UPDATE_REASON_ZERO_RETENTION;
  852. if (unlikely(ri->first_time_t != min_first_time_t)) {
  853. ri->first_time_t = min_first_time_t;
  854. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
  855. }
  856. if (unlikely(ri->last_time_t != max_last_time_t)) {
  857. ri->last_time_t = max_last_time_t;
  858. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
  859. }
  860. if(likely(currently_collected))
  861. rrd_flag_set_collected(ri);
  862. else
  863. rrd_flag_set_archived(ri);
  864. }
  865. }
  866. if(unlikely(escalate && ri->flags & RRD_FLAG_UPDATED && !(ri->rc->flags & RRD_FLAG_DONT_PROCESS))) {
  867. log_transition(NULL, ri->id, ri->rc->id, ri->flags, "RRDINSTANCE");
  868. rrdcontext_trigger_updates(ri->rc, true);
  869. }
  870. }
  871. static inline void rrdinstance_from_rrdset(RRDSET *st) {
  872. RRDCONTEXT trc = {
  873. .id = string_strdupz(st->context),
  874. .title = string_strdupz(st->title),
  875. .units = string_strdupz(st->units),
  876. .family = string_strdupz(st->family),
  877. .priority = st->priority,
  878. .chart_type = st->chart_type,
  879. .flags = RRD_FLAG_NONE,
  880. .rrdhost = st->rrdhost,
  881. };
  882. RRDCONTEXT_ACQUIRED *rca = (RRDCONTEXT_ACQUIRED *)dictionary_set_and_acquire_item((DICTIONARY *)st->rrdhost->rrdctx, string2str(trc.id), &trc, sizeof(trc));
  883. RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
  884. RRDINSTANCE tri = {
  885. .id = string_strdupz(st->id),
  886. .name = string_strdupz(st->name),
  887. .units = string_strdupz(st->units),
  888. .family = string_strdupz(st->family),
  889. .title = string_strdupz(st->title),
  890. .chart_type = st->chart_type,
  891. .priority = st->priority,
  892. .update_every = st->update_every,
  893. .flags = RRD_FLAG_DONT_PROCESS,
  894. .rrdset = st,
  895. };
  896. uuid_copy(tri.uuid, *st->chart_uuid);
  897. RRDINSTANCE_ACQUIRED *ria = (RRDINSTANCE_ACQUIRED *)dictionary_set_and_acquire_item(rc->rrdinstances, string2str(tri.id), &tri, sizeof(tri));
  898. RRDCONTEXT_ACQUIRED *rca_old = st->rrdcontext;
  899. RRDINSTANCE_ACQUIRED *ria_old = st->rrdinstance;
  900. st->rrdcontext = rca;
  901. st->rrdinstance = ria;
  902. if(rca == rca_old) {
  903. rrdcontext_release(rca_old);
  904. rca_old = NULL;
  905. }
  906. if(ria == ria_old) {
  907. rrdinstance_release(ria_old);
  908. ria_old = NULL;
  909. }
  910. if(rca_old && ria_old) {
  911. // the chart changed context
  912. RRDCONTEXT *rc_old = rrdcontext_acquired_value(rca_old);
  913. RRDINSTANCE *ri_old = rrdinstance_acquired_value(ria_old);
  914. // migrate all dimensions to the new metrics
  915. rrdset_rdlock(st);
  916. RRDDIM *rd;
  917. rrddim_foreach_read(rd, st) {
  918. if (!rd->rrdmetric) continue;
  919. RRDMETRIC *rm_old = rrdmetric_acquired_value(rd->rrdmetric);
  920. rm_old->flags = RRD_FLAG_DELETED|RRD_FLAG_UPDATED|RRD_FLAG_LIVE_RETENTION|RRD_FLAG_UPDATE_REASON_UNUSED|RRD_FLAG_UPDATE_REASON_ZERO_RETENTION;
  921. rm_old->rrddim = NULL;
  922. rm_old->first_time_t = 0;
  923. rm_old->last_time_t = 0;
  924. rrdmetric_release(rd->rrdmetric);
  925. rd->rrdmetric = NULL;
  926. rrdmetric_from_rrddim(rd);
  927. }
  928. rrdset_unlock(st);
  929. // mark the old instance, ready to be deleted
  930. if(!(ri_old->flags & RRD_FLAG_OWN_LABELS))
  931. ri_old->rrdlabels = rrdlabels_create();
  932. ri_old->flags = RRD_FLAG_OWN_LABELS|RRD_FLAG_DELETED|RRD_FLAG_UPDATED|RRD_FLAG_LIVE_RETENTION|RRD_FLAG_UPDATE_REASON_UNUSED|RRD_FLAG_UPDATE_REASON_ZERO_RETENTION;
  933. ri_old->rrdset = NULL;
  934. ri_old->first_time_t = 0;
  935. ri_old->last_time_t = 0;
  936. ri_old->flags &= ~RRD_FLAG_DONT_PROCESS;
  937. rc_old->flags &= ~RRD_FLAG_DONT_PROCESS;
  938. rrdinstance_trigger_updates(ri_old, true, true);
  939. ri_old->flags |= RRD_FLAG_DONT_PROCESS;
  940. rrdinstance_release(ria_old);
  941. /*
  942. // trigger updates on the old context
  943. if(!dictionary_stats_entries(rc_old->rrdinstances) && !dictionary_stats_referenced_items(rc_old->rrdinstances)) {
  944. rrdcontext_lock(rc_old);
  945. rc_old->flags = ((rc_old->flags & RRD_FLAG_QUEUED)?RRD_FLAG_QUEUED:RRD_FLAG_NONE)|RRD_FLAG_DELETED|RRD_FLAG_UPDATED|RRD_FLAG_LIVE_RETENTION|RRD_FLAG_UPDATE_REASON_UNUSED|RRD_FLAG_UPDATE_REASON_ZERO_RETENTION;
  946. rc_old->first_time_t = 0;
  947. rc_old->last_time_t = 0;
  948. rrdcontext_unlock(rc_old);
  949. rrdcontext_trigger_updates(rc_old, true);
  950. }
  951. else
  952. rrdcontext_trigger_updates(rc_old, true);
  953. */
  954. rrdcontext_release(rca_old);
  955. rca_old = NULL;
  956. ria_old = NULL;
  957. }
  958. if(rca_old || ria_old)
  959. fatal("RRDCONTEXT: cannot switch rrdcontext without switching rrdinstance too");
  960. }
  961. #define rrdset_get_rrdinstance(st) rrdset_get_rrdinstance_with_trace(st, __FUNCTION__);
  962. static inline RRDINSTANCE *rrdset_get_rrdinstance_with_trace(RRDSET *st, const char *function) {
  963. if(unlikely(!st->rrdinstance)) {
  964. error("RRDINSTANCE: RRDSET '%s' is not linked to an RRDINSTANCE at %s()", st->id, function);
  965. return NULL;
  966. }
  967. RRDINSTANCE *ri = rrdinstance_acquired_value(st->rrdinstance);
  968. if(unlikely(ri->rrdset != st))
  969. fatal("RRDINSTANCE: '%s' is not linked to RRDSET '%s' at %s()", string2str(ri->id), st->id, function);
  970. return ri;
  971. }
  972. static inline void rrdinstance_rrdset_is_freed(RRDSET *st) {
  973. RRDINSTANCE *ri = rrdset_get_rrdinstance(st);
  974. if(unlikely(!ri)) return;
  975. rrd_flag_set_archived(ri);
  976. if(!(ri->flags & RRD_FLAG_OWN_LABELS)) {
  977. ri->flags |= RRD_FLAG_OWN_LABELS;
  978. ri->rrdlabels = rrdlabels_create();
  979. rrdlabels_copy(ri->rrdlabels, st->state->chart_labels);
  980. }
  981. ri->rrdset = NULL;
  982. ri->flags &= ~RRD_FLAG_DONT_PROCESS;
  983. rrdinstance_trigger_updates(ri, false, true);
  984. ri->flags |= RRD_FLAG_DONT_PROCESS;
  985. rrdinstance_release(st->rrdinstance);
  986. st->rrdinstance = NULL;
  987. rrdcontext_release(st->rrdcontext);
  988. st->rrdcontext = NULL;
  989. }
  990. static inline void rrdinstance_updated_rrdset_name(RRDSET *st) {
  991. // the chart may not be initialized when this is called
  992. if(unlikely(!st->rrdinstance)) return;
  993. RRDINSTANCE *ri = rrdset_get_rrdinstance(st);
  994. if(unlikely(!ri)) return;
  995. STRING *old = ri->name;
  996. ri->name = string_strdupz(st->name);
  997. if(ri->name != old)
  998. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_NAME);
  999. string_freez(old);
  1000. rrdinstance_trigger_updates(ri, false, true);
  1001. }
  1002. static inline void rrdinstance_updated_rrdset_flags_no_action(RRDINSTANCE *ri, RRDSET *st) {
  1003. if(unlikely(st->flags & (RRDSET_FLAG_ARCHIVED | RRDSET_FLAG_OBSOLETE)))
  1004. rrd_flag_set_archived(ri);
  1005. if(unlikely((st->flags & RRDSET_FLAG_HIDDEN) && !(ri->flags & RRD_FLAG_HIDDEN))) {
  1006. ri->flags |= RRD_FLAG_HIDDEN;
  1007. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FLAGS);
  1008. }
  1009. else if(unlikely(!(st->flags & RRDSET_FLAG_HIDDEN) && (ri->flags & RRD_FLAG_HIDDEN))) {
  1010. ri->flags &= ~RRD_FLAG_HIDDEN;
  1011. rrd_flag_set_updated(ri, RRD_FLAG_UPDATE_REASON_CHANGED_FLAGS);
  1012. }
  1013. }
  1014. static inline void rrdinstance_updated_rrdset_flags(RRDSET *st) {
  1015. RRDINSTANCE *ri = rrdset_get_rrdinstance(st);
  1016. if(unlikely(!ri)) return;
  1017. rrdinstance_updated_rrdset_flags_no_action(ri, st);
  1018. ri->flags &= ~RRD_FLAG_DONT_PROCESS;
  1019. rrdinstance_trigger_updates(ri, false, true);
  1020. ri->flags |= RRD_FLAG_DONT_PROCESS;
  1021. }
  1022. static inline void rrdinstance_collected_rrdset(RRDSET *st) {
  1023. RRDINSTANCE *ri = rrdset_get_rrdinstance(st);
  1024. if(unlikely(!ri)) return;
  1025. rrdinstance_updated_rrdset_flags_no_action(ri, st);
  1026. if(unlikely(!rrd_flag_is_collected(ri)))
  1027. rrd_flag_set_collected(ri);
  1028. if(unlikely(ri->flags & RRD_FLAG_DONT_PROCESS))
  1029. ri->flags &= ~RRD_FLAG_DONT_PROCESS;
  1030. rrdinstance_trigger_updates(ri, false, true);
  1031. }
  1032. // ----------------------------------------------------------------------------
  1033. // RRDCONTEXT
  1034. static void rrdcontext_freez(RRDCONTEXT *rc) {
  1035. string_freez(rc->id);
  1036. string_freez(rc->title);
  1037. string_freez(rc->units);
  1038. string_freez(rc->family);
  1039. }
  1040. static uint64_t rrdcontext_get_next_version(RRDCONTEXT *rc) {
  1041. time_t now = now_realtime_sec();
  1042. uint64_t version = MAX(rc->version, rc->hub.version);
  1043. version = MAX((uint64_t)now, version);
  1044. version++;
  1045. return version;
  1046. }
  1047. static void rrdcontext_message_send_unsafe(RRDCONTEXT *rc, bool snapshot __maybe_unused, void *bundle __maybe_unused) {
  1048. // save it, so that we know the last version we sent to hub
  1049. rc->version = rc->hub.version = rrdcontext_get_next_version(rc);
  1050. rc->hub.id = string2str(rc->id);
  1051. rc->hub.title = string2str(rc->title);
  1052. rc->hub.units = string2str(rc->units);
  1053. rc->hub.family = string2str(rc->family);
  1054. rc->hub.chart_type = rrdset_type_name(rc->chart_type);
  1055. rc->hub.priority = rc->priority;
  1056. rc->hub.first_time_t = rc->first_time_t;
  1057. rc->hub.last_time_t = rrd_flag_is_collected(rc) ? 0 : rc->last_time_t;
  1058. rc->hub.deleted = (rc->flags & RRD_FLAG_DELETED) ? true : false;
  1059. #ifdef ENABLE_ACLK
  1060. struct context_updated message = {
  1061. .id = rc->hub.id,
  1062. .version = rc->hub.version,
  1063. .title = rc->hub.title,
  1064. .units = rc->hub.units,
  1065. .family = rc->hub.family,
  1066. .chart_type = rc->hub.chart_type,
  1067. .priority = rc->hub.priority,
  1068. .first_entry = rc->hub.first_time_t,
  1069. .last_entry = rc->hub.last_time_t,
  1070. .deleted = rc->hub.deleted,
  1071. };
  1072. if(likely(!(rc->flags & RRD_FLAG_HIDDEN))) {
  1073. if (snapshot) {
  1074. if (!rc->hub.deleted)
  1075. contexts_snapshot_add_ctx_update(bundle, &message);
  1076. }
  1077. else
  1078. contexts_updated_add_ctx_update(bundle, &message);
  1079. }
  1080. #endif
  1081. // store it to SQL
  1082. if(rc->flags & RRD_FLAG_DELETED) {
  1083. rrdcontext_delete_from_sql_unsafe(rc);
  1084. }
  1085. else {
  1086. if (ctx_store_context(&rc->rrdhost->host_uuid, &rc->hub) != 0)
  1087. error("RRDCONTEXT: failed to save context '%s' version %"PRIu64" to SQL.", rc->hub.id, rc->hub.version);
  1088. }
  1089. }
  1090. static bool check_if_cloud_version_changed_unsafe(RRDCONTEXT *rc, bool sending __maybe_unused) {
  1091. bool id_changed = false,
  1092. title_changed = false,
  1093. units_changed = false,
  1094. family_changed = false,
  1095. chart_type_changed = false,
  1096. priority_changed = false,
  1097. first_time_changed = false,
  1098. last_time_changed = false,
  1099. deleted_changed = false;
  1100. if(unlikely(string2str(rc->id) != rc->hub.id))
  1101. id_changed = true;
  1102. if(unlikely(string2str(rc->title) != rc->hub.title))
  1103. title_changed = true;
  1104. if(unlikely(string2str(rc->units) != rc->hub.units))
  1105. units_changed = true;
  1106. if(unlikely(string2str(rc->family) != rc->hub.family))
  1107. family_changed = true;
  1108. if(unlikely(rrdset_type_name(rc->chart_type) != rc->hub.chart_type))
  1109. chart_type_changed = true;
  1110. if(unlikely(rc->priority != rc->hub.priority))
  1111. priority_changed = true;
  1112. if(unlikely((uint64_t)rc->first_time_t != rc->hub.first_time_t))
  1113. first_time_changed = true;
  1114. if(unlikely((uint64_t)(rrd_flag_is_collected(rc) ? 0 : rc->last_time_t) != rc->hub.last_time_t))
  1115. last_time_changed = true;
  1116. if(unlikely(((rc->flags & RRD_FLAG_DELETED) ? true : false) != rc->hub.deleted))
  1117. deleted_changed = true;
  1118. if(unlikely(id_changed || title_changed || units_changed || family_changed || chart_type_changed || priority_changed || first_time_changed || last_time_changed || deleted_changed)) {
  1119. internal_error(true, "RRDCONTEXT: %s NEW VERSION '%s'%s, version %"PRIu64", title '%s'%s, units '%s'%s, family '%s'%s, chart type '%s'%s, priority %u%s, first_time_t %ld%s, last_time_t %ld%s, deleted '%s'%s, (queued for %llu ms, expected %llu ms)",
  1120. sending?"SENDING":"QUEUE",
  1121. string2str(rc->id), id_changed ? " (CHANGED)" : "",
  1122. rc->version,
  1123. string2str(rc->title), title_changed ? " (CHANGED)" : "",
  1124. string2str(rc->units), units_changed ? " (CHANGED)" : "",
  1125. string2str(rc->family), family_changed ? " (CHANGED)" : "",
  1126. rrdset_type_name(rc->chart_type), chart_type_changed ? " (CHANGED)" : "",
  1127. rc->priority, priority_changed ? " (CHANGED)" : "",
  1128. rc->first_time_t, first_time_changed ? " (CHANGED)" : "",
  1129. rrd_flag_is_collected(rc) ? 0 : rc->last_time_t, last_time_changed ? " (CHANGED)" : "",
  1130. (rc->flags & RRD_FLAG_DELETED) ? "true" : "false", deleted_changed ? " (CHANGED)" : "",
  1131. sending ? (now_realtime_usec() - rc->queue.queued_ut) / USEC_PER_MS : 0,
  1132. sending ? (rc->queue.scheduled_dispatch_ut - rc->queue.queued_ut) / USEC_PER_SEC : 0
  1133. );
  1134. return true;
  1135. }
  1136. return false;
  1137. }
  1138. static void rrdcontext_insert_callback(const char *id, void *value, void *data) {
  1139. (void)id;
  1140. RRDHOST *host = (RRDHOST *)data;
  1141. RRDCONTEXT *rc = (RRDCONTEXT *)value;
  1142. rc->rrdhost = host;
  1143. rc->flags = rc->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS;
  1144. if(rc->hub.version) {
  1145. // we are loading data from the SQL database
  1146. if(rc->version)
  1147. error("RRDCONTEXT: context '%s' is already initialized with version %"PRIu64", but it is loaded again from SQL with version %"PRIu64"", string2str(rc->id), rc->version, rc->hub.version);
  1148. // IMPORTANT
  1149. // replace all string pointers in rc->hub with our own versions
  1150. // the originals are coming from a tmp allocation of sqlite
  1151. string_freez(rc->id);
  1152. rc->id = string_strdupz(rc->hub.id);
  1153. rc->hub.id = string2str(rc->id);
  1154. string_freez(rc->title);
  1155. rc->title = string_strdupz(rc->hub.title);
  1156. rc->hub.title = string2str(rc->title);
  1157. string_freez(rc->units);
  1158. rc->units = string_strdupz(rc->hub.units);
  1159. rc->hub.units = string2str(rc->units);
  1160. string_freez(rc->family);
  1161. rc->family = string_strdupz(rc->hub.family);
  1162. rc->hub.family = string2str(rc->family);
  1163. rc->chart_type = rrdset_type_id(rc->hub.chart_type);
  1164. rc->hub.chart_type = rrdset_type_name(rc->chart_type);
  1165. rc->version = rc->hub.version;
  1166. rc->priority = rc->hub.priority;
  1167. rc->first_time_t = rc->hub.first_time_t;
  1168. rc->last_time_t = rc->hub.last_time_t;
  1169. if(rc->hub.deleted || !rc->hub.first_time_t)
  1170. rrd_flag_set_deleted(rc, 0);
  1171. else {
  1172. if (rc->last_time_t == 0)
  1173. rrd_flag_set_collected(rc);
  1174. else
  1175. rrd_flag_set_archived(rc);
  1176. }
  1177. rc->flags |= RRD_FLAG_UPDATE_REASON_LOAD_SQL;
  1178. }
  1179. else {
  1180. // we are adding this context now for the first time
  1181. rc->version = now_realtime_sec();
  1182. }
  1183. rrdinstances_create(rc);
  1184. netdata_mutex_init(&rc->mutex);
  1185. // signal the react callback to do the job
  1186. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_NEW_OBJECT);
  1187. }
  1188. static void rrdcontext_delete_callback(const char *id, void *value, void *data) {
  1189. (void)id;
  1190. RRDHOST *host = (RRDHOST *)data;
  1191. (void)host;
  1192. RRDCONTEXT *rc = (RRDCONTEXT *)value;
  1193. rrdinstances_destroy(rc);
  1194. netdata_mutex_destroy(&rc->mutex);
  1195. rrdcontext_freez(rc);
  1196. }
  1197. static void rrdcontext_conflict_callback(const char *id, void *oldv, void *newv, void *data) {
  1198. (void)id;
  1199. RRDHOST *host = (RRDHOST *)data;
  1200. (void)host;
  1201. RRDCONTEXT *rc = (RRDCONTEXT *)oldv;
  1202. RRDCONTEXT *rc_new = (RRDCONTEXT *)newv;
  1203. rrdcontext_lock(rc);
  1204. if(rc->title != rc_new->title) {
  1205. STRING *old_title = rc->title;
  1206. rc->title = string_2way_merge(rc->title, rc_new->title);
  1207. string_freez(old_title);
  1208. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_TITLE);
  1209. }
  1210. if(rc->units != rc_new->units) {
  1211. STRING *old_units = rc->units;
  1212. rc->units = string_dup(rc_new->units);
  1213. string_freez(old_units);
  1214. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_UNITS);
  1215. }
  1216. if(rc->family != rc_new->family) {
  1217. STRING *old_family = rc->family;
  1218. rc->family = string_2way_merge(rc->family, rc_new->family);
  1219. string_freez(old_family);
  1220. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FAMILY);
  1221. }
  1222. if(rc->chart_type != rc_new->chart_type) {
  1223. rc->chart_type = rc_new->chart_type;
  1224. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_CHART_TYPE);
  1225. }
  1226. if(rc->priority != rc_new->priority) {
  1227. rc->priority = rc_new->priority;
  1228. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_PRIORITY);
  1229. }
  1230. rc->flags |= (rc_new->flags & RRD_FLAGS_ALLOWED_EXTERNALLY_ON_NEW_OBJECTS);
  1231. if(rrd_flag_is_collected(rc) && rrd_flag_is_archived(rc))
  1232. rrd_flag_set_collected(rc);
  1233. if(rc->flags & RRD_FLAG_UPDATED)
  1234. rc->flags |= RRD_FLAG_UPDATE_REASON_UPDATED_OBJECT;
  1235. rrdcontext_unlock(rc);
  1236. // free the resources of the new one
  1237. rrdcontext_freez(rc_new);
  1238. // the react callback will continue from here
  1239. }
  1240. static void rrdcontext_react_callback(const char *id __maybe_unused, void *value, void *data __maybe_unused) {
  1241. RRDCONTEXT *rc = (RRDCONTEXT *)value;
  1242. rrdcontext_trigger_updates(rc, false);
  1243. }
  1244. void rrdhost_create_rrdcontexts(RRDHOST *host) {
  1245. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1246. return;
  1247. if(unlikely(!host)) return;
  1248. if(likely(host->rrdctx)) return;
  1249. host->rrdctx = (RRDCONTEXTS *)dictionary_create(DICTIONARY_FLAG_DONT_OVERWRITE_VALUE);
  1250. dictionary_register_insert_callback((DICTIONARY *)host->rrdctx, rrdcontext_insert_callback, (void *)host);
  1251. dictionary_register_delete_callback((DICTIONARY *)host->rrdctx, rrdcontext_delete_callback, (void *)host);
  1252. dictionary_register_conflict_callback((DICTIONARY *)host->rrdctx, rrdcontext_conflict_callback, (void *)host);
  1253. dictionary_register_react_callback((DICTIONARY *)host->rrdctx, rrdcontext_react_callback, (void *)host);
  1254. host->rrdctx_queue = (RRDCONTEXTS *)dictionary_create(DICTIONARY_FLAG_DONT_OVERWRITE_VALUE | DICTIONARY_FLAG_VALUE_LINK_DONT_CLONE);
  1255. }
  1256. void rrdhost_destroy_rrdcontexts(RRDHOST *host) {
  1257. if(unlikely(!host)) return;
  1258. if(unlikely(!host->rrdctx)) return;
  1259. if(host->rrdctx_queue) {
  1260. dictionary_destroy((DICTIONARY *)host->rrdctx_queue);
  1261. host->rrdctx_queue = NULL;
  1262. }
  1263. dictionary_destroy((DICTIONARY *)host->rrdctx);
  1264. host->rrdctx = NULL;
  1265. }
  1266. static inline bool rrdcontext_should_be_deleted(RRDCONTEXT *rc) {
  1267. if(likely(!(rc->flags & RRD_FLAG_DELETED)))
  1268. return false;
  1269. if(likely(!(rc->flags & RRD_FLAG_LIVE_RETENTION)))
  1270. return false;
  1271. if(unlikely(rc->flags & RRD_FLAGS_PREVENTING_DELETIONS))
  1272. return false;
  1273. if(unlikely(dictionary_stats_referenced_items(rc->rrdinstances) != 0))
  1274. return false;
  1275. if(unlikely(dictionary_stats_entries(rc->rrdinstances) != 0))
  1276. return false;
  1277. if(unlikely(rc->first_time_t || rc->last_time_t))
  1278. return false;
  1279. return true;
  1280. }
  1281. static void rrdcontext_trigger_updates(RRDCONTEXT *rc, bool force) {
  1282. if(unlikely(rc->flags & RRD_FLAG_DONT_PROCESS)) return;
  1283. if(unlikely(!force && !(rc->flags & RRD_FLAG_UPDATED))) return;
  1284. rrdcontext_lock(rc);
  1285. size_t min_priority = LONG_MAX;
  1286. time_t min_first_time_t = LONG_MAX, max_last_time_t = 0;
  1287. size_t instances_active = 0, instances_deleted = 0;
  1288. bool live_retention = true, currently_collected = false, hidden = true;
  1289. {
  1290. RRDINSTANCE *ri;
  1291. dfe_start_read(rc->rrdinstances, ri) {
  1292. if(likely(!(ri->flags & RRD_FLAG_HIDDEN)))
  1293. hidden = false;
  1294. if(!(ri->flags & RRD_FLAG_LIVE_RETENTION))
  1295. live_retention = false;
  1296. if (unlikely(rrdinstance_should_be_deleted(ri))) {
  1297. instances_deleted++;
  1298. rrd_flag_unset_updated(ri);
  1299. continue;
  1300. }
  1301. if(ri->flags & RRD_FLAG_COLLECTED)
  1302. currently_collected = true;
  1303. internal_error(rc->units != ri->units,
  1304. "RRDCONTEXT: '%s' rrdinstance '%s' has different units, context '%s', instance '%s'",
  1305. string2str(rc->id), string2str(ri->id),
  1306. string2str(rc->units), string2str(ri->units));
  1307. instances_active++;
  1308. if (ri->priority >= RRDCONTEXT_MINIMUM_ALLOWED_PRIORITY && ri->priority < min_priority)
  1309. min_priority = ri->priority;
  1310. if (ri->first_time_t && ri->first_time_t < min_first_time_t)
  1311. min_first_time_t = ri->first_time_t;
  1312. if (ri->last_time_t && ri->last_time_t > max_last_time_t)
  1313. max_last_time_t = ri->last_time_t;
  1314. rrd_flag_unset_updated(ri);
  1315. }
  1316. dfe_done(ri);
  1317. }
  1318. if(hidden && !(rc->flags & RRD_FLAG_HIDDEN))
  1319. rc->flags |= RRD_FLAG_HIDDEN;
  1320. else if(!hidden && (rc->flags & RRD_FLAG_HIDDEN))
  1321. rc->flags &= ~RRD_FLAG_HIDDEN;
  1322. if(live_retention && !(rc->flags & RRD_FLAG_LIVE_RETENTION))
  1323. rc->flags |= RRD_FLAG_LIVE_RETENTION;
  1324. else if(!live_retention && (rc->flags & RRD_FLAG_LIVE_RETENTION))
  1325. rc->flags &= ~RRD_FLAG_LIVE_RETENTION;
  1326. if(unlikely(!instances_active)) {
  1327. // we had some instances, but they are gone now...
  1328. if(rc->first_time_t) {
  1329. rc->first_time_t = 0;
  1330. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
  1331. }
  1332. if(rc->last_time_t) {
  1333. rc->last_time_t = 0;
  1334. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
  1335. }
  1336. rrd_flag_set_deleted(rc, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
  1337. }
  1338. else {
  1339. // we have some active instances...
  1340. if (unlikely(min_first_time_t == LONG_MAX))
  1341. min_first_time_t = 0;
  1342. if (unlikely(min_first_time_t == 0 && max_last_time_t == 0)) {
  1343. if(rc->first_time_t) {
  1344. rc->first_time_t = 0;
  1345. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
  1346. }
  1347. if(rc->last_time_t) {
  1348. rc->last_time_t = 0;
  1349. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
  1350. }
  1351. rrd_flag_set_deleted(rc, RRD_FLAG_UPDATE_REASON_ZERO_RETENTION);
  1352. }
  1353. else {
  1354. rc->flags &= ~RRD_FLAG_UPDATE_REASON_ZERO_RETENTION;
  1355. if (unlikely(rc->first_time_t != min_first_time_t)) {
  1356. rc->first_time_t = min_first_time_t;
  1357. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_FIRST_TIME_T);
  1358. }
  1359. if (rc->last_time_t != max_last_time_t) {
  1360. rc->last_time_t = max_last_time_t;
  1361. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_LAST_TIME_T);
  1362. }
  1363. if(likely(currently_collected))
  1364. rrd_flag_set_collected(rc);
  1365. else
  1366. rrd_flag_set_archived(rc);
  1367. }
  1368. if (min_priority != LONG_MAX && rc->priority != min_priority) {
  1369. rc->priority = min_priority;
  1370. rrd_flag_set_updated(rc, RRD_FLAG_UPDATE_REASON_CHANGED_PRIORITY);
  1371. }
  1372. }
  1373. if(unlikely(rc->flags & RRD_FLAG_UPDATED)) {
  1374. log_transition(NULL, NULL, rc->id, rc->flags, "RRDCONTEXT");
  1375. if(check_if_cloud_version_changed_unsafe(rc, false)) {
  1376. rc->version = rrdcontext_get_next_version(rc);
  1377. if(rc->flags & RRD_FLAG_QUEUED) {
  1378. rc->queue.queued_ut = now_realtime_usec();
  1379. rc->queue.queued_flags |= rc->flags;
  1380. }
  1381. else {
  1382. rc->queue.queued_ut = now_realtime_usec();
  1383. rc->queue.queued_flags = rc->flags;
  1384. rc->flags |= RRD_FLAG_QUEUED;
  1385. dictionary_set((DICTIONARY *)rc->rrdhost->rrdctx_queue, string2str(rc->id), rc, sizeof(*rc));
  1386. }
  1387. }
  1388. rrd_flag_unset_updated(rc);
  1389. }
  1390. rrdcontext_unlock(rc);
  1391. }
  1392. // ----------------------------------------------------------------------------
  1393. // public API
  1394. void rrdcontext_updated_rrddim(RRDDIM *rd) {
  1395. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1396. return;
  1397. rrdmetric_from_rrddim(rd);
  1398. }
  1399. void rrdcontext_removed_rrddim(RRDDIM *rd) {
  1400. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1401. return;
  1402. rrdmetric_rrddim_is_freed(rd);
  1403. }
  1404. void rrdcontext_updated_rrddim_algorithm(RRDDIM *rd) {
  1405. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1406. return;
  1407. rrdmetric_updated_rrddim_flags(rd);
  1408. }
  1409. void rrdcontext_updated_rrddim_multiplier(RRDDIM *rd) {
  1410. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1411. return;
  1412. rrdmetric_updated_rrddim_flags(rd);
  1413. }
  1414. void rrdcontext_updated_rrddim_divisor(RRDDIM *rd) {
  1415. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1416. return;
  1417. rrdmetric_updated_rrddim_flags(rd);
  1418. }
  1419. void rrdcontext_updated_rrddim_flags(RRDDIM *rd) {
  1420. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1421. return;
  1422. rrdmetric_updated_rrddim_flags(rd);
  1423. }
  1424. void rrdcontext_collected_rrddim(RRDDIM *rd) {
  1425. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1426. return;
  1427. rrdmetric_collected_rrddim(rd);
  1428. }
  1429. void rrdcontext_updated_rrdset(RRDSET *st) {
  1430. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1431. return;
  1432. rrdinstance_from_rrdset(st);
  1433. }
  1434. void rrdcontext_removed_rrdset(RRDSET *st) {
  1435. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1436. return;
  1437. rrdinstance_rrdset_is_freed(st);
  1438. }
  1439. void rrdcontext_updated_rrdset_name(RRDSET *st) {
  1440. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1441. return;
  1442. rrdinstance_updated_rrdset_name(st);
  1443. }
  1444. void rrdcontext_updated_rrdset_flags(RRDSET *st) {
  1445. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1446. return;
  1447. rrdinstance_updated_rrdset_flags(st);
  1448. }
  1449. void rrdcontext_collected_rrdset(RRDSET *st) {
  1450. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1451. return;
  1452. rrdinstance_collected_rrdset(st);
  1453. }
  1454. void rrdcontext_host_child_connected(RRDHOST *host) {
  1455. (void)host;
  1456. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1457. return;
  1458. // no need to do anything here
  1459. ;
  1460. }
  1461. void rrdcontext_host_child_disconnected(RRDHOST *host) {
  1462. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1463. return;
  1464. rrdcontext_recalculate_host_retention(host, RRD_FLAG_UPDATE_REASON_DISCONNECTED_CHILD, -1);
  1465. }
  1466. // ----------------------------------------------------------------------------
  1467. // ACLK interface
  1468. static bool rrdhost_check_our_claim_id(const char *claim_id) {
  1469. if(!localhost->aclk_state.claimed_id) return false;
  1470. return (strcasecmp(claim_id, localhost->aclk_state.claimed_id) == 0) ? true : false;
  1471. }
  1472. static RRDHOST *rrdhost_find_by_node_id(const char *node_id) {
  1473. uuid_t uuid;
  1474. if (uuid_parse(node_id, uuid))
  1475. return NULL;
  1476. RRDHOST *host = NULL;
  1477. rrd_rdlock();
  1478. rrdhost_foreach_read(host) {
  1479. if(!host->node_id) continue;
  1480. if(uuid_compare(uuid, *host->node_id) == 0)
  1481. break;
  1482. }
  1483. rrd_unlock();
  1484. return host;
  1485. }
  1486. void rrdcontext_hub_checkpoint_command(void *ptr) {
  1487. struct ctxs_checkpoint *cmd = ptr;
  1488. if(!rrdhost_check_our_claim_id(cmd->claim_id)) {
  1489. error("RRDCONTEXT: received checkpoint command for claim_id '%s', node id '%s', but this is not our claim id. Ours '%s', received '%s'. Ignoring command.",
  1490. cmd->claim_id, cmd->node_id,
  1491. localhost->aclk_state.claimed_id?localhost->aclk_state.claimed_id:"NOT SET",
  1492. cmd->claim_id);
  1493. return;
  1494. }
  1495. RRDHOST *host = rrdhost_find_by_node_id(cmd->node_id);
  1496. if(!host) {
  1497. error("RRDCONTEXT: received checkpoint command for claim id '%s', node id '%s', but there is no node with such node id here. Ignoring command.",
  1498. cmd->claim_id, cmd->node_id);
  1499. return;
  1500. }
  1501. if(rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS)) {
  1502. info("RRDCONTEXT: received checkpoint command for claim id '%s', node id '%s', while node '%s' has an active context streaming.",
  1503. cmd->claim_id, cmd->node_id, host->hostname);
  1504. // disable it temporarily, so that our worker will not attempt to send messages in parallel
  1505. rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS);
  1506. }
  1507. uint64_t our_version_hash = rrdcontext_version_hash(host);
  1508. if(cmd->version_hash != our_version_hash) {
  1509. error("RRDCONTEXT: received version hash %"PRIu64" for host '%s', does not match our version hash %"PRIu64". Sending snapshot of all contexts.",
  1510. cmd->version_hash, host->hostname, our_version_hash);
  1511. #ifdef ENABLE_ACLK
  1512. // prepare the snapshot
  1513. char uuid[UUID_STR_LEN];
  1514. uuid_unparse_lower(*host->node_id, uuid);
  1515. contexts_snapshot_t bundle = contexts_snapshot_new(cmd->claim_id, uuid, our_version_hash);
  1516. // do a deep scan on every metric of the host to make sure all our data are updated
  1517. rrdcontext_recalculate_host_retention(host, RRD_FLAG_NONE, -1);
  1518. // calculate version hash and pack all the messages together in one go
  1519. our_version_hash = rrdcontext_version_hash_with_callback(host, rrdcontext_message_send_unsafe, true, bundle);
  1520. // update the version
  1521. contexts_snapshot_set_version(bundle, our_version_hash);
  1522. // send it
  1523. aclk_send_contexts_snapshot(bundle);
  1524. #endif
  1525. }
  1526. internal_error(true, "RRDCONTEXT: host '%s' enabling streaming of contexts", host->hostname);
  1527. rrdhost_flag_set(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS);
  1528. char node_str[UUID_STR_LEN];
  1529. uuid_unparse_lower(*host->node_id, node_str);
  1530. log_access("ACLK REQ [%s (%s)]: STREAM CONTEXTS ENABLED", node_str, host->hostname);
  1531. }
  1532. void rrdcontext_hub_stop_streaming_command(void *ptr) {
  1533. struct stop_streaming_ctxs *cmd = ptr;
  1534. if(!rrdhost_check_our_claim_id(cmd->claim_id)) {
  1535. error("RRDCONTEXT: received stop streaming command for claim_id '%s', node id '%s', but this is not our claim id. Ours '%s', received '%s'. Ignoring command.",
  1536. cmd->claim_id, cmd->node_id,
  1537. localhost->aclk_state.claimed_id?localhost->aclk_state.claimed_id:"NOT SET",
  1538. cmd->claim_id);
  1539. return;
  1540. }
  1541. RRDHOST *host = rrdhost_find_by_node_id(cmd->node_id);
  1542. if(!host) {
  1543. error("RRDCONTEXT: received stop streaming command for claim id '%s', node id '%s', but there is no node with such node id here. Ignoring command.",
  1544. cmd->claim_id, cmd->node_id);
  1545. return;
  1546. }
  1547. if(!rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS)) {
  1548. error("RRDCONTEXT: received stop streaming command for claim id '%s', node id '%s', but node '%s' does not have active context streaming. Ignoring command.",
  1549. cmd->claim_id, cmd->node_id, host->hostname);
  1550. return;
  1551. }
  1552. internal_error(true, "RRDCONTEXT: host '%s' disabling streaming of contexts", host->hostname);
  1553. rrdhost_flag_clear(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS);
  1554. }
  1555. // ----------------------------------------------------------------------------
  1556. // web API
  1557. struct rrdcontext_to_json {
  1558. BUFFER *wb;
  1559. RRDCONTEXT_TO_JSON_OPTIONS options;
  1560. time_t after;
  1561. time_t before;
  1562. SIMPLE_PATTERN *chart_label_key;
  1563. SIMPLE_PATTERN *chart_labels_filter;
  1564. SIMPLE_PATTERN *chart_dimensions;
  1565. size_t written;
  1566. time_t now;
  1567. time_t combined_first_time_t;
  1568. time_t combined_last_time_t;
  1569. RRD_FLAGS combined_flags;
  1570. };
  1571. static inline int rrdmetric_to_json_callback(const char *id, void *value, void *data) {
  1572. struct rrdcontext_to_json * t = data;
  1573. RRDMETRIC *rm = value;
  1574. BUFFER *wb = t->wb;
  1575. RRDCONTEXT_TO_JSON_OPTIONS options = t->options;
  1576. time_t after = t->after;
  1577. time_t before = t->before;
  1578. if((rm->flags & RRD_FLAG_DELETED) && !(options & RRDCONTEXT_OPTION_SHOW_DELETED))
  1579. return 0;
  1580. if(after && (!rm->last_time_t || after > rm->last_time_t))
  1581. return 0;
  1582. if(before && (!rm->first_time_t || before < rm->first_time_t))
  1583. return 0;
  1584. if(t->chart_dimensions
  1585. && !simple_pattern_matches(t->chart_dimensions, string2str(rm->id))
  1586. && !simple_pattern_matches(t->chart_dimensions, string2str(rm->name)))
  1587. return 0;
  1588. if(t->written) {
  1589. buffer_strcat(wb, ",\n");
  1590. t->combined_first_time_t = MIN(t->combined_first_time_t, rm->first_time_t);
  1591. t->combined_last_time_t = MAX(t->combined_last_time_t, rm->last_time_t);
  1592. t->combined_flags |= rm->flags;
  1593. }
  1594. else {
  1595. buffer_strcat(wb, "\n");
  1596. t->combined_first_time_t = rm->first_time_t;
  1597. t->combined_last_time_t = rm->last_time_t;
  1598. t->combined_flags = rm->flags;
  1599. }
  1600. buffer_sprintf(wb, "\t\t\t\t\t\t\"%s\": {", id);
  1601. if(options & RRDCONTEXT_OPTION_SHOW_UUIDS) {
  1602. char uuid[UUID_STR_LEN];
  1603. uuid_unparse(rm->uuid, uuid);
  1604. buffer_sprintf(wb, "\n\t\t\t\t\t\t\t\"uuid\":\"%s\",", uuid);
  1605. }
  1606. buffer_sprintf(wb,
  1607. "\n\t\t\t\t\t\t\t\"name\":\"%s\""
  1608. ",\n\t\t\t\t\t\t\t\"first_time_t\":%ld"
  1609. ",\n\t\t\t\t\t\t\t\"last_time_t\":%ld"
  1610. ",\n\t\t\t\t\t\t\t\"collected\":%s"
  1611. , string2str(rm->name)
  1612. , rm->first_time_t
  1613. , rrd_flag_is_collected(rm) ? t->now : rm->last_time_t
  1614. , rm->flags & RRD_FLAG_COLLECTED ? "true" : "false"
  1615. );
  1616. if(options & RRDCONTEXT_OPTION_SHOW_DELETED) {
  1617. buffer_sprintf(wb,
  1618. ",\n\t\t\t\t\t\t\t\"deleted\":%s"
  1619. , rm->flags & RRD_FLAG_DELETED ? "true" : "false"
  1620. );
  1621. }
  1622. if(options & RRDCONTEXT_OPTION_SHOW_FLAGS) {
  1623. buffer_strcat(wb, ",\n\t\t\t\t\t\t\t\"flags\":\"");
  1624. rrd_flags_to_buffer(rm->flags, wb);
  1625. buffer_strcat(wb, "\"");
  1626. }
  1627. buffer_strcat(wb, "\n\t\t\t\t\t\t}");
  1628. t->written++;
  1629. return 1;
  1630. }
  1631. static inline int rrdinstance_to_json_callback(const char *id, void *value, void *data) {
  1632. struct rrdcontext_to_json *t_parent = data;
  1633. RRDINSTANCE *ri = value;
  1634. BUFFER *wb = t_parent->wb;
  1635. RRDCONTEXT_TO_JSON_OPTIONS options = t_parent->options;
  1636. time_t after = t_parent->after;
  1637. time_t before = t_parent->before;
  1638. bool has_filter = t_parent->chart_label_key || t_parent->chart_labels_filter || t_parent->chart_dimensions;
  1639. if((ri->flags & RRD_FLAG_DELETED) && !(options & RRDCONTEXT_OPTION_SHOW_DELETED))
  1640. return 0;
  1641. if(after && (!ri->last_time_t || after > ri->last_time_t))
  1642. return 0;
  1643. if(before && (!ri->first_time_t || before < ri->first_time_t))
  1644. return 0;
  1645. if(t_parent->chart_label_key && !rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, t_parent->chart_label_key, '\0'))
  1646. return 0;
  1647. if(t_parent->chart_labels_filter && !rrdlabels_match_simple_pattern_parsed(ri->rrdlabels, t_parent->chart_labels_filter, ':'))
  1648. return 0;
  1649. time_t first_time_t = ri->first_time_t;
  1650. time_t last_time_t = ri->last_time_t;
  1651. RRD_FLAGS flags = ri->flags;
  1652. BUFFER *wb_metrics = NULL;
  1653. if(options & RRDCONTEXT_OPTION_SHOW_METRICS || t_parent->chart_dimensions) {
  1654. wb_metrics = buffer_create(4096);
  1655. struct rrdcontext_to_json t_metrics = {
  1656. .wb = wb_metrics,
  1657. .options = options,
  1658. .chart_label_key = t_parent->chart_label_key,
  1659. .chart_labels_filter = t_parent->chart_labels_filter,
  1660. .chart_dimensions = t_parent->chart_dimensions,
  1661. .after = after,
  1662. .before = before,
  1663. .written = 0,
  1664. .now = t_parent->now,
  1665. };
  1666. dictionary_walkthrough_read(ri->rrdmetrics, rrdmetric_to_json_callback, &t_metrics);
  1667. if(has_filter && !t_metrics.written) {
  1668. buffer_free(wb_metrics);
  1669. return 0;
  1670. }
  1671. first_time_t = t_metrics.combined_first_time_t;
  1672. last_time_t = t_metrics.combined_last_time_t;
  1673. flags = t_metrics.combined_flags;
  1674. }
  1675. if(t_parent->written) {
  1676. buffer_strcat(wb, ",\n");
  1677. t_parent->combined_first_time_t = MIN(t_parent->combined_first_time_t, first_time_t);
  1678. t_parent->combined_last_time_t = MAX(t_parent->combined_last_time_t, last_time_t);
  1679. t_parent->combined_flags |= flags;
  1680. }
  1681. else {
  1682. buffer_strcat(wb, "\n");
  1683. t_parent->combined_first_time_t = first_time_t;
  1684. t_parent->combined_last_time_t = last_time_t;
  1685. t_parent->combined_flags = flags;
  1686. }
  1687. buffer_sprintf(wb, "\t\t\t\t\"%s\": {", id);
  1688. if(options & RRDCONTEXT_OPTION_SHOW_UUIDS) {
  1689. char uuid[UUID_STR_LEN];
  1690. uuid_unparse(ri->uuid, uuid);
  1691. buffer_sprintf(wb,"\n\t\t\t\t\t\"uuid\":\"%s\",", uuid);
  1692. }
  1693. buffer_sprintf(wb,
  1694. "\n\t\t\t\t\t\"name\":\"%s\""
  1695. ",\n\t\t\t\t\t\"context\":\"%s\""
  1696. ",\n\t\t\t\t\t\"title\":\"%s\""
  1697. ",\n\t\t\t\t\t\"units\":\"%s\""
  1698. ",\n\t\t\t\t\t\"family\":\"%s\""
  1699. ",\n\t\t\t\t\t\"chart_type\":\"%s\""
  1700. ",\n\t\t\t\t\t\"priority\":%u"
  1701. ",\n\t\t\t\t\t\"update_every\":%d"
  1702. ",\n\t\t\t\t\t\"first_time_t\":%ld"
  1703. ",\n\t\t\t\t\t\"last_time_t\":%ld"
  1704. ",\n\t\t\t\t\t\"collected\":%s"
  1705. , string2str(ri->name)
  1706. , string2str(ri->rc->id)
  1707. , string2str(ri->title)
  1708. , string2str(ri->units)
  1709. , string2str(ri->family)
  1710. , rrdset_type_name(ri->chart_type)
  1711. , ri->priority
  1712. , ri->update_every
  1713. , first_time_t
  1714. , (flags & RRD_FLAG_COLLECTED) ? t_parent->now : last_time_t
  1715. , (flags & RRD_FLAG_COLLECTED) ? "true" : "false"
  1716. );
  1717. if(options & RRDCONTEXT_OPTION_SHOW_DELETED) {
  1718. buffer_sprintf(wb,
  1719. ",\n\t\t\t\t\t\"deleted\":%s"
  1720. , (ri->flags & RRD_FLAG_DELETED) ? "true" : "false"
  1721. );
  1722. }
  1723. if(options & RRDCONTEXT_OPTION_SHOW_FLAGS) {
  1724. buffer_strcat(wb, ",\n\t\t\t\t\t\"flags\":\"");
  1725. rrd_flags_to_buffer(ri->flags, wb);
  1726. buffer_strcat(wb, "\"");
  1727. }
  1728. if(options & RRDCONTEXT_OPTION_SHOW_LABELS && ri->rrdlabels && dictionary_stats_entries(ri->rrdlabels)) {
  1729. buffer_sprintf(wb, ",\n\t\t\t\t\t\"labels\": {\n");
  1730. rrdlabels_to_buffer(ri->rrdlabels, wb, "\t\t\t\t\t\t", ":", "\"", ",\n", NULL, NULL, NULL, NULL);
  1731. buffer_strcat(wb, "\n\t\t\t\t\t}");
  1732. }
  1733. if(wb_metrics) {
  1734. buffer_sprintf(wb, ",\n\t\t\t\t\t\"dimensions\": {");
  1735. buffer_fast_strcat(wb, buffer_tostring(wb_metrics), buffer_strlen(wb_metrics));
  1736. buffer_strcat(wb, "\n\t\t\t\t\t}");
  1737. buffer_free(wb_metrics);
  1738. }
  1739. buffer_strcat(wb, "\n\t\t\t\t}");
  1740. t_parent->written++;
  1741. return 1;
  1742. }
  1743. static inline int rrdcontext_to_json_callback(const char *id, void *value, void *data) {
  1744. struct rrdcontext_to_json *t_parent = data;
  1745. RRDCONTEXT *rc = value;
  1746. BUFFER *wb = t_parent->wb;
  1747. RRDCONTEXT_TO_JSON_OPTIONS options = t_parent->options;
  1748. time_t after = t_parent->after;
  1749. time_t before = t_parent->before;
  1750. bool has_filter = t_parent->chart_label_key || t_parent->chart_labels_filter || t_parent->chart_dimensions;
  1751. if(unlikely((rc->flags & RRD_FLAG_HIDDEN) && !(options & RRDCONTEXT_OPTION_SHOW_HIDDEN)))
  1752. return 0;
  1753. if((rc->flags & RRD_FLAG_DELETED) && !(options & RRDCONTEXT_OPTION_SHOW_DELETED))
  1754. return 0;
  1755. if(options & RRDCONTEXT_OPTION_DEEPSCAN)
  1756. rrdcontext_recalculate_context_retention(rc, RRD_FLAG_NONE, -1);
  1757. if(after && (!rc->last_time_t || after > rc->last_time_t))
  1758. return 0;
  1759. if(before && (!rc->first_time_t || before < rc->first_time_t))
  1760. return 0;
  1761. time_t first_time_t = rc->first_time_t;
  1762. time_t last_time_t = rc->last_time_t;
  1763. RRD_FLAGS flags = rc->flags;
  1764. BUFFER *wb_instances = NULL;
  1765. if((options & (RRDCONTEXT_OPTION_SHOW_LABELS|RRDCONTEXT_OPTION_SHOW_INSTANCES|RRDCONTEXT_OPTION_SHOW_METRICS))
  1766. || t_parent->chart_label_key
  1767. || t_parent->chart_labels_filter
  1768. || t_parent->chart_dimensions) {
  1769. wb_instances = buffer_create(4096);
  1770. struct rrdcontext_to_json t_instances = {
  1771. .wb = wb_instances,
  1772. .options = options,
  1773. .chart_label_key = t_parent->chart_label_key,
  1774. .chart_labels_filter = t_parent->chart_labels_filter,
  1775. .chart_dimensions = t_parent->chart_dimensions,
  1776. .after = after,
  1777. .before = before,
  1778. .written = 0,
  1779. .now = t_parent->now,
  1780. };
  1781. dictionary_walkthrough_read(rc->rrdinstances, rrdinstance_to_json_callback, &t_instances);
  1782. if(has_filter && !t_instances.written) {
  1783. buffer_free(wb_instances);
  1784. return 0;
  1785. }
  1786. first_time_t = t_instances.combined_first_time_t;
  1787. last_time_t = t_instances.combined_last_time_t;
  1788. flags = t_instances.combined_flags;
  1789. }
  1790. if(t_parent->written)
  1791. buffer_strcat(wb, ",\n");
  1792. else
  1793. buffer_strcat(wb, "\n");
  1794. if(options & RRDCONTEXT_OPTION_SKIP_ID)
  1795. buffer_sprintf(wb, "\t\t\{");
  1796. else
  1797. buffer_sprintf(wb, "\t\t\"%s\": {", id);
  1798. rrdcontext_lock(rc);
  1799. buffer_sprintf(wb,
  1800. "\n\t\t\t\"title\":\"%s\""
  1801. ",\n\t\t\t\"units\":\"%s\""
  1802. ",\n\t\t\t\"family\":\"%s\""
  1803. ",\n\t\t\t\"chart_type\":\"%s\""
  1804. ",\n\t\t\t\"priority\":%u"
  1805. ",\n\t\t\t\"first_time_t\":%ld"
  1806. ",\n\t\t\t\"last_time_t\":%ld"
  1807. ",\n\t\t\t\"collected\":%s"
  1808. , string2str(rc->title)
  1809. , string2str(rc->units)
  1810. , string2str(rc->family)
  1811. , rrdset_type_name(rc->chart_type)
  1812. , rc->priority
  1813. , first_time_t
  1814. , (flags & RRD_FLAG_COLLECTED) ? t_parent->now : last_time_t
  1815. , (flags & RRD_FLAG_COLLECTED) ? "true" : "false"
  1816. );
  1817. if(options & RRDCONTEXT_OPTION_SHOW_DELETED) {
  1818. buffer_sprintf(wb,
  1819. ",\n\t\t\t\"deleted\":%s"
  1820. , (rc->flags & RRD_FLAG_DELETED) ? "true" : "false"
  1821. );
  1822. }
  1823. if(options & RRDCONTEXT_OPTION_SHOW_FLAGS) {
  1824. buffer_strcat(wb, ",\n\t\t\t\"flags\":\"");
  1825. rrd_flags_to_buffer(rc->flags, wb);
  1826. buffer_strcat(wb, "\"");
  1827. }
  1828. if(options & RRDCONTEXT_OPTION_SHOW_QUEUED) {
  1829. buffer_strcat(wb, ",\n\t\t\t\"queued_reasons\":\"");
  1830. rrd_reasons_to_buffer(rc->queue.queued_flags, wb);
  1831. buffer_strcat(wb, "\"");
  1832. buffer_sprintf(wb,
  1833. ",\n\t\t\t\"last_queued\":%llu"
  1834. ",\n\t\t\t\"scheduled_dispatch\":%llu"
  1835. ",\n\t\t\t\"last_dequeued\":%llu"
  1836. ",\n\t\t\t\"hub_version\":%"PRIu64""
  1837. ",\n\t\t\t\"version\":%"PRIu64""
  1838. , rc->queue.queued_ut / USEC_PER_SEC
  1839. , rc->queue.scheduled_dispatch_ut / USEC_PER_SEC
  1840. , rc->queue.dequeued_ut / USEC_PER_SEC
  1841. , rc->hub.version
  1842. , rc->version
  1843. );
  1844. }
  1845. rrdcontext_unlock(rc);
  1846. if(wb_instances) {
  1847. buffer_sprintf(wb, ",\n\t\t\t\"charts\": {");
  1848. buffer_fast_strcat(wb, buffer_tostring(wb_instances), buffer_strlen(wb_instances));
  1849. buffer_strcat(wb, "\n\t\t\t}");
  1850. buffer_free(wb_instances);
  1851. }
  1852. buffer_strcat(wb, "\n\t\t}");
  1853. t_parent->written++;
  1854. return 1;
  1855. }
  1856. int rrdcontext_to_json(RRDHOST *host, BUFFER *wb, time_t after, time_t before, RRDCONTEXT_TO_JSON_OPTIONS options, const char *context, SIMPLE_PATTERN *chart_label_key, SIMPLE_PATTERN *chart_labels_filter, SIMPLE_PATTERN *chart_dimensions) {
  1857. RRDCONTEXT_ACQUIRED *rca = (RRDCONTEXT_ACQUIRED *)dictionary_get_and_acquire_item((DICTIONARY *)host->rrdctx, context);
  1858. if(!rca) return HTTP_RESP_NOT_FOUND;
  1859. RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
  1860. if(after != 0 && before != 0) {
  1861. long long after_wanted = after;
  1862. long long before_wanted = before;
  1863. rrdr_relative_window_to_absolute(&after_wanted, &before_wanted);
  1864. after = after_wanted;
  1865. before = before_wanted;
  1866. }
  1867. struct rrdcontext_to_json t_contexts = {
  1868. .wb = wb,
  1869. .options = options|RRDCONTEXT_OPTION_SKIP_ID,
  1870. .chart_label_key = chart_label_key,
  1871. .chart_labels_filter = chart_labels_filter,
  1872. .chart_dimensions = chart_dimensions,
  1873. .after = after,
  1874. .before = before,
  1875. .written = 0,
  1876. .now = now_realtime_sec(),
  1877. };
  1878. rrdcontext_to_json_callback(context, rc, &t_contexts);
  1879. rrdcontext_release(rca);
  1880. if(!t_contexts.written)
  1881. return HTTP_RESP_NOT_FOUND;
  1882. return HTTP_RESP_OK;
  1883. }
  1884. int rrdcontexts_to_json(RRDHOST *host, BUFFER *wb, time_t after, time_t before, RRDCONTEXT_TO_JSON_OPTIONS options, SIMPLE_PATTERN *chart_label_key, SIMPLE_PATTERN *chart_labels_filter, SIMPLE_PATTERN *chart_dimensions) {
  1885. char node_uuid[UUID_STR_LEN] = "";
  1886. if(host->node_id)
  1887. uuid_unparse(*host->node_id, node_uuid);
  1888. if(after != 0 && before != 0) {
  1889. long long after_wanted = after;
  1890. long long before_wanted = before;
  1891. rrdr_relative_window_to_absolute(&after_wanted, &before_wanted);
  1892. after = after_wanted;
  1893. before = before_wanted;
  1894. }
  1895. buffer_sprintf(wb, "{\n"
  1896. "\t\"hostname\": \"%s\""
  1897. ",\n\t\"machine_guid\": \"%s\""
  1898. ",\n\t\"node_id\": \"%s\""
  1899. ",\n\t\"claim_id\": \"%s\""
  1900. , host->hostname
  1901. , host->machine_guid
  1902. , node_uuid
  1903. , host->aclk_state.claimed_id ? host->aclk_state.claimed_id : ""
  1904. );
  1905. if(options & RRDCONTEXT_OPTION_SHOW_LABELS) {
  1906. buffer_sprintf(wb, ",\n\t\"host_labels\": {\n");
  1907. rrdlabels_to_buffer(host->host_labels, wb, "\t\t", ":", "\"", ",\n", NULL, NULL, NULL, NULL);
  1908. buffer_strcat(wb, "\n\t}");
  1909. }
  1910. buffer_sprintf(wb, ",\n\t\"contexts\": {");
  1911. struct rrdcontext_to_json t_contexts = {
  1912. .wb = wb,
  1913. .options = options,
  1914. .chart_label_key = chart_label_key,
  1915. .chart_labels_filter = chart_labels_filter,
  1916. .chart_dimensions = chart_dimensions,
  1917. .after = after,
  1918. .before = before,
  1919. .written = 0,
  1920. .now = now_realtime_sec(),
  1921. };
  1922. dictionary_walkthrough_read((DICTIONARY *)host->rrdctx, rrdcontext_to_json_callback, &t_contexts);
  1923. // close contexts, close main
  1924. buffer_strcat(wb, "\n\t}\n}");
  1925. return HTTP_RESP_OK;
  1926. }
  1927. // ----------------------------------------------------------------------------
  1928. // load from SQL
  1929. static void rrdinstance_load_clabel(SQL_CLABEL_DATA *sld, void *data) {
  1930. RRDINSTANCE *ri = data;
  1931. rrdlabels_add(ri->rrdlabels, sld->label_key, sld->label_value, sld->label_source);
  1932. }
  1933. static void rrdinstance_load_dimension(SQL_DIMENSION_DATA *sd, void *data) {
  1934. RRDINSTANCE *ri = data;
  1935. RRDMETRIC trm = {
  1936. .id = string_strdupz(sd->id),
  1937. .name = string_strdupz(sd->name),
  1938. .flags = RRD_FLAG_ARCHIVED | RRD_FLAG_UPDATE_REASON_LOAD_SQL,
  1939. };
  1940. uuid_copy(trm.uuid, sd->dim_id);
  1941. dictionary_set(ri->rrdmetrics, string2str(trm.id), &trm, sizeof(trm));
  1942. }
  1943. static void rrdinstance_load_chart_callback(SQL_CHART_DATA *sc, void *data) {
  1944. RRDHOST *host = data;
  1945. RRDCONTEXT tc = {
  1946. .id = string_strdupz(sc->context),
  1947. .title = string_strdupz(sc->title),
  1948. .units = string_strdupz(sc->units),
  1949. .family = string_strdupz(sc->family),
  1950. .priority = sc->priority,
  1951. .chart_type = sc->chart_type,
  1952. .flags = RRD_FLAG_ARCHIVED | RRD_FLAG_DONT_PROCESS | RRD_FLAG_UPDATE_REASON_LOAD_SQL,
  1953. .rrdhost = host,
  1954. };
  1955. RRDCONTEXT_ACQUIRED *rca = (RRDCONTEXT_ACQUIRED *)dictionary_set_and_acquire_item((DICTIONARY *)host->rrdctx, string2str(tc.id), &tc, sizeof(tc));
  1956. RRDCONTEXT *rc = rrdcontext_acquired_value(rca);
  1957. RRDINSTANCE tri = {
  1958. .id = string_strdupz(sc->id),
  1959. .name = string_strdupz(sc->name),
  1960. .title = string_strdupz(sc->title),
  1961. .units = string_strdupz(sc->units),
  1962. .family = string_strdupz(sc->family),
  1963. .chart_type = sc->chart_type,
  1964. .priority = sc->priority,
  1965. .update_every = sc->update_every,
  1966. .flags = RRD_FLAG_ARCHIVED | RRD_FLAG_DONT_PROCESS | RRD_FLAG_UPDATE_REASON_LOAD_SQL,
  1967. };
  1968. uuid_copy(tri.uuid, sc->chart_id);
  1969. RRDINSTANCE_ACQUIRED *ria = (RRDINSTANCE_ACQUIRED *)dictionary_set_and_acquire_item(rc->rrdinstances, sc->id, &tri, sizeof(tri));
  1970. RRDINSTANCE *ri = rrdinstance_acquired_value(ria);
  1971. ctx_get_dimension_list(&ri->uuid, rrdinstance_load_dimension, ri);
  1972. ctx_get_label_list(&ri->uuid, rrdinstance_load_clabel, ri);
  1973. ri->flags &= ~RRD_FLAG_DONT_PROCESS;
  1974. rrdinstance_trigger_updates(ri, true, true);
  1975. // let the instance be in "don't process" mode
  1976. // so that we process it once, when it is collected
  1977. ri->flags |= RRD_FLAG_DONT_PROCESS;
  1978. rrdinstance_release(ria);
  1979. rrdcontext_release(rca);
  1980. }
  1981. static void rrdcontext_load_context_callback(VERSIONED_CONTEXT_DATA *ctx_data, void *data) {
  1982. RRDHOST *host = data;
  1983. (void)host;
  1984. RRDCONTEXT trc = {
  1985. .id = string_strdupz(ctx_data->id),
  1986. .flags = RRD_FLAG_ARCHIVED | RRD_FLAG_DONT_PROCESS | RRD_FLAG_UPDATE_REASON_LOAD_SQL,
  1987. // no need to set more data here
  1988. // we only need the hub data
  1989. .hub = *ctx_data,
  1990. };
  1991. dictionary_set((DICTIONARY *)host->rrdctx, string2str(trc.id), &trc, sizeof(trc));
  1992. }
  1993. void rrdhost_load_rrdcontext_data(RRDHOST *host) {
  1994. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  1995. return;
  1996. if(host->rrdctx) return;
  1997. rrdhost_create_rrdcontexts(host);
  1998. ctx_get_context_list(&host->host_uuid, rrdcontext_load_context_callback, host);
  1999. ctx_get_chart_list(&host->host_uuid, rrdinstance_load_chart_callback, host);
  2000. RRDCONTEXT *rc;
  2001. dfe_start_read((DICTIONARY *)host->rrdctx, rc) {
  2002. rc->flags &= ~RRD_FLAG_DONT_PROCESS;
  2003. rrdcontext_trigger_updates(rc, true);
  2004. }
  2005. dfe_done(rc);
  2006. }
  2007. // ----------------------------------------------------------------------------
  2008. // the worker thread
  2009. static inline usec_t rrdcontext_calculate_queued_dispatch_time_ut(RRDCONTEXT *rc, usec_t now_ut) {
  2010. if(likely(rc->queue.delay_calc_ut >= rc->queue.queued_ut))
  2011. return rc->queue.scheduled_dispatch_ut;
  2012. RRD_FLAGS flags = rc->queue.queued_flags;
  2013. usec_t delay = LONG_MAX;
  2014. int i;
  2015. struct rrdcontext_reason *reason;
  2016. for(i = 0, reason = &rrdcontext_reasons[i]; reason->name ; reason = &rrdcontext_reasons[++i]) {
  2017. if(unlikely(flags & reason->flag)) {
  2018. if(reason->delay_ut < delay)
  2019. delay = reason->delay_ut;
  2020. }
  2021. }
  2022. if(unlikely(delay == LONG_MAX)) {
  2023. internal_error(true, "RRDCONTEXT: '%s', cannot find minimum delay of flags %x", string2str(rc->id), (unsigned int)flags);
  2024. delay = 60 * USEC_PER_SEC;
  2025. }
  2026. rc->queue.delay_calc_ut = now_ut;
  2027. usec_t dispatch_ut = rc->queue.scheduled_dispatch_ut = rc->queue.queued_ut + delay;
  2028. return dispatch_ut;
  2029. }
  2030. #define WORKER_JOB_HOSTS 1
  2031. #define WORKER_JOB_CHECK 2
  2032. #define WORKER_JOB_SEND 3
  2033. #define WORKER_JOB_DEQUEUE 4
  2034. #define WORKER_JOB_RETENTION 5
  2035. #define WORKER_JOB_QUEUED 6
  2036. #define WORKER_JOB_CLEANUP 7
  2037. #define WORKER_JOB_CLEANUP_DELETE 8
  2038. static usec_t rrdcontext_next_db_rotation_ut = 0;
  2039. void rrdcontext_db_rotation(void) {
  2040. // called when the db rotates its database
  2041. rrdcontext_next_db_rotation_ut = now_realtime_usec() + FULL_RETENTION_SCAN_DELAY_AFTER_DB_ROTATION_SECS * USEC_PER_SEC;
  2042. }
  2043. static uint64_t rrdcontext_version_hash_with_callback(
  2044. RRDHOST *host,
  2045. void (*callback)(RRDCONTEXT *, bool, void *),
  2046. bool snapshot,
  2047. void *bundle) {
  2048. if(unlikely(!host || !host->rrdctx)) return 0;
  2049. RRDCONTEXT *rc;
  2050. uint64_t hash = 0;
  2051. // loop through all contexts of the host
  2052. dfe_start_read((DICTIONARY *)host->rrdctx, rc) {
  2053. rrdcontext_lock(rc);
  2054. if(unlikely(rc->flags & RRD_FLAG_HIDDEN)) {
  2055. rrdcontext_unlock(rc);
  2056. continue;
  2057. }
  2058. if(unlikely(callback))
  2059. callback(rc, snapshot, bundle);
  2060. // skip any deleted contexts
  2061. if(unlikely(rc->flags & RRD_FLAG_DELETED)) {
  2062. rrdcontext_unlock(rc);
  2063. continue;
  2064. }
  2065. // we use rc->hub.* which has the latest
  2066. // metadata we have sent to the hub
  2067. // if a context is currently queued, rc->hub.* does NOT
  2068. // reflect the queued changes. rc->hub.* is updated with
  2069. // their metadata, after messages are dispatched to hub.
  2070. // when the context is being collected,
  2071. // rc->hub.last_time_t is already zero
  2072. hash += rc->hub.version + rc->hub.last_time_t - rc->hub.first_time_t;
  2073. rrdcontext_unlock(rc);
  2074. }
  2075. dfe_done(rc);
  2076. return hash;
  2077. }
  2078. static void rrdcontext_recalculate_context_retention(RRDCONTEXT *rc, RRD_FLAGS reason, int job_id) {
  2079. RRDINSTANCE *ri;
  2080. dfe_start_read(rc->rrdinstances, ri) {
  2081. RRDMETRIC *rm;
  2082. dfe_start_read(ri->rrdmetrics, rm) {
  2083. if(job_id >= 0)
  2084. worker_is_busy(job_id);
  2085. rrd_flag_set_updated(rm, reason);
  2086. rm->flags &= ~RRD_FLAG_DONT_PROCESS;
  2087. rrdmetric_trigger_updates(rm, true, false);
  2088. }
  2089. dfe_done(rm);
  2090. ri->flags &= ~RRD_FLAG_DONT_PROCESS;
  2091. rrdinstance_trigger_updates(ri, true, false);
  2092. ri->flags |= RRD_FLAG_DONT_PROCESS;
  2093. }
  2094. dfe_done(ri);
  2095. rc->flags &= ~RRD_FLAG_DONT_PROCESS;
  2096. rrdcontext_trigger_updates(rc, true);
  2097. }
  2098. static void rrdcontext_recalculate_host_retention(RRDHOST *host, RRD_FLAGS reason, int job_id) {
  2099. if(unlikely(!host || !host->rrdctx)) return;
  2100. RRDCONTEXT *rc;
  2101. dfe_start_read((DICTIONARY *)host->rrdctx, rc) {
  2102. rrdcontext_recalculate_context_retention(rc, reason, job_id);
  2103. }
  2104. dfe_done(rc);
  2105. }
  2106. static void rrdcontext_recalculate_retention(int job_id) {
  2107. rrdcontext_next_db_rotation_ut = 0;
  2108. rrd_rdlock();
  2109. RRDHOST *host;
  2110. rrdhost_foreach_read(host) {
  2111. rrdcontext_recalculate_host_retention(host, RRD_FLAG_UPDATE_REASON_DB_ROTATION, job_id);
  2112. }
  2113. rrd_unlock();
  2114. }
  2115. void rrdcontext_delete_from_sql_unsafe(RRDCONTEXT *rc) {
  2116. // we need to refresh the string pointers in rc->hub
  2117. // in case the context changed values
  2118. rc->hub.id = string2str(rc->id);
  2119. rc->hub.title = string2str(rc->title);
  2120. rc->hub.units = string2str(rc->units);
  2121. rc->hub.family = string2str(rc->family);
  2122. // delete it from SQL
  2123. if(ctx_delete_context(&rc->rrdhost->host_uuid, &rc->hub) != 0)
  2124. error("RRDCONTEXT: failed to delete context '%s' version %"PRIu64" from SQL.", rc->hub.id, rc->hub.version);
  2125. }
  2126. static void rrdcontext_garbage_collect(void) {
  2127. rrd_rdlock();
  2128. RRDHOST *host;
  2129. rrdhost_foreach_read(host) {
  2130. RRDCONTEXT *rc;
  2131. dfe_start_write((DICTIONARY *)host->rrdctx, rc) {
  2132. worker_is_busy(WORKER_JOB_CLEANUP);
  2133. rrdcontext_lock(rc);
  2134. if(unlikely(rrdcontext_should_be_deleted(rc))) {
  2135. worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
  2136. rrdcontext_delete_from_sql_unsafe(rc);
  2137. if(dictionary_del_having_write_lock((DICTIONARY *)host->rrdctx, string2str(rc->id)) != 0)
  2138. error("RRDCONTEXT: '%s' of host '%s' failed to be deleted from rrdcontext dictionary.",
  2139. string2str(rc->id), host->hostname);
  2140. }
  2141. else {
  2142. RRDINSTANCE *ri;
  2143. dfe_start_write(rc->rrdinstances, ri) {
  2144. if(rrdinstance_should_be_deleted(ri)) {
  2145. worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
  2146. dictionary_del_having_write_lock(rc->rrdinstances, string2str(ri->id));
  2147. }
  2148. else {
  2149. RRDMETRIC *rm;
  2150. dfe_start_write(ri->rrdmetrics, rm) {
  2151. if(rrdmetric_should_be_deleted(rm)) {
  2152. worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
  2153. dictionary_del_having_write_lock(ri->rrdmetrics, string2str(rm->id));
  2154. }
  2155. }
  2156. dfe_done(rm);
  2157. }
  2158. }
  2159. dfe_done(ri);
  2160. }
  2161. // the item is referenced in the dictionary
  2162. // so, it is still here to unlock, even if we have deleted it
  2163. rrdcontext_unlock(rc);
  2164. }
  2165. dfe_done(rc);
  2166. }
  2167. rrd_unlock();
  2168. }
  2169. static void rrdcontext_main_cleanup(void *ptr) {
  2170. struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
  2171. static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
  2172. // custom code
  2173. static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
  2174. }
  2175. void *rrdcontext_main(void *ptr) {
  2176. netdata_thread_cleanup_push(rrdcontext_main_cleanup, ptr);
  2177. if(unlikely(rrdcontext_enabled == CONFIG_BOOLEAN_NO))
  2178. goto exit;
  2179. worker_register("RRDCONTEXT");
  2180. worker_register_job_name(WORKER_JOB_HOSTS, "hosts");
  2181. worker_register_job_name(WORKER_JOB_CHECK, "dedup checks");
  2182. worker_register_job_name(WORKER_JOB_SEND, "sent contexts");
  2183. worker_register_job_name(WORKER_JOB_DEQUEUE, "deduped contexts");
  2184. worker_register_job_name(WORKER_JOB_RETENTION, "metrics retention");
  2185. worker_register_job_name(WORKER_JOB_QUEUED, "queued contexts");
  2186. worker_register_job_name(WORKER_JOB_CLEANUP, "cleanups");
  2187. worker_register_job_name(WORKER_JOB_CLEANUP_DELETE, "deletes");
  2188. heartbeat_t hb;
  2189. heartbeat_init(&hb);
  2190. usec_t step = USEC_PER_SEC * RRDCONTEXT_WORKER_THREAD_HEARTBEAT_SECS;
  2191. while (!netdata_exit) {
  2192. worker_is_idle();
  2193. heartbeat_next(&hb, step);
  2194. if(unlikely(netdata_exit)) break;
  2195. if(!aclk_connected) continue;
  2196. usec_t now_ut = now_realtime_usec();
  2197. if(rrdcontext_next_db_rotation_ut && now_ut > rrdcontext_next_db_rotation_ut) {
  2198. rrdcontext_recalculate_retention(WORKER_JOB_RETENTION);
  2199. rrdcontext_garbage_collect();
  2200. rrdcontext_next_db_rotation_ut = 0;
  2201. }
  2202. rrd_rdlock();
  2203. RRDHOST *host;
  2204. rrdhost_foreach_read(host) {
  2205. if(unlikely(netdata_exit)) break;
  2206. worker_is_busy(WORKER_JOB_HOSTS);
  2207. // check if we have received a streaming command for this host
  2208. if(!rrdhost_flag_check(host, RRDHOST_FLAG_ACLK_STREAM_CONTEXTS))
  2209. continue;
  2210. // check if there are queued items to send
  2211. if(!dictionary_stats_entries((DICTIONARY *)host->rrdctx_queue))
  2212. continue;
  2213. if(!host->node_id)
  2214. continue;
  2215. size_t messages_added = 0;
  2216. contexts_updated_t bundle = NULL;
  2217. RRDCONTEXT *rc;
  2218. dfe_start_write((DICTIONARY *)host->rrdctx_queue, rc) {
  2219. if(unlikely(netdata_exit)) break;
  2220. if(unlikely(messages_added >= MESSAGES_PER_BUNDLE_TO_SEND_TO_HUB_PER_HOST))
  2221. break;
  2222. worker_is_busy(WORKER_JOB_QUEUED);
  2223. usec_t dispatch_ut = rrdcontext_calculate_queued_dispatch_time_ut(rc, now_ut);
  2224. char *claim_id = get_agent_claimid();
  2225. if(unlikely(now_ut >= dispatch_ut) && claim_id) {
  2226. worker_is_busy(WORKER_JOB_CHECK);
  2227. rrdcontext_lock(rc);
  2228. if(check_if_cloud_version_changed_unsafe(rc, true)) {
  2229. worker_is_busy(WORKER_JOB_SEND);
  2230. #ifdef ENABLE_ACLK
  2231. if(!bundle) {
  2232. // prepare the bundle to send the messages
  2233. char uuid[UUID_STR_LEN];
  2234. uuid_unparse_lower(*host->node_id, uuid);
  2235. bundle = contexts_updated_new(claim_id, uuid, 0, now_ut);
  2236. }
  2237. #endif
  2238. // update the hub data of the context, give a new version, pack the message
  2239. // and save an update to SQL
  2240. rrdcontext_message_send_unsafe(rc, false, bundle);
  2241. messages_added++;
  2242. rc->queue.dequeued_ut = now_ut;
  2243. }
  2244. else
  2245. rc->version = rc->hub.version;
  2246. // remove the queued flag, so that it can be queued again
  2247. rc->flags &= ~RRD_FLAG_QUEUED;
  2248. // remove it from the queue
  2249. worker_is_busy(WORKER_JOB_DEQUEUE);
  2250. dictionary_del_having_write_lock((DICTIONARY *)host->rrdctx_queue, string2str(rc->id));
  2251. if(unlikely(rrdcontext_should_be_deleted(rc))) {
  2252. // this is a deleted context - delete it forever...
  2253. worker_is_busy(WORKER_JOB_CLEANUP_DELETE);
  2254. rrdcontext_delete_from_sql_unsafe(rc);
  2255. STRING *id = string_dup(rc->id);
  2256. rrdcontext_unlock(rc);
  2257. // delete it from the master dictionary
  2258. if(dictionary_del((DICTIONARY *)host->rrdctx, string2str(rc->id)) != 0)
  2259. error("RRDCONTEXT: '%s' of host '%s' failed to be deleted from rrdcontext dictionary.",
  2260. string2str(id), host->hostname);
  2261. string_freez(id);
  2262. }
  2263. else
  2264. rrdcontext_unlock(rc);
  2265. }
  2266. freez(claim_id);
  2267. }
  2268. dfe_done(rc);
  2269. #ifdef ENABLE_ACLK
  2270. if(!netdata_exit && bundle) {
  2271. // we have a bundle to send messages
  2272. // update the version hash
  2273. contexts_updated_update_version_hash(bundle, rrdcontext_version_hash(host));
  2274. // send it
  2275. aclk_send_contexts_updated(bundle);
  2276. }
  2277. else if(bundle)
  2278. contexts_updated_delete(bundle);
  2279. #endif
  2280. }
  2281. rrd_unlock();
  2282. }
  2283. exit:
  2284. netdata_thread_cleanup_pop(1);
  2285. return NULL;
  2286. }