123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 |
// SPDX-License-Identifier: GPL-3.0-or-later
- #include "replication.h"
- static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming) {
- size_t dimensions = rrdset_number_of_dimensions(st);
- struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops;
- struct {
- DICTIONARY *dict;
- const DICTIONARY_ITEM *rda;
- RRDDIM *rd;
- struct storage_engine_query_handle handle;
- STORAGE_POINT sp;
- } data[dimensions];
- memset(data, 0, sizeof(data));
- if(enable_streaming && st->last_updated.tv_sec > before) {
- internal_error(true, "REPLAY: '%s' overwriting replication before from %llu to %llu",
- rrdset_id(st),
- (unsigned long long)before,
- (unsigned long long)st->last_updated.tv_sec
- );
- before = st->last_updated.tv_sec;
- }
- // prepare our array of dimensions
- {
- RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- if (rd_dfe.counter >= dimensions)
- break;
- data[rd_dfe.counter].dict = rd_dfe.dict;
- data[rd_dfe.counter].rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
- data[rd_dfe.counter].rd = rd;
- ops->init(rd->tiers[0]->db_metric_handle, &data[rd_dfe.counter].handle, after, before);
- }
- rrddim_foreach_done(rd);
- }
- time_t now = after, actual_after = 0, actual_before = 0;
- while(now <= before) {
- time_t min_start_time = 0, min_end_time = 0;
- for (size_t i = 0; i < dimensions && data[i].rd; i++) {
- // fetch the first valid point for the dimension
- int max_skip = 100;
- while(data[i].sp.end_time < now && !ops->is_finished(&data[i].handle) && max_skip-- > 0)
- data[i].sp = ops->next_metric(&data[i].handle);
- if(max_skip <= 0)
- error("REPLAY: host '%s', chart '%s', dimension '%s': db does not advance the query beyond time %llu",
- rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long)now);
- if(data[i].sp.end_time < now)
- continue;
- if(!min_start_time) {
- min_start_time = data[i].sp.start_time;
- min_end_time = data[i].sp.end_time;
- }
- else {
- min_start_time = MIN(min_start_time, data[i].sp.start_time);
- min_end_time = MIN(min_end_time, data[i].sp.end_time);
- }
- }
- if(min_end_time < now) {
- internal_error(true,
- "REPLAY: host '%s', chart '%s': no data on any dimension beyond time %llu",
- rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now);
- break;
- }
- if(min_end_time <= min_start_time)
- min_start_time = min_end_time - st->update_every;
- if(!actual_after) {
- actual_after = min_end_time;
- actual_before = min_end_time;
- }
- else
- actual_before = min_end_time;
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu\n"
- , (unsigned long long)min_start_time
- , (unsigned long long)min_end_time);
- // output the replay values for this time
- for (size_t i = 0; i < dimensions && data[i].rd; i++) {
- if(data[i].sp.start_time <= min_end_time && data[i].sp.end_time >= min_end_time)
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT_AUTO " \"%s\"\n",
- rrddim_id(data[i].rd), data[i].sp.sum, data[i].sp.flags & SN_FLAG_RESET ? "R" : "");
- else
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n",
- rrddim_id(data[i].rd));
- }
- now = min_end_time + 1;
- }
- #ifdef NETDATA_INTERNAL_CHECKS
- if(actual_after) {
- char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1];
- log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
- log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
- internal_error(true,
- "REPLAY: host '%s', chart '%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
- (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
- }
- else
- internal_error(true,
- "REPLAY: host '%s', chart '%s': nothing to send (requested %llu to %llu)",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- (unsigned long long)after, (unsigned long long)before);
- #endif
- // release all the dictionary items acquired
- // finalize the queries
- for(size_t i = 0; i < dimensions && data[i].rda ;i++) {
- ops->finalize(&data[i].handle);
- dictionary_acquired_item_release(data[i].dict, data[i].rda);
- }
- return before;
- }
- static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) {
- RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT_AUTO " " NETDATA_DOUBLE_FORMAT_AUTO "\n",
- rrddim_id(rd),
- (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec,
- rd->last_collected_value,
- rd->last_calculated_value,
- rd->last_stored_value
- );
- }
- rrddim_foreach_done(rd);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n",
- (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec,
- (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec
- );
- }
- bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, time_t after, time_t before) {
- time_t query_after = after;
- time_t query_before = before;
- time_t now = now_realtime_sec();
- time_t tolerance = 2; // sometimes from the time we get this value, to the time we check,
- // a data collection has been made
- // so, we give this tolerance to detect invalid timestamps
- // find the first entry we have
- time_t first_entry_local = rrdset_first_entry_t(st);
- if(first_entry_local > now + tolerance) {
- internal_error(true,
- "RRDSET: '%s' first time %llu is in the future (now is %llu)",
- rrdset_id(st), (unsigned long long)first_entry_local, (unsigned long long)now);
- first_entry_local = now;
- }
- if (query_after < first_entry_local)
- query_after = first_entry_local;
- // find the latest entry we have
- time_t last_entry_local = st->last_updated.tv_sec;
- if(!last_entry_local) {
- internal_error(true,
- "RRDSET: '%s' last updated time zero. Querying db for last updated time.",
- rrdset_id(st));
- last_entry_local = rrdset_last_entry_t(st);
- }
- if(last_entry_local > now + tolerance) {
- internal_error(true,
- "RRDSET: '%s' last updated time %llu is in the future (now is %llu)",
- rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);
- last_entry_local = now;
- }
- if (query_before > last_entry_local)
- query_before = last_entry_local;
- // if the parent asked us to start streaming, then fill the rest with the data that we have
- if (start_streaming)
- query_before = last_entry_local;
- if (query_after > query_before) {
- time_t tmp = query_before;
- query_before = query_after;
- query_after = tmp;
- }
- bool enable_streaming = (start_streaming || query_before == last_entry_local || !after || !before) ? true : false;
- // we might want to optimize this by filling a temporary buffer
- // and copying the result to the host's buffer in order to avoid
- // holding the host's buffer lock for too long
- BUFFER *wb = sender_start(host->sender);
- {
- // pass the original after/before so that the parent knows about
- // which time range we responded
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
- if(after != 0 && before != 0)
- before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming);
- else {
- after = 0;
- before = 0;
- enable_streaming = true;
- }
- if(enable_streaming)
- replicate_chart_collection_state(wb, st);
- // end with first/last entries we have, and the first start time and
- // last end time of the data we sent
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu\n",
- (int)st->update_every, (unsigned long long)first_entry_local, (unsigned long long)last_entry_local,
- enable_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before);
- }
- sender_commit(host->sender, wb);
- return enable_streaming;
- }
- static bool send_replay_chart_cmd(send_command callback, void *callback_data, RRDSET *st, bool start_streaming, time_t after, time_t before) {
- #ifdef NETDATA_INTERNAL_CHECKS
- if(after && before) {
- char after_buf[LOG_DATE_LENGTH + 1], before_buf[LOG_DATE_LENGTH + 1];
- log_date(after_buf, LOG_DATE_LENGTH, after);
- log_date(before_buf, LOG_DATE_LENGTH, before);
- internal_error(true,
- "REPLAY: host '%s', chart '%s': sending replication request %llu [%s] to %llu [%s], start streaming: %s",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- (unsigned long long)after, after_buf, (unsigned long long)before, before_buf,
- start_streaming?"true":"false");
- }
- else {
- internal_error(true,
- "REPLAY: host '%s', chart '%s': sending empty replication request, start streaming: %s",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- start_streaming?"true":"false");
- }
- #endif
- debug(D_REPLICATION, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
- rrdset_id(st), start_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before);
- char buffer[2048 + 1];
- snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
- rrdset_id(st), start_streaming ? "true" : "false",
- (unsigned long long)after, (unsigned long long)before);
- int ret = callback(buffer, callback_data);
- if (ret < 0) {
- error("failed to send replay request to child (ret=%d)", ret);
- return false;
- }
- return true;
- }
- bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
- time_t first_entry_child, time_t last_entry_child,
- time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
- {
- time_t now = now_realtime_sec();
- // if replication is disabled, send an empty replication request
- // asking no data
- if (!host->rrdpush_enable_replication) {
- internal_error(true,
- "REPLAY: host '%s', chart '%s': sending empty replication request because replication is disabled",
- rrdhost_hostname(host), rrdset_id(st));
- return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
- }
- // Child has no stored data
- if (!last_entry_child) {
- error("REPLAY: host '%s', chart '%s': sending empty replication request because child has no stored data",
- rrdhost_hostname(host), rrdset_id(st));
- return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
- }
- // Nothing to get if the chart has not dimensions
- if (!rrdset_number_of_dimensions(st)) {
- error("REPLAY: host '%s', chart '%s': sending empty replication request because chart has no dimensions",
- rrdhost_hostname(host), rrdset_id(st));
- return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
- }
- // if the child's first/last entries are nonsensical, resume streaming
- // without asking for any data
- if (first_entry_child <= 0) {
- error("REPLAY: host '%s', chart '%s': sending empty replication because first entry of the child is invalid (%llu)",
- rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child);
- return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
- }
- if (first_entry_child > last_entry_child) {
- error("REPLAY: host '%s', chart '%s': sending empty replication because child timings are invalid (first entry %llu > last entry %llu)",
- rrdhost_hostname(host), rrdset_id(st), (unsigned long long)first_entry_child, (unsigned long long)last_entry_child);
- return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
- }
- time_t last_entry_local = rrdset_last_entry_t(st);
- if(last_entry_local > now) {
- internal_error(true,
- "REPLAY: host '%s', chart '%s': local last entry time %llu is in the future (now is %llu). Adjusting it.",
- rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)now);
- last_entry_local = now;
- }
- // should never happen but it if does, start streaming without asking
- // for any data
- if (last_entry_local > last_entry_child) {
- error("REPLAY: host '%s', chart '%s': sending empty replication request because our last entry (%llu) in later than the child one (%llu)",
- rrdhost_hostname(host), rrdset_id(st), (unsigned long long)last_entry_local, (unsigned long long)last_entry_child);
- return send_replay_chart_cmd(callback, callback_data, st, true, 0, 0);
- }
- time_t first_entry_wanted;
- if (prev_first_entry_wanted && prev_last_entry_wanted) {
- first_entry_wanted = prev_last_entry_wanted;
- if ((now - first_entry_wanted) > host->rrdpush_seconds_to_replicate)
- first_entry_wanted = now - host->rrdpush_seconds_to_replicate;
- }
- else
- first_entry_wanted = MAX(last_entry_local, first_entry_child);
- time_t last_entry_wanted = first_entry_wanted + host->rrdpush_replication_step;
- last_entry_wanted = MIN(last_entry_wanted, last_entry_child);
- bool start_streaming = (last_entry_wanted == last_entry_child);
- return send_replay_chart_cmd(callback, callback_data, st, start_streaming, first_entry_wanted, last_entry_wanted);
- }
|