replication.c 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "replication.h"
  3. static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) {
  4. size_t dimensions = rrdset_number_of_dimensions(st);
  5. struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops;
  6. struct {
  7. DICTIONARY *dict;
  8. const DICTIONARY_ITEM *rda;
  9. RRDDIM *rd;
  10. struct storage_engine_query_handle handle;
  11. STORAGE_POINT sp;
  12. } data[dimensions];
  13. memset(data, 0, sizeof(data));
  14. if(enable_streaming && st->last_updated.tv_sec > before) {
  15. internal_error(true, "REPLAY: '%s' overwriting replication before from %llu to %llu",
  16. rrdset_id(st),
  17. (unsigned long long)before,
  18. (unsigned long long)st->last_updated.tv_sec
  19. );
  20. before = st->last_updated.tv_sec;
  21. }
  22. // prepare our array of dimensions
  23. {
  24. RRDDIM *rd;
  25. rrddim_foreach_read(rd, st) {
  26. if (rd_dfe.counter >= dimensions)
  27. break;
  28. data[rd_dfe.counter].dict = rd_dfe.dict;
  29. data[rd_dfe.counter].rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
  30. data[rd_dfe.counter].rd = rd;
  31. ops->init(rd->tiers[0]->db_metric_handle, &data[rd_dfe.counter].handle, after, before);
  32. }
  33. rrddim_foreach_done(rd);
  34. }
  35. time_t now = after, actual_after = 0, actual_before = 0;
  36. while(now <= before) {
  37. time_t min_start_time = 0, min_end_time = 0;
  38. for (size_t i = 0; i < dimensions && data[i].rd; i++) {
  39. // fetch the first valid point for the dimension
  40. int max_skip = 100;
  41. while(data[i].sp.end_time < now && !ops->is_finished(&data[i].handle) && max_skip-- > 0)
  42. data[i].sp = ops->next_metric(&data[i].handle);
  43. if(max_skip <= 0)
  44. error("REPLAY: host '%s', chart '%s', dimension '%s': db does not advance the query beyond time %llu",
  45. rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long)now);
  46. if(data[i].sp.end_time < now)
  47. continue;
  48. if(!min_start_time) {
  49. min_start_time = data[i].sp.start_time;
  50. min_end_time = data[i].sp.end_time;
  51. }
  52. else {
  53. min_start_time = MIN(min_start_time, data[i].sp.start_time);
  54. min_end_time = MIN(min_end_time, data[i].sp.end_time);
  55. }
  56. }
  57. if(min_end_time < now) {
  58. internal_error(true,
  59. "REPLAY: host '%s', chart '%s': no data on any dimension beyond time %llu",
  60. rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now);
  61. break;
  62. }
  63. if(min_end_time <= min_start_time)
  64. min_start_time = min_end_time - st->update_every;
  65. if(!actual_after) {
  66. actual_after = min_end_time;
  67. actual_before = min_end_time;
  68. }
  69. else
  70. actual_before = min_end_time;
  71. buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu\n"
  72. , (unsigned long long)min_start_time
  73. , (unsigned long long)min_end_time);
  74. // output the replay values for this time
  75. for (size_t i = 0; i < dimensions && data[i].rd; i++) {
  76. if(data[i].sp.start_time <= min_end_time && data[i].sp.end_time >= min_end_time)
  77. buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT_AUTO " \"%s\"\n",
  78. rrddim_id(data[i].rd), data[i].sp.sum, data[i].sp.flags & SN_FLAG_RESET ? "R" : "");
  79. else
  80. buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n",
  81. rrddim_id(data[i].rd));
  82. }
  83. now = min_end_time + 1;
  84. }
  85. #ifdef NETDATA_INTERNAL_CHECKS
  86. if(actual_after) {
  87. char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1];
  88. log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
  89. log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
  90. internal_error(true,
  91. "REPLAY: host '%s', chart '%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
  92. rrdhost_hostname(st->rrdhost), rrdset_id(st),
  93. (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
  94. (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
  95. }
  96. else
  97. internal_error(true,
  98. "REPLAY: host '%s', chart '%s': nothing to send (requested %llu to %llu)",
  99. rrdhost_hostname(st->rrdhost), rrdset_id(st),
  100. (unsigned long long)after, (unsigned long long)before);
  101. #endif
  102. // release all the dictionary items acquired
  103. // finalize the queries
  104. for(size_t i = 0; i < dimensions && data[i].rda ;i++) {
  105. ops->finalize(&data[i].handle);
  106. dictionary_acquired_item_release(data[i].dict, data[i].rda);
  107. }
  108. return before;
  109. }
  110. static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) {
  111. RRDDIM *rd;
  112. rrddim_foreach_read(rd, st) {
  113. buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT_AUTO " " NETDATA_DOUBLE_FORMAT_AUTO "\n",
  114. rrddim_id(rd),
  115. (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec,
  116. rd->last_collected_value,
  117. rd->last_calculated_value,
  118. rd->last_stored_value
  119. );
  120. }
  121. rrddim_foreach_done(rd);
  122. buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n",
  123. (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec,
  124. (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec
  125. );
  126. }
  127. bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, time_t after, time_t before) {
  128. time_t query_after = after;
  129. time_t query_before = before;
  130. time_t now = now_realtime_sec();
  131. time_t tolerance = 2; // sometimes from the time we get this value, to the time we check,
  132. // a data collection has been made
  133. // so, we give this tolerance to detect invalid timestamps
  134. // find the first entry we have
  135. time_t first_entry_local = rrdset_first_entry_t(st);
  136. if(first_entry_local > now + tolerance) {
  137. internal_error(true,
  138. "RRDSET: '%s' first time %llu is in the future (now is %llu)",
  139. rrdset_id(st), (unsigned long long)first_entry_local, (unsigned long long)now);
  140. first_entry_local = now;
  141. }
  142. if (query_after < first_entry_local)
  143. query_after = first_entry_local;
  144. // find the latest entry we have
  145. time_t last_entry_local = st->last_updated.tv_sec;
  146. if(!last_entry_local) {
  147. internal_error(true,
  148. "RRDSET: '%s' last updated time zero. Querying db for last updated time.",
  149. rrdset_id(st));
  150. last_entry_local = rrdset_last_entry_t(st);
  151. }
  152. if(last_entry_local > now + tolerance) {
  153. internal_error(true,
  154. "RRDSET: '%s' last updated time %llu is in the future (now is %llu)",
  155. rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);
  156. last_entry_local = now;
  157. }
  158. if (query_before > last_entry_local)
  159. query_before = last_entry_local;
  160. // if the parent asked us to start streaming, then fill the rest with the data that we have
  161. if (start_streaming)
  162. query_before = last_entry_local;
  163. if (query_after > query_before) {
  164. time_t tmp = query_before;
  165. query_before = query_after;
  166. query_after = tmp;
  167. }
  168. bool enable_streaming = (start_streaming || query_before == last_entry_local || !after || !before) ? true : false;
  169. // we might want to optimize this by filling a temporary buffer
  170. // and copying the result to the host's buffer in order to avoid
  171. // holding the host's buffer lock for too long
  172. BUFFER *wb = sender_start(host->sender);
  173. {
  174. // pass the original after/before so that the parent knows about
  175. // which time range we responded
  176. buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
  177. if(after != 0 && before != 0)
  178. before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming);
  179. else {
  180. after = 0;
  181. before = 0;
  182. enable_streaming = true;
  183. }
  184. if(enable_streaming)
  185. replicate_chart_collection_state(wb, st);
  186. // end with first/last entries we have, and the first start time and
  187. // last end time of the data we sent
  188. buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu\n",
  189. (int)st->update_every, (unsigned long long)first_entry_local, (unsigned long long)last_entry_local,
  190. enable_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before);
  191. }
  192. sender_commit(host->sender, wb);
  193. return enable_streaming;
  194. }
  195. static bool send_replay_chart_cmd(send_command callback, void *callback_data, RRDSET *st, bool start_streaming, time_t after, time_t before) {
  196. #ifdef NETDATA_INTERNAL_CHECKS
  197. if(after && before) {
  198. char after_buf[LOG_DATE_LENGTH + 1], before_buf[LOG_DATE_LENGTH + 1];
  199. log_date(after_buf, LOG_DATE_LENGTH, after);
  200. log_date(before_buf, LOG_DATE_LENGTH, before);
  201. internal_error(true,
  202. "REPLAY: host '%s', chart '%s': sending replication request %llu [%s] to %llu [%s], start streaming: %s",
  203. rrdhost_hostname(st->rrdhost), rrdset_id(st),
  204. (unsigned long long)after, after_buf, (unsigned long long)before, before_buf,
  205. start_streaming?"true":"false");
  206. }
  207. else {
  208. internal_error(true,
  209. "REPLAY: host '%s', chart '%s': sending empty replication request, start streaming: %s",
  210. rrdhost_hostname(st->rrdhost), rrdset_id(st),
  211. start_streaming?"true":"false");
  212. }
  213. #endif
  214. debug(D_REPLICATION, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
  215. rrdset_id(st), start_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before);
  216. char buffer[2048 + 1];
  217. snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
  218. rrdset_id(st), start_streaming ? "true" : "false",
  219. (unsigned long long)after, (unsigned long long)before);
  220. int ret = callback(buffer, callback_data);
  221. if (ret < 0) {
  222. error("failed to send replay request to child (ret=%d)", ret);
  223. return false;
  224. }
  225. return true;
  226. }
  227. bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
  228. time_t first_entry_child, time_t last_entry_child,
  229. time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
  230. {
  231. time_t now = now_realtime_sec();
  232. // if replication is disabled, send an empty replication request
  233. // asking no data
  234. if (!host->rrdpush_enable_replication) {
  235. internal_error(true,
  236. "REPLAY: host '%s', chart '%s': sending empty replication request because replication is disabled",
  237. rrdhost_hostname(host), rrdset_id(st));
  238. return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
  239. }
  240. // Child has no stored data
  241. if (!last_entry_child) {
  242. error("REPLAY: host '%s', chart '%s': sending empty replication request because child has no stored data",
  243. rrdhost_hostname(host), rrdset_id(st));
  244. return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
  245. }
  246. // Nothing to get if the chart has not dimensions
  247. if (!rrdset_number_of_dimensions(st)) {
  248. error("REPLAY: host '%s', chart '%s': sending empty replication request because chart has no dimensions",
  249. rrdhost_hostname(host), rrdset_id(st));
  250. return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
  251. }
  252. // if the child's first/last entries are nonsensical, resume streaming
  253. // without asking for any data
  254. if (first_entry_child <= 0) {
  255. error("REPLAY: host '%s', chart '%s': sending empty replication because first entry of the child is invalid (%llu)",
  256. rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child);
  257. return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
  258. }
  259. if (first_entry_child > last_entry_child) {
  260. error("REPLAY: host '%s', chart '%s': sending empty replication because child timings are invalid (first entry %llu > last entry %llu)",
  261. rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child, (unsigned long long)last_entry_child);
  262. return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
  263. }
  264. time_t last_entry_local = rrdset_last_entry_t(st);
  265. if(last_entry_local > now) {
  266. internal_error(true,
  267. "REPLAY: host '%s', chart '%s': local last entry time %llu is in the future (now is %llu). Adjusting it.",
  268. rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);
  269. last_entry_local = now;
  270. }
  271. // should never happen but it if does, start streaming without asking
  272. // for any data
  273. if (last_entry_local > last_entry_child) {
  274. error("REPLAY: host '%s', chart '%s': sending empty replication request because our last entry (%llu) in later than the child one (%llu)",
  275. rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)last_entry_child);
  276. return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
  277. }
  278. time_t first_entry_wanted;
  279. if (prev_first_entry_wanted && prev_last_entry_wanted) {
  280. first_entry_wanted = prev_last_entry_wanted;
  281. if ((now - first_entry_wanted) > host->rrdpush_seconds_to_replicate)
  282. first_entry_wanted = now - host->rrdpush_seconds_to_replicate;
  283. }
  284. else
  285. first_entry_wanted = MAX(last_entry_local, first_entry_child);
  286. time_t last_entry_wanted = first_entry_wanted + host->rrdpush_replication_step;
  287. last_entry_wanted = MIN(last_entry_wanted, last_entry_child);
  288. bool start_streaming = (last_entry_wanted == last_entry_child);
  289. return send_replay_chart_cmd(callback, callback_data, st, start_streaming, first_entry_wanted, last_entry_wanted);
  290. }