123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428 |
- // SPDX-License-Identifier: GPL-3.0-or-later
- #include "exporting_engine.h"
/**
 * Normalize chart and dimension names
 *
 * Copies `src` into `dst`, substituting '_' for every character that is neither alphanumeric nor '.'.
 * At most `max_len` characters are copied and a terminating NUL is always appended, so `dst` must have
 * room for at least `max_len + 1` bytes.
 *
 * @param dst where to copy name to.
 * @param src where to copy name from.
 * @param max_len the maximum number of characters to copy (not counting the terminating NUL).
 * @return Returns the number of characters copied (not counting the terminating NUL).
 */
size_t exporting_name_copy(char *dst, const char *src, size_t max_len)
{
    size_t n;

    for (n = 0; *src && n < max_len; dst++, src++, n++) {
        // The <ctype.h> classifiers are only defined for values representable as unsigned char (or EOF).
        // Plain char may be signed, so bytes >= 0x80 must be converted before the call.
        const unsigned char c = (unsigned char)*src;

        *dst = (c == '.' || isalnum(c)) ? *src : '_';
    }

    *dst = '\0';

    return n;
}
- /**
- * Mark scheduled instances
- *
- * Any instance can have its own update interval. On every exporting engine update only those instances are picked,
- * which are scheduled for the update.
- *
- * @param engine an engine data structure.
- * @return Returns 1 if there are instances to process
- */
- int mark_scheduled_instances(struct engine *engine)
- {
- int instances_were_scheduled = 0;
- for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
- if (!instance->disabled && (engine->now % instance->config.update_every >=
- instance->config.update_every - localhost->rrd_update_every)) {
- instance->scheduled = 1;
- instances_were_scheduled = 1;
- instance->before = engine->now;
- }
- }
- return instances_were_scheduled;
- }
- /**
- * Calculate the SUM or AVERAGE of a dimension, for any timeframe
- *
- * May return NAN if the database does not have any value in the give timeframe.
- *
- * @param instance an instance data structure.
- * @param rd a dimension(metric) in the Netdata database.
- * @param last_timestamp the timestamp that should be reported to the exporting connector instance.
- * @return Returns the value, calculated over the given period.
- */
- calculated_number exporting_calculate_value_from_stored_data(
- struct instance *instance,
- RRDDIM *rd,
- time_t *last_timestamp)
- {
- RRDSET *st = rd->rrdset;
- #ifdef NETDATA_INTERNAL_CHECKS
- RRDHOST *host = st->rrdhost;
- #endif
- time_t after = instance->after;
- time_t before = instance->before;
- // find the edges of the rrd database for this chart
- time_t first_t = rd->state->query_ops.oldest_time(rd);
- time_t last_t = rd->state->query_ops.latest_time(rd);
- time_t update_every = st->update_every;
- struct rrddim_query_handle handle;
- storage_number n;
- // step back a little, to make sure we have complete data collection
- // for all metrics
- after -= update_every * 2;
- before -= update_every * 2;
- // align the time-frame
- after = after - (after % update_every);
- before = before - (before % update_every);
- // for before, loose another iteration
- // the latest point will be reported the next time
- before -= update_every;
- if (unlikely(after > before))
- // this can happen when update_every > before - after
- after = before;
- if (unlikely(after < first_t))
- after = first_t;
- if (unlikely(before > last_t))
- before = last_t;
- if (unlikely(before < first_t || after > last_t)) {
- // the chart has not been updated in the wanted timeframe
- debug(
- D_EXPORTING,
- "EXPORTING: %s.%s.%s: aligned timeframe %lu to %lu is outside the chart's database range %lu to %lu",
- host->hostname,
- st->id,
- rd->id,
- (unsigned long)after,
- (unsigned long)before,
- (unsigned long)first_t,
- (unsigned long)last_t);
- return NAN;
- }
- *last_timestamp = before;
- size_t counter = 0;
- calculated_number sum = 0;
- for (rd->state->query_ops.init(rd, &handle, after, before); !rd->state->query_ops.is_finished(&handle);) {
- time_t curr_t;
- n = rd->state->query_ops.next_metric(&handle, &curr_t);
- if (unlikely(!does_storage_number_exist(n))) {
- // not collected
- continue;
- }
- calculated_number value = unpack_storage_number(n);
- sum += value;
- counter++;
- }
- rd->state->query_ops.finalize(&handle);
- if (unlikely(!counter)) {
- debug(
- D_EXPORTING,
- "EXPORTING: %s.%s.%s: no values stored in database for range %lu to %lu",
- host->hostname,
- st->id,
- rd->id,
- (unsigned long)after,
- (unsigned long)before);
- return NAN;
- }
- if (unlikely(EXPORTING_OPTIONS_DATA_SOURCE(instance->config.options) == EXPORTING_SOURCE_DATA_SUM))
- return sum;
- return sum / (calculated_number)counter;
- }
- /**
- * Start batch formatting for every connector instance's buffer
- *
- * @param engine an engine data structure.
- */
- void start_batch_formatting(struct engine *engine)
- {
- for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
- if (instance->scheduled) {
- uv_mutex_lock(&instance->mutex);
- if (instance->start_batch_formatting && instance->start_batch_formatting(instance) != 0) {
- error("EXPORTING: cannot start batch formatting for %s", instance->config.name);
- disable_instance(instance);
- }
- }
- }
- }
- /**
- * Start host formatting for every connector instance's buffer
- *
- * @param engine an engine data structure.
- * @param host a data collecting host.
- */
- void start_host_formatting(struct engine *engine, RRDHOST *host)
- {
- for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
- if (instance->scheduled) {
- if (rrdhost_is_exportable(instance, host)) {
- if (instance->start_host_formatting && instance->start_host_formatting(instance, host) != 0) {
- error("EXPORTING: cannot start host formatting for %s", instance->config.name);
- disable_instance(instance);
- }
- } else {
- instance->skip_host = 1;
- }
- }
- }
- }
- /**
- * Start chart formatting for every connector instance's buffer
- *
- * @param engine an engine data structure.
- * @param st a chart.
- */
- void start_chart_formatting(struct engine *engine, RRDSET *st)
- {
- for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
- if (instance->scheduled && !instance->skip_host) {
- if (rrdset_is_exportable(instance, st)) {
- if (instance->start_chart_formatting && instance->start_chart_formatting(instance, st) != 0) {
- error("EXPORTING: cannot start chart formatting for %s", instance->config.name);
- disable_instance(instance);
- }
- } else {
- instance->skip_chart = 1;
- }
- }
- }
- }
- /**
- * Format metric for every connector instance's buffer
- *
- * @param engine an engine data structure.
- * @param rd a dimension(metric) in the Netdata database.
- */
- void metric_formatting(struct engine *engine, RRDDIM *rd)
- {
- for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
- if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
- if (instance->metric_formatting && instance->metric_formatting(instance, rd) != 0) {
- error("EXPORTING: cannot format metric for %s", instance->config.name);
- disable_instance(instance);
- continue;
- }
- instance->stats.buffered_metrics++;
- }
- }
- }
- /**
- * End chart formatting for every connector instance's buffer
- *
- * @param engine an engine data structure.
- * @param a chart.
- */
- void end_chart_formatting(struct engine *engine, RRDSET *st)
- {
- for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
- if (instance->scheduled && !instance->skip_host && !instance->skip_chart) {
- if (instance->end_chart_formatting && instance->end_chart_formatting(instance, st) != 0) {
- error("EXPORTING: cannot end chart formatting for %s", instance->config.name);
- disable_instance(instance);
- continue;
- }
- }
- instance->skip_chart = 0;
- }
- }
- /**
- * End host formatting for every connector instance's buffer
- *
- * @param engine an engine data structure.
- * @param host a data collecting host.
- */
- void end_host_formatting(struct engine *engine, RRDHOST *host)
- {
- for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
- if (instance->scheduled && !instance->skip_host) {
- if (instance->end_host_formatting && instance->end_host_formatting(instance, host) != 0) {
- error("EXPORTING: cannot end host formatting for %s", instance->config.name);
- disable_instance(instance);
- continue;
- }
- }
- instance->skip_host = 0;
- }
- }
- /**
- * End batch formatting for every connector instance's buffer
- *
- * @param engine an engine data structure.
- */
- void end_batch_formatting(struct engine *engine)
- {
- for (struct instance *instance = engine->instance_root; instance; instance = instance->next) {
- if (instance->scheduled) {
- if (instance->end_batch_formatting && instance->end_batch_formatting(instance) != 0) {
- error("EXPORTING: cannot end batch formatting for %s", instance->config.name);
- disable_instance(instance);
- continue;
- }
- uv_mutex_unlock(&instance->mutex);
- instance->data_is_ready = 1;
- uv_cond_signal(&instance->cond_var);
- instance->scheduled = 0;
- instance->after = instance->before;
- }
- }
- }
- /**
- * Prepare buffers
- *
- * Walk through the Netdata database and fill buffers for every scheduled exporting connector instance according to
- * configured rules.
- *
- * @param engine an engine data structure.
- */
- void prepare_buffers(struct engine *engine)
- {
- netdata_thread_disable_cancelability();
- start_batch_formatting(engine);
- rrd_rdlock();
- RRDHOST *host;
- rrdhost_foreach_read(host)
- {
- rrdhost_rdlock(host);
- start_host_formatting(engine, host);
- RRDSET *st;
- rrdset_foreach_read(st, host)
- {
- rrdset_rdlock(st);
- start_chart_formatting(engine, st);
- RRDDIM *rd;
- rrddim_foreach_read(rd, st)
- metric_formatting(engine, rd);
- end_chart_formatting(engine, st);
- rrdset_unlock(st);
- }
- end_host_formatting(engine, host);
- rrdhost_unlock(host);
- }
- rrd_unlock();
- netdata_thread_enable_cancelability();
- end_batch_formatting(engine);
- }
- /**
- * Flush a buffer with host labels
- *
- * @param instance an instance data structure.
- * @param host a data collecting host.
- * @return Always returns 0.
- */
- int flush_host_labels(struct instance *instance, RRDHOST *host)
- {
- (void)host;
- if (instance->labels)
- buffer_flush(instance->labels);
- return 0;
- }
- /**
- * End a batch for a simple connector
- *
- * @param instance an instance data structure.
- * @return Returns 0 on success, 1 on failure.
- */
- int simple_connector_end_batch(struct instance *instance)
- {
- struct simple_connector_data *simple_connector_data =
- (struct simple_connector_data *)instance->connector_specific_data;
- struct stats *stats = &instance->stats;
- BUFFER *instance_buffer = (BUFFER *)instance->buffer;
- struct simple_connector_buffer *last_buffer = simple_connector_data->last_buffer;
- if (!last_buffer->buffer) {
- last_buffer->buffer = buffer_create(0);
- }
- if (last_buffer->used) {
- // ring buffer is full, reuse the oldest element
- simple_connector_data->first_buffer = simple_connector_data->first_buffer->next;
- stats->data_lost_events++;
- stats->lost_metrics += last_buffer->buffered_metrics;
- stats->lost_bytes += last_buffer->buffered_bytes;
- }
- // swap buffers
- BUFFER *tmp_buffer = last_buffer->buffer;
- last_buffer->buffer = instance_buffer;
- instance->buffer = instance_buffer = tmp_buffer;
- buffer_flush(instance_buffer);
- if (last_buffer->header)
- buffer_flush(last_buffer->header);
- else
- last_buffer->header = buffer_create(0);
- if (instance->prepare_header)
- instance->prepare_header(instance);
- // The stats->buffered_metrics is used in the simple connector batch formatting as a variable for the number
- // of metrics, added in the current iteration, so we are clearing it here. We will use the
- // simple_connector_data->total_buffered_metrics in the worker to show the statistics.
- size_t buffered_metrics = (size_t)stats->buffered_metrics;
- stats->buffered_metrics = 0;
- size_t buffered_bytes = buffer_strlen(last_buffer->buffer);
- last_buffer->buffered_metrics = buffered_metrics;
- last_buffer->buffered_bytes = buffered_bytes;
- last_buffer->used++;
- simple_connector_data->total_buffered_metrics += buffered_metrics;
- stats->buffered_bytes += buffered_bytes;
- simple_connector_data->last_buffer = simple_connector_data->last_buffer->next;
- return 0;
- }
|