- // SPDX-License-Identifier: GPL-3.0-or-later
- #include "replication.h"
- #include "Judy.h"
- #define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
- #define MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER 25ULL
- #define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
- #define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10ULL
- #define WORKER_JOB_FIND_NEXT 1
- #define WORKER_JOB_QUERYING 2
- #define WORKER_JOB_DELETE_ENTRY 3
- #define WORKER_JOB_FIND_CHART 4
- #define WORKER_JOB_PREPARE_QUERY 5
- #define WORKER_JOB_CHECK_CONSISTENCY 6
- #define WORKER_JOB_BUFFER_COMMIT 7
- #define WORKER_JOB_CLEANUP 8
- #define WORKER_JOB_WAIT 9
- // master thread worker jobs
- #define WORKER_JOB_STATISTICS 10
- #define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 11
- #define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12
- #define WORKER_JOB_CUSTOM_METRIC_COMPLETION 13
- #define WORKER_JOB_CUSTOM_METRIC_ADDED 14
- #define WORKER_JOB_CUSTOM_METRIC_DONE 15
- #define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16
- #define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 17
- #define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30
- #define SECONDS_TO_RESET_POINT_IN_TIME 10
- static struct replication_query_statistics replication_queries = {
- .spinlock = NETDATA_SPINLOCK_INITIALIZER,
- .queries_started = 0,
- .queries_finished = 0,
- .points_read = 0,
- .points_generated = 0,
- };
- struct replication_query_statistics replication_get_query_statistics(void) {
- netdata_spinlock_lock(&replication_queries.spinlock);
- struct replication_query_statistics ret = replication_queries;
- netdata_spinlock_unlock(&replication_queries.spinlock);
- return ret;
- }
- size_t replication_buffers_allocated = 0;
- size_t replication_allocated_buffers(void) {
- return __atomic_load_n(&replication_buffers_allocated, __ATOMIC_RELAXED);
- }
- // ----------------------------------------------------------------------------
- // sending replication replies
- struct replication_dimension {
- STORAGE_POINT sp;
- struct storage_engine_query_handle handle;
- bool enabled;
- bool skip;
- DICTIONARY *dict;
- const DICTIONARY_ITEM *rda;
- RRDDIM *rd;
- };
- struct replication_query {
- RRDSET *st;
- struct {
- time_t first_entry_t;
- time_t last_entry_t;
- } db;
- struct { // what the parent requested
- time_t after;
- time_t before;
- bool enable_streaming;
- } request;
- struct { // what the child will do
- time_t after;
- time_t before;
- bool enable_streaming;
- bool locked_data_collection;
- bool execute;
- bool interrupted;
- } query;
- time_t wall_clock_time;
- size_t points_read;
- size_t points_generated;
- struct storage_engine_query_ops *ops;
- struct replication_request *rq;
- size_t dimensions;
- struct replication_dimension data[];
- };
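- // Build a replication query for a chart: record the requested and the effective
- // time-frames, lock data collection when the query will end with streaming enabled
- // (aligning 'before' with the chart's last update), and open one storage engine
- // query per exposed dimension.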
- static struct replication_query *replication_query_prepare(
- RRDSET *st,
- time_t db_first_entry,
- time_t db_last_entry,
- time_t requested_after,
- time_t requested_before,
- bool requested_enable_streaming,
- time_t query_after,
- time_t query_before,
- bool query_enable_streaming,
- time_t wall_clock_time
- ) {
- size_t dimensions = rrdset_number_of_dimensions(st);
- struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
- __atomic_add_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
- q->dimensions = dimensions;
- q->st = st;
- q->db.first_entry_t = db_first_entry;
- q->db.last_entry_t = db_last_entry;
- q->request.after = requested_after;
- q->request.before = requested_before;
- q->request.enable_streaming = requested_enable_streaming;
- q->query.after = query_after;
- q->query.before = query_before;
- q->query.enable_streaming = query_enable_streaming;
- q->wall_clock_time = wall_clock_time;
- if (!q->dimensions || !q->query.after || !q->query.before) {
- q->query.execute = false;
- q->dimensions = 0;
- return q;
- }
- if(q->query.enable_streaming) {
- netdata_spinlock_lock(&st->data_collection_lock);
- q->query.locked_data_collection = true;
- if (st->last_updated.tv_sec > q->query.before) {
- #ifdef NETDATA_LOG_REPLICATION_REQUESTS
- internal_error(true,
- "STREAM_SENDER REPLAY: 'host:%s/chart:%s' "
- "has start_streaming = true, "
- "adjusting replication before timestamp from %llu to %llu",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- (unsigned long long) q->query.before,
- (unsigned long long) st->last_updated.tv_sec
- );
- #endif
- q->query.before = MIN(st->last_updated.tv_sec, wall_clock_time);
- }
- }
- q->ops = &st->rrdhost->db[0].eng->api.query_ops;
- // prepare our array of dimensions
- size_t count = 0;
- RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- if (unlikely(!rd || !rd_dfe.item || !rd->exposed))
- continue;
- if (unlikely(rd_dfe.counter >= q->dimensions)) {
- internal_error(true,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
- rrdhost_hostname(st->rrdhost), rrdset_id(st));
- break;
- }
- struct replication_dimension *d = &q->data[rd_dfe.counter];
- d->dict = rd_dfe.dict;
- d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
- d->rd = rd;
- q->ops->init(rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
- q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
- d->enabled = true;
- d->skip = false;
- count++;
- }
- rrddim_foreach_done(rd);
- if(!count) {
- // no data for this chart
- q->query.execute = false;
- if(q->query.locked_data_collection) {
- netdata_spinlock_unlock(&st->data_collection_lock);
- q->query.locked_data_collection = false;
- }
- }
- else {
- // we have data for this chart
- q->query.execute = true;
- }
- return q;
- }
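- // Emit the chart's current collection state (per-dimension last collected,
- // calculated and stored values, plus the chart's last collected/updated timestamps),
- // so the parent can resume data collection exactly where the child is.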
- static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) {
- RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- if(!rd->exposed) continue;
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n",
- rrddim_id(rd),
- (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec,
- rd->last_collected_value,
- rd->last_calculated_value,
- rd->last_stored_value
- );
- }
- rrddim_foreach_done(rd);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n",
- (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec,
- (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec
- );
- }
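- // Tear down a replication query: optionally send the collection state, release the
- // data collection lock, finalize every per-dimension storage query, release the
- // acquired dictionary items, update the global replication statistics and free it.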
- static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) {
- size_t dimensions = q->dimensions;
- if(wb && q->query.enable_streaming)
- replication_send_chart_collection_state(wb, q->st);
- if(q->query.locked_data_collection) {
- netdata_spinlock_unlock(&q->st->data_collection_lock);
- q->query.locked_data_collection = false;
- }
- // release all the dictionary items acquired
- // finalize the queries
- size_t queries = 0;
- for (size_t i = 0; i < dimensions; i++) {
- struct replication_dimension *d = &q->data[i];
- if (unlikely(!d->enabled)) continue;
- q->ops->finalize(&d->handle);
- dictionary_acquired_item_release(d->dict, d->rda);
- // update global statistics
- queries++;
- }
- if(executed) {
- netdata_spinlock_lock(&replication_queries.spinlock);
- replication_queries.queries_started += queries;
- replication_queries.queries_finished += queries;
- replication_queries.points_read += q->points_read;
- replication_queries.points_generated += q->points_generated;
- netdata_spinlock_unlock(&replication_queries.spinlock);
- }
- __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
- freez(q);
- }
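- // Extend the query's 'before' to the earliest page-optimal point reported by the
- // storage engine across all dimensions, but only when the query will execute without
- // enabling streaming, the extension stays within roughly a page of points, and it
- // does not reach the chart's last update or the wall clock time.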
- static void replication_query_align_to_optimal_before(struct replication_query *q) {
- if(!q->query.execute || q->query.enable_streaming)
- return;
- size_t dimensions = q->dimensions;
- time_t expanded_before = 0;
- for (size_t i = 0; i < dimensions; i++) {
- struct replication_dimension *d = &q->data[i];
- if(unlikely(!d->enabled)) continue;
- time_t new_before = q->ops->align_to_optimal_before(&d->handle);
- if (!expanded_before || new_before < expanded_before)
- expanded_before = new_before;
- }
- if(expanded_before > q->query.before && // it is later than the original
- (expanded_before - q->query.before) / q->st->update_every < 1024 && // it is reasonable (up to a page)
- expanded_before < q->st->last_updated.tv_sec && // it is not the chart's last updated time
- expanded_before < q->wall_clock_time) // it is not later than the wall clock time
- q->query.before = expanded_before;
- }
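- // Walk all dimensions in lock-step from 'after' to 'before', emitting one
- // REPLAY_BEGIN / REPLAY_SET group per aligned time slot. Misaligned start times are
- // repaired, dimensions that do not advance are skipped, and the query is interrupted
- // (shrinking 'before') when the buffer grows beyond max_msg_size. Returns true when
- // the data sent ends more than one update_every before 'before' (a gap remains).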
- static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) {
- replication_query_align_to_optimal_before(q);
- time_t after = q->query.after;
- time_t before = q->query.before;
- size_t dimensions = q->dimensions;
- struct storage_engine_query_ops *ops = q->ops;
- time_t wall_clock_time = q->wall_clock_time;
- size_t points_read = q->points_read, points_generated = q->points_generated;
- #ifdef NETDATA_LOG_REPLICATION_REQUESTS
- time_t actual_after = 0, actual_before = 0;
- #endif
- time_t now = after + 1;
- time_t last_end_time_in_buffer = 0;
- while(now <= before) {
- time_t min_start_time = 0, max_start_time = 0, min_end_time = 0, max_end_time = 0, min_update_every = 0, max_update_every = 0;
- for (size_t i = 0; i < dimensions ;i++) {
- struct replication_dimension *d = &q->data[i];
- if(unlikely(!d->enabled || d->skip)) continue;
- // fetch the first valid point for the dimension
- int max_skip = 1000;
- while(d->sp.end_time_s < now && !ops->is_finished(&d->handle) && max_skip-- >= 0) {
- d->sp = ops->next_metric(&d->handle);
- points_read++;
- }
- if(max_skip <= 0) {
- d->skip = true;
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
- rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
- (unsigned long long) now);
- continue;
- }
- if(unlikely(d->sp.end_time_s < now || d->sp.end_time_s < d->sp.start_time_s))
- // this dimension does not provide any data
- continue;
- time_t update_every = d->sp.end_time_s - d->sp.start_time_s;
- if(unlikely(!update_every))
- update_every = q->st->update_every;
- if(unlikely(!min_update_every))
- min_update_every = update_every;
- if(unlikely(!min_start_time))
- min_start_time = d->sp.start_time_s;
- if(unlikely(!min_end_time))
- min_end_time = d->sp.end_time_s;
- min_update_every = MIN(min_update_every, update_every);
- max_update_every = MAX(max_update_every, update_every);
- min_start_time = MIN(min_start_time, d->sp.start_time_s);
- max_start_time = MAX(max_start_time, d->sp.start_time_s);
- min_end_time = MIN(min_end_time, d->sp.end_time_s);
- max_end_time = MAX(max_end_time, d->sp.end_time_s);
- }
- if (unlikely(min_update_every != max_update_every ||
- min_start_time != max_start_time)) {
- time_t fix_min_start_time;
- if(last_end_time_in_buffer &&
- last_end_time_in_buffer >= min_start_time &&
- last_end_time_in_buffer <= max_start_time) {
- fix_min_start_time = last_end_time_in_buffer;
- }
- else
- fix_min_start_time = min_end_time - min_update_every;
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' "
- "misaligned dimensions "
- "update every (min: %ld, max: %ld), "
- "start time (min: %ld, max: %ld), "
- "end time (min %ld, max %ld), "
- "now %ld, last end time sent %ld, "
- "min start time is fixed to %ld",
- rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
- min_update_every, max_update_every,
- min_start_time, max_start_time,
- min_end_time, max_end_time,
- now, last_end_time_in_buffer,
- fix_min_start_time
- );
- min_start_time = fix_min_start_time;
- }
- if(likely(min_start_time <= now && min_end_time >= now)) {
- // we have a valid point
- if (unlikely(min_end_time == min_start_time))
- min_start_time = min_end_time - q->st->update_every;
- #ifdef NETDATA_LOG_REPLICATION_REQUESTS
- if (unlikely(!actual_after))
- actual_after = min_end_time;
- actual_before = min_end_time;
- #endif
- if(buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) {
- q->query.before = last_end_time_in_buffer;
- q->query.enable_streaming = false;
- internal_error(true, "REPLICATION: buffer size %zu is more than the max message size %zu for chart '%s' of host '%s'. "
- "Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.",
- buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost),
- q->request.after, q->request.before, q->request.enable_streaming?"true":"false",
- q->query.after, q->query.before, q->query.enable_streaming?"true":"false");
- q->query.interrupted = true;
- break;
- }
- last_end_time_in_buffer = min_end_time;
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n",
- (unsigned long long) min_start_time,
- (unsigned long long) min_end_time,
- (unsigned long long) wall_clock_time
- );
- // output the replay values for this time
- for (size_t i = 0; i < dimensions; i++) {
- struct replication_dimension *d = &q->data[i];
- if (unlikely(!d->enabled)) continue;
- if (likely( d->sp.start_time_s <= min_end_time &&
- d->sp.end_time_s >= min_end_time &&
- !storage_point_is_unset(d->sp) &&
- !storage_point_is_gap(d->sp))) {
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
- rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");
- points_generated++;
- }
- }
- now = min_end_time + 1;
- }
- else if(unlikely(min_end_time < now))
- // the query does not progress
- break;
- else
- // we have gap - all points are in the future
- now = min_start_time;
- }
- #ifdef NETDATA_LOG_REPLICATION_REQUESTS
- if(actual_after) {
- char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1];
- log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
- log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
- internal_error(true,
- "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
- rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
- (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
- (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
- }
- else
- internal_error(true,
- "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
- rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
- (unsigned long long)after, (unsigned long long)before);
- #endif // NETDATA_LOG_REPLICATION_REQUESTS
- q->points_read = points_read;
- q->points_generated = points_generated;
- bool finished_with_gap = false;
- if(last_end_time_in_buffer < before - q->st->update_every)
- finished_with_gap = true;
- return finished_with_gap;
- }
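- // Sanitize the parent's request against the child's wall clock time and the chart's
- // tier 0 retention, decide the effective query window and whether streaming should
- // be enabled when the query completes, then prepare the query.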
- static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) {
- time_t wall_clock_time = now_realtime_sec();
- if(requested_after > requested_before) {
- // flip them
- time_t t = requested_before;
- requested_before = requested_after;
- requested_after = t;
- }
- if(requested_after > wall_clock_time) {
- requested_after = 0;
- requested_before = 0;
- requested_enable_streaming = true;
- }
- if(requested_before > wall_clock_time) {
- requested_before = wall_clock_time;
- requested_enable_streaming = true;
- }
- time_t query_after = requested_after;
- time_t query_before = requested_before;
- bool query_enable_streaming = requested_enable_streaming;
- time_t db_first_entry = 0, db_last_entry = 0;
- rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
- if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) {
- // no data requested - just enable streaming
- ;
- }
- else {
- if (query_after < db_first_entry)
- query_after = db_first_entry;
- if (query_before > db_last_entry)
- query_before = db_last_entry;
- // if the parent asked us to start streaming, then fill the rest with the data that we have
- if (requested_enable_streaming)
- query_before = db_last_entry;
- if (query_after > query_before) {
- time_t tmp = query_before;
- query_before = query_after;
- query_after = tmp;
- }
- query_enable_streaming = (requested_enable_streaming ||
- query_before == db_last_entry ||
- !requested_after ||
- !requested_before) ? true : false;
- }
- return replication_query_prepare(
- st,
- db_first_entry, db_last_entry,
- requested_after, requested_before, requested_enable_streaming,
- query_after, query_before, query_enable_streaming,
- wall_clock_time);
- }
- void replication_response_cancel_and_finalize(struct replication_query *q) {
- replication_query_finalize(NULL, q, false);
- }
- static bool sender_is_still_connected_for_this_request(struct replication_request *rq);
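- // Execute a prepared replication response: write the REPLAY_BEGIN / REPLAY_SET /
- // REPLAY_END payload to the sender's buffer, commit it, and - if streaming is to be
- // enabled and the sender has not been flushed since the request was queued - mark
- // the chart's sender replication as finished. Returns true when streaming is enabled.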
- bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) {
- struct replication_request *rq = q->rq;
- RRDSET *st = q->st;
- RRDHOST *host = st->rrdhost;
- // we might want to optimize this by filling a temporary buffer
- // and copying the result to the host's buffer in order to avoid
- // holding the host's buffer lock for too long
- BUFFER *wb = sender_start(host->sender);
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
- bool locked_data_collection = q->query.locked_data_collection;
- q->query.locked_data_collection = false;
- bool finished_with_gap = false;
- if(q->query.execute)
- finished_with_gap = replication_query_execute(wb, q, max_msg_size);
- time_t after = q->request.after;
- time_t before = q->query.before;
- bool enable_streaming = q->query.enable_streaming;
- replication_query_finalize(wb, q, q->query.execute);
- q = NULL; // IMPORTANT: q is invalid now
- // get a fresh retention to send to the parent
- time_t wall_clock_time = now_realtime_sec();
- time_t db_first_entry, db_last_entry;
- rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
- // end with first/last entries we have, and the first start time and
- // last end time of the data we sent
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n",
- // current chart update every
- (int)st->update_every
- // child first db time, child end db time
- , (unsigned long long)db_first_entry, (unsigned long long)db_last_entry
- // start streaming boolean
- , enable_streaming ? "true" : "false"
- // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true)
- , (unsigned long long)after, (unsigned long long)before
- // child world clock time
- , (unsigned long long)wall_clock_time
- );
- worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
- sender_commit(host->sender, wb);
- worker_is_busy(WORKER_JOB_CLEANUP);
- if(enable_streaming) {
- if(sender_is_still_connected_for_this_request(rq)) {
- // enable normal streaming if we have to
- // but only if the sender buffer has not been flushed since we started
- if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
- rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
- rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
- rrdhost_sender_replicating_charts_minus_one(st->rrdhost);
- if(!finished_with_gap)
- st->upstream_resync_time_s = 0;
- #ifdef NETDATA_LOG_REPLICATION_REQUESTS
- internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
- rrdhost_hostname(st->rrdhost), rrdset_id(st));
- #endif
- }
- else
- internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating",
- rrdhost_hostname(st->rrdhost), rrdset_id(st));
- }
- }
- if(locked_data_collection)
- netdata_spinlock_unlock(&st->data_collection_lock);
- return enable_streaming;
- }
- // ----------------------------------------------------------------------------
- // sending replication requests
- struct replication_request_details {
- struct {
- send_command callback;
- void *data;
- } caller;
- RRDHOST *host;
- RRDSET *st;
- struct {
- time_t first_entry_t; // the first entry time the child has
- time_t last_entry_t; // the last entry time the child has
- time_t wall_clock_time; // the current time of the child
- bool fixed_last_entry; // when set we set the last entry to wall clock time
- } child_db;
- struct {
- time_t first_entry_t; // the first entry time we have
- time_t last_entry_t; // the last entry time we have
- time_t wall_clock_time; // the current local world clock time
- } local_db;
- struct {
- time_t from; // the starting time of the entire gap we have
- time_t to; // the ending time of the entire gap we have
- } gap;
- struct {
- time_t after; // the start time we requested previously from this child
- time_t before; // the end time we requested previously from this child
- } last_request;
- struct {
- time_t after; // the start time of this replication request - the child will add 1 second
- time_t before; // the end time of this replication request
- bool start_streaming; // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before'
- } wanted;
- };
- static void replicate_log_request(struct replication_request_details *r, const char *msg) {
- #ifdef NETDATA_INTERNAL_CHECKS
- internal_error(true,
- #else
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl,
- #endif
- "REPLAY ERROR: 'host:%s/chart:%s' child sent: "
- "db from %ld to %ld%s, wall clock time %ld, "
- "last request from %ld to %ld, "
- "issue: %s - "
- "sending replication request from %ld to %ld, start streaming %s",
- rrdhost_hostname(r->st->rrdhost), rrdset_id(r->st),
- r->child_db.first_entry_t,
- r->child_db.last_entry_t, r->child_db.fixed_last_entry ? " (fixed)" : "",
- r->child_db.wall_clock_time,
- r->last_request.after,
- r->last_request.before,
- msg,
- r->wanted.after,
- r->wanted.before,
- r->wanted.start_streaming ? "true" : "false");
- }
- static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg, bool log) {
- RRDSET *st = r->st;
- if(log)
- replicate_log_request(r, msg);
- if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t))
- st->rrdhost->receiver->replication_first_time_t = r->wanted.after;
- #ifdef NETDATA_LOG_REPLICATION_REQUESTS
- st->replay.log_next_data_collection = true;
- char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = "";
- if(r->wanted.after)
- log_date(wanted_after_buf, LOG_DATE_LENGTH, r->wanted.after);
- if(r->wanted.before)
- log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before);
- internal_error(true,
- "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
- "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld, now %ld] gap[%ld - %ld %s] %s"
- , rrdhost_hostname(r->host), rrdset_id(r->st)
- , r->wanted.after, wanted_after_buf
- , r->wanted.before, wanted_before_buf
- , r->wanted.start_streaming ? "YES" : "NO"
- , msg
- , r->last_request.after, r->last_request.before
- , r->child_db.first_entry_t, r->child_db.last_entry_t
- , r->child_db.wall_clock_time, (r->child_db.wall_clock_time == r->local_db.wall_clock_time) ? "SAME" : (r->child_db.wall_clock_time < r->local_db.wall_clock_time) ? "BEHIND" : "AHEAD"
- , r->local_db.first_entry_t, r->local_db.last_entry_t
- , r->local_db.wall_clock_time
- , r->gap.from, r->gap.to
- , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
- , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
- );
- st->replay.start_streaming = r->wanted.start_streaming;
- st->replay.after = r->wanted.after;
- st->replay.before = r->wanted.before;
- #endif // NETDATA_LOG_REPLICATION_REQUESTS
- char buffer[2048 + 1];
- snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
- rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
- (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before);
- int ret = r->caller.callback(buffer, r->caller.data);
- if (ret < 0) {
- error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)",
- rrdhost_hostname(r->host), rrdset_id(r->st), ret);
- return false;
- }
- return true;
- }
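- // Runs on the parent: given the child's retention and wall clock time, find the gap
- // we need to fill, clamp the request to what the child can provide and to one
- // replication step, and send the REPLAY_CHART command back to the child.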
- bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
- time_t child_first_entry, time_t child_last_entry, time_t child_wall_clock_time,
- time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
- {
- struct replication_request_details r = {
- .caller = {
- .callback = callback,
- .data = callback_data,
- },
- .host = host,
- .st = st,
- .child_db = {
- .first_entry_t = child_first_entry,
- .last_entry_t = child_last_entry,
- .wall_clock_time = child_wall_clock_time,
- .fixed_last_entry = false,
- },
- .local_db = {
- .first_entry_t = 0,
- .last_entry_t = 0,
- .wall_clock_time = now_realtime_sec(),
- },
- .last_request = {
- .after = prev_first_entry_wanted,
- .before = prev_last_entry_wanted,
- },
- .wanted = {
- .after = 0,
- .before = 0,
- .start_streaming = true,
- },
- };
- if(r.child_db.last_entry_t > r.child_db.wall_clock_time) {
- replicate_log_request(&r, "child's db last entry > child's wall clock time");
- r.child_db.last_entry_t = r.child_db.wall_clock_time;
- r.child_db.fixed_last_entry = true;
- }
- rrdset_get_retention_of_tier_for_collected_chart(r.st, &r.local_db.first_entry_t, &r.local_db.last_entry_t, r.local_db.wall_clock_time, 0);
- // let's find the GAP we have
- if(!r.last_request.after || !r.last_request.before) {
- // there is no previous request
- if(r.local_db.last_entry_t)
- // we have some data, let's continue from the last point we have
- r.gap.from = r.local_db.last_entry_t;
- else
- // we don't have any data, the gap is the max timeframe we are allowed to replicate
- r.gap.from = r.local_db.wall_clock_time - r.host->rrdpush_seconds_to_replicate;
- }
- else {
- // we had sent a request - let's continue at the point we left it
- // for this we don't take into account the actual data in our db
- // because the child may also have gaps, and we need to get over it
- r.gap.from = r.last_request.before;
- }
- // we want all the data up to now
- r.gap.to = r.local_db.wall_clock_time;
- // The gap is now r.gap.from -> r.gap.to
- if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION)))
- return send_replay_chart_cmd(&r, "empty replication request, replication is disabled", false);
- if (unlikely(!rrdset_number_of_dimensions(st)))
- return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions", false);
- if (unlikely(!r.child_db.first_entry_t || !r.child_db.last_entry_t))
- return send_replay_chart_cmd(&r, "empty replication request, child has no stored data", false);
- if (unlikely(r.child_db.first_entry_t < 0 || r.child_db.last_entry_t < 0))
- return send_replay_chart_cmd(&r, "empty replication request, child db timestamps are invalid", true);
- if (unlikely(r.child_db.first_entry_t > r.child_db.wall_clock_time))
- return send_replay_chart_cmd(&r, "empty replication request, child db first entry is after its wall clock time", true);
- if (unlikely(r.child_db.first_entry_t > r.child_db.last_entry_t))
- return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)", true);
- if (unlikely(r.local_db.last_entry_t > r.child_db.last_entry_t))
- return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one", false);
- // let's find what the child can provide to fill that gap
- if(r.child_db.first_entry_t > r.gap.from)
- // the child does not have all the data - let's get what it has
- r.wanted.after = r.child_db.first_entry_t;
- else
- // ok, the child can fill the entire gap we have
- r.wanted.after = r.gap.from;
- if(r.gap.to - r.wanted.after > host->rrdpush_replication_step)
- // the duration is too big for one request - let's take the first step
- r.wanted.before = r.wanted.after + host->rrdpush_replication_step;
- else
- // wow, we can do it in one request
- r.wanted.before = r.gap.to;
- // don't ask from the child more than it has
- if(r.wanted.before > r.child_db.last_entry_t)
- r.wanted.before = r.child_db.last_entry_t;
- if(r.wanted.after > r.wanted.before) {
- r.wanted.after = 0;
- r.wanted.before = 0;
- r.wanted.start_streaming = true;
- return send_replay_chart_cmd(&r, "empty replication request, wanted after computed bigger than wanted before", true);
- }
- // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child
- r.wanted.start_streaming = (r.local_db.wall_clock_time - r.wanted.after <= host->rrdpush_replication_step ||
- r.wanted.before >= r.child_db.last_entry_t ||
- r.wanted.before >= r.child_db.wall_clock_time ||
- r.wanted.before >= r.local_db.wall_clock_time);
- // the wanted timeframe is now r.wanted.after -> r.wanted.before
- // send it
- return send_replay_chart_cmd(&r, "OK", false);
- }
- // ----------------------------------------------------------------------------
- // replication thread
- // replication request in sender DICTIONARY
- // used for de-duplicating the requests
- struct replication_request {
- struct sender_state *sender; // the sender we should put the reply at
- STRING *chart_id; // the chart of the request
- time_t after; // the start time of the query (maybe zero) key for sorting (JudyL)
- time_t before; // the end time of the query (maybe zero)
- usec_t sender_last_flush_ut; // the timestamp of the sender, at the time we indexed this request
- Word_t unique_id; // auto-increment, later requests have bigger
- bool start_streaming; // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming
- bool indexed_in_judy; // true when the request is indexed in judy
- bool not_indexed_buffer_full; // true when the request is not indexed because the sender is full
- bool not_indexed_preprocessing; // true when the request is not indexed, but it is pending in preprocessing
- // prepare ahead members - preprocessing
- bool found; // used as a result boolean for the find call
- bool executed; // used to detect if we have skipped requests while preprocessing
- RRDSET *st; // caching of the chart during preprocessing
- struct replication_query *q; // the preprocessing query initialization
- };
- // replication sort entry in JudyL array
- // used for sorting all requests, across all nodes
- struct replication_sort_entry {
- struct replication_request *rq;
- size_t unique_id; // used as a key to identify the sort entry - we never access its contents
- };
- #define MAX_REPLICATION_THREADS 20 // + 1 for the main thread
- // the global variables for the replication thread
- static struct replication_thread {
- SPINLOCK spinlock;
- struct {
- size_t pending; // number of requests pending in the queue
- Word_t unique_id; // the last unique id we gave to a request (auto-increment, starting from 1)
- // statistics
- size_t added; // number of requests added to the queue
- size_t removed; // number of requests removed from the queue
- size_t pending_no_room; // number of requests skipped, because the sender has no room for responses
- size_t senders_full; // number of senders currently having their buffer full
- size_t sender_resets; // number of times a sender reset our last position in the queue
- time_t first_time_t; // the minimum 'after' we encountered
- struct {
- Word_t after;
- Word_t unique_id;
- Pvoid_t JudyL_array;
- } queue;
- } unsafe; // protected from replication_recursive_lock()
- struct {
- size_t executed; // the number of replication requests executed
- size_t latest_first_time; // the 'after' timestamp of the last request we executed
- size_t memory; // the total memory allocated by replication
- } atomic; // access should be with atomic operations
- struct {
- size_t last_executed; // caching of the atomic.executed to report number of requests executed since last time
- netdata_thread_t **threads_ptrs;
- size_t threads;
- } main_thread; // access is allowed only by the main thread
- } replication_globals = {
- .spinlock = NETDATA_SPINLOCK_INITIALIZER,
- .unsafe = {
- .pending = 0,
- .unique_id = 0,
- .added = 0,
- .removed = 0,
- .pending_no_room = 0,
- .sender_resets = 0,
- .senders_full = 0,
- .first_time_t = 0,
- .queue = {
- .after = 0,
- .unique_id = 0,
- .JudyL_array = NULL,
- },
- },
- .atomic = {
- .executed = 0,
- .latest_first_time = 0,
- .memory = 0,
- },
- .main_thread = {
- .last_executed = 0,
- .threads = 0,
- .threads_ptrs = NULL,
- },
- };
- size_t replication_allocated_memory(void) {
- return __atomic_load_n(&replication_globals.atomic.memory, __ATOMIC_RELAXED);
- }
- #define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED)
- #define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED)
- static inline bool replication_recursive_lock_mode(char mode) {
- static __thread int recursions = 0;
- if(mode == 'L') { // (L)ock
- if(++recursions == 1)
- netdata_spinlock_lock(&replication_globals.spinlock);
- }
- else if(mode == 'U') { // (U)nlock
- if(--recursions == 0)
- netdata_spinlock_unlock(&replication_globals.spinlock);
- }
- else if(mode == 'C') { // (C)heck
- if(recursions > 0)
- return true;
- else
- return false;
- }
- else
- fatal("REPLICATION: unknown lock mode '%c'", mode);
- #ifdef NETDATA_INTERNAL_CHECKS
- if(recursions < 0)
- fatal("REPLICATION: recursions is %d", recursions);
- #endif
- return true;
- }
- #define replication_recursive_lock() replication_recursive_lock_mode('L')
- #define replication_recursive_unlock() replication_recursive_lock_mode('U')
- #define fatal_when_replication_is_not_locked_for_me() do { \
- if(!replication_recursive_lock_mode('C')) \
- fatal("REPLICATION: reached %s, but replication is not locked by this thread.", __FUNCTION__); \
- } while(0)
- void replication_set_next_point_in_time(time_t after, size_t unique_id) {
- replication_recursive_lock();
- replication_globals.unsafe.queue.after = after;
- replication_globals.unsafe.queue.unique_id = unique_id;
- replication_recursive_unlock();
- }
- // ----------------------------------------------------------------------------
- // replication sort entry management
- static struct replication_sort_entry *replication_sort_entry_create_unsafe(struct replication_request *rq) {
- fatal_when_replication_is_not_locked_for_me();
- struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
- __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
- rrdpush_sender_pending_replication_requests_plus_one(rq->sender);
- // copy the request
- rse->rq = rq;
- rse->unique_id = ++replication_globals.unsafe.unique_id;
- // save the unique id into the request, to be able to delete it later
- rq->unique_id = rse->unique_id;
- rq->indexed_in_judy = false;
- rq->not_indexed_buffer_full = false;
- rq->not_indexed_preprocessing = false;
- return rse;
- }
- static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
- freez(rse);
- __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
- }
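- // Queue a replication request for execution. If the sender's buffer is already full
- // the request is parked (not indexed) and counted in pending_no_room; otherwise a
- // sort entry is indexed in the two-level JudyL queue, keyed by 'after' (outer array)
- // and by the auto-increment unique_id (inner array).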
- static void replication_sort_entry_add(struct replication_request *rq) {
- replication_recursive_lock();
- if(rrdpush_sender_replication_buffer_full_get(rq->sender)) {
- rq->indexed_in_judy = false;
- rq->not_indexed_buffer_full = true;
- rq->not_indexed_preprocessing = false;
- replication_globals.unsafe.pending_no_room++;
- replication_recursive_unlock();
- return;
- }
- if(rq->not_indexed_buffer_full)
- replication_globals.unsafe.pending_no_room--;
- struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq);
- // if(rq->after < (time_t)replication_globals.protected.queue.after &&
- // rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED &&
- // !replication_globals.protected.skipped_no_room_since_last_reset) {
- //
- // // make it find this request first
- // replication_set_next_point_in_time(rq->after, rq->unique_id);
- // }
- replication_globals.unsafe.added++;
- replication_globals.unsafe.pending++;
- Pvoid_t *inner_judy_ptr;
- // find the outer judy entry, using after as key
- size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
- inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
- size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
- if(unlikely(!inner_judy_ptr || inner_judy_ptr == PJERR))
- fatal("REPLICATION: corrupted outer judyL");
- // add it to the inner judy, using unique_id as key
- size_t mem_before_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
- Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
- size_t mem_after_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
- if(unlikely(!item || item == PJERR))
- fatal("REPLICATION: corrupted inner judyL");
- *item = rse;
- rq->indexed_in_judy = true;
- rq->not_indexed_buffer_full = false;
- rq->not_indexed_preprocessing = false;
- if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t)
- replication_globals.unsafe.first_time_t = rq->after;
- replication_recursive_unlock();
- __atomic_add_fetch(&replication_globals.atomic.memory, (mem_after_inner_judyl - mem_before_inner_judyl) + (mem_after_outer_judyl - mem_before_outer_judyl), __ATOMIC_RELAXED);
- }
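- // Unlink a sort entry from the queue (the inner JudyL first, and the outer JudyL
- // entry when the inner becomes empty) and free it. Returns true when the inner JudyL
- // was deleted, so callers iterating the queue can restart their traversal.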
- static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) {
- fatal_when_replication_is_not_locked_for_me();
- bool inner_judy_deleted = false;
- replication_globals.unsafe.removed++;
- replication_globals.unsafe.pending--;
- rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender);
- rse->rq->indexed_in_judy = false;
- rse->rq->not_indexed_preprocessing = preprocessing;
- size_t memory_saved = 0;
- // delete it from the inner judy
- size_t mem_before_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
- JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
- size_t mem_after_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
- memory_saved = mem_before_inner_judyl - mem_after_inner_judyl;
- // if no items left, delete it from the outer judy
- if(**inner_judy_ppptr == NULL) {
- size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
- JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0);
- size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
- memory_saved += mem_before_outer_judyl - mem_after_outer_judyl;
- inner_judy_deleted = true;
- }
- // free memory
- replication_sort_entry_destroy(rse);
- __atomic_sub_fetch(&replication_globals.atomic.memory, memory_saved, __ATOMIC_RELAXED);
- return inner_judy_deleted;
- }
- static void replication_sort_entry_del(struct replication_request *rq, bool buffer_full) {
- Pvoid_t *inner_judy_pptr;
- struct replication_sort_entry *rse_to_delete = NULL;
- replication_recursive_lock();
- if(rq->indexed_in_judy) {
- inner_judy_pptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, rq->after, PJE0);
- if (inner_judy_pptr) {
- Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0);
- if (our_item_pptr) {
- rse_to_delete = *our_item_pptr;
- replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr, false);
- if(buffer_full) {
- replication_globals.unsafe.pending_no_room++;
- rq->not_indexed_buffer_full = true;
- }
- }
- }
- if (!rse_to_delete)
- fatal("REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.",
- rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after);
- }
- replication_recursive_unlock();
- }
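- // Pick the next replication request to execute: resume scanning the JudyL queue from
- // the last (after, unique_id) position, wrap around to the beginning once if nothing
- // is found, and return a copy of the request (found = true) after unlinking its sort
- // entry from the queue.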
- static struct replication_request replication_request_get_first_available(void) {
- Pvoid_t *inner_judy_pptr;
- replication_recursive_lock();
- struct replication_request rq_to_return = (struct replication_request){ .found = false };
- if(unlikely(!replication_globals.unsafe.queue.after || !replication_globals.unsafe.queue.unique_id)) {
- replication_globals.unsafe.queue.after = 0;
- replication_globals.unsafe.queue.unique_id = 0;
- }
- Word_t started_after = replication_globals.unsafe.queue.after;
- size_t round = 0;
- while(!rq_to_return.found) {
- round++;
- if(round > 2)
- break;
- if(round == 2) {
- if(started_after == 0)
- break;
- replication_globals.unsafe.queue.after = 0;
- replication_globals.unsafe.queue.unique_id = 0;
- }
- bool find_same_after = true;
- while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstThenNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, &find_same_after))) {
- Pvoid_t *our_item_pptr;
- if(unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after))
- break;
- while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) {
- struct replication_sort_entry *rse = *our_item_pptr;
- struct replication_request *rq = rse->rq;
- // copy the request to return it
- rq_to_return = *rq;
- rq_to_return.chart_id = string_dup(rq_to_return.chart_id);
- // set the return result to found
- rq_to_return.found = true;
- if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr, true))
- // we removed the item from the outer JudyL
- break;
- }
- // prepare for the next iteration on the outer loop
- replication_globals.unsafe.queue.unique_id = 0;
- }
- }
- replication_recursive_unlock();
- return rq_to_return;
- }
- // ----------------------------------------------------------------------------
- // replication request management
- static void replication_request_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) {
- struct sender_state *s = sender_state; (void)s;
- struct replication_request *rq = value;
- // IMPORTANT:
- // We use the react instead of the insert callback
- // because we want the item to be atomically visible
- // to our replication thread, immediately after.
- // If we put this at the insert callback, the item is not guaranteed
- // to be atomically visible to others, so the replication thread
- // may see the replication sort entry, but fail to find the dictionary item
- // related to it.
- replication_sort_entry_add(rq);
- // this request is about a unique chart for this sender
- rrdpush_sender_replicating_charts_plus_one(s);
- }
- static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) {
- struct sender_state *s = sender_state; (void)s;
- struct replication_request *rq = old_value; (void)rq;
- struct replication_request *rq_new = new_value;
- replication_recursive_lock();
- if(!rq->indexed_in_judy && rq->not_indexed_buffer_full && !rq->not_indexed_preprocessing) {
- // we can replace this command
- internal_error(
- true,
- "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
- rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
- (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
- (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
- rq->after = rq_new->after;
- rq->before = rq_new->before;
- rq->start_streaming = rq_new->start_streaming;
- }
- else if(!rq->indexed_in_judy && !rq->not_indexed_preprocessing) {
- replication_sort_entry_add(rq);
- internal_error(
- true,
- "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
- rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
- (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
- (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
- }
- else {
- internal_error(
- true,
- "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
- rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host),
- dictionary_acquired_item_name(item),
- (unsigned long long) rq->after, (unsigned long long) rq->before, rq->start_streaming ? "true" : "false",
- (unsigned long long) rq_new->after, (unsigned long long) rq_new->before, rq_new->start_streaming ? "true" : "false");
- }
- replication_recursive_unlock();
- string_freez(rq_new->chart_id);
- return false;
- }
- static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) {
- struct replication_request *rq = value;
- // this request is about a unique chart for this sender
- rrdpush_sender_replicating_charts_minus_one(rq->sender);
- if(rq->indexed_in_judy)
- replication_sort_entry_del(rq, false);
- else if(rq->not_indexed_buffer_full) {
- replication_recursive_lock();
- replication_globals.unsafe.pending_no_room--;
- replication_recursive_unlock();
- }
- string_freez(rq->chart_id);
- }
- static bool sender_is_still_connected_for_this_request(struct replication_request *rq) {
- return rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender);
- }
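- // Execute a single replication request: find the chart, prepare the response query
- // (unless it was prepared ahead by preprocessing), and send the reply, capping each
- // message to a percentage of the sender's buffer size.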
- static bool replication_execute_request(struct replication_request *rq, bool workers) {
- bool ret = false;
- if(!rq->st) {
- if(likely(workers))
- worker_is_busy(WORKER_JOB_FIND_CHART);
- rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
- }
- if(!rq->st) {
- internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found",
- rrdhost_hostname(rq->sender->host), string2str(rq->chart_id));
- goto cleanup;
- }
- netdata_thread_disable_cancelability();
- if(!rq->q) {
- if(likely(workers))
- worker_is_busy(WORKER_JOB_PREPARE_QUERY);
- rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before);
- }
- if(likely(workers))
- worker_is_busy(WORKER_JOB_QUERYING);
- // send the replication data
- rq->q->rq = rq;
- replication_response_execute_and_finalize(
- rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL));
- rq->q = NULL;
- netdata_thread_enable_cancelability();
- __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED);
- ret = true;
- cleanup:
- if(rq->q) {
- replication_response_cancel_and_finalize(rq->q);
- rq->q = NULL;
- }
- string_freez(rq->chart_id);
- worker_is_idle();
- return ret;
- }
- // ----------------------------------------------------------------------------
- // public API
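- // Add a replication request for a chart of this sender. Requests that want streaming
- // enabled while the sender buffer usage is within
- // STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED are executed inline; everything
- // else is queued in the sender's requests dictionary, de-duplicated by chart id.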
- void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming) {
- struct replication_request rq = {
- .sender = sender,
- .chart_id = string_strdupz(chart_id),
- .after = after,
- .before = before,
- .start_streaming = start_streaming,
- .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
- .indexed_in_judy = false,
- .not_indexed_buffer_full = false,
- .not_indexed_preprocessing = false,
- };
- if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
- replication_execute_request(&rq, false);
- else
- dictionary_set(sender->replication.requests, chart_id, &rq, sizeof(struct replication_request));
- }
- void replication_sender_delete_pending_requests(struct sender_state *sender) {
- // allow the dictionary destructor to go faster on locks
- dictionary_flush(sender->replication.requests);
- }
- void replication_init_sender(struct sender_state *sender) {
- sender->replication.requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
- NULL, sizeof(struct replication_request));
- dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender);
- dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender);
- dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender);
- }
- void replication_cleanup_sender(struct sender_state *sender) {
- // allow the dictionary destructor to go faster on locks
- replication_recursive_lock();
- dictionary_destroy(sender->replication.requests);
- replication_recursive_unlock();
- }
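- // Recalculate the sender buffer usage: when it exceeds
- // MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED all indexed requests of this sender are parked
- // out of the queue, and when it drops below MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED the
- // parked requests are re-queued.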
- void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
- size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
- size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size;
- if(unlikely(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !rrdpush_sender_replication_buffer_full_get(s))) {
- rrdpush_sender_replication_buffer_full_set(s, true);
- struct replication_request *rq;
- dfe_start_read(s->replication.requests, rq) {
- if(rq->indexed_in_judy)
- replication_sort_entry_del(rq, true);
- }
- dfe_done(rq);
- replication_recursive_lock();
- replication_globals.unsafe.senders_full++;
- replication_recursive_unlock();
- }
- else if(unlikely(percentage < MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED && rrdpush_sender_replication_buffer_full_get(s))) {
- rrdpush_sender_replication_buffer_full_set(s, false);
- struct replication_request *rq;
- dfe_start_read(s->replication.requests, rq) {
- if(!rq->indexed_in_judy && (rq->not_indexed_buffer_full || rq->not_indexed_preprocessing))
- replication_sort_entry_add(rq);
- }
- dfe_done(rq);
- replication_recursive_lock();
- replication_globals.unsafe.senders_full--;
- replication_globals.unsafe.sender_resets++;
- // replication_set_next_point_in_time(0, 0);
- replication_recursive_unlock();
- }
- rrdpush_sender_set_buffer_used_percent(s, percentage);
- }
- // ----------------------------------------------------------------------------
- // replication thread
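- // Consistency check, run when replication appears to have completed for a host:
- // every chart should have RRDSET_FLAG_SENDER_REPLICATION_FINISHED set and
- // RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS cleared.
- // Returns the number of charts that violate this.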
- static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
- internal_error(
- host->sender &&
- !rrdpush_sender_pending_replication_requests(host->sender) &&
- dictionary_entries(host->sender->replication.requests) != 0,
- "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
- rrdhost_hostname(host),
- rrdpush_sender_pending_replication_requests(host->sender),
- dictionary_entries(host->sender->replication.requests)
- );
- size_t ok = 0;
- size_t errors = 0;
- RRDSET *st;
- rrdset_foreach_read(st, host) {
- RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
- bool is_error = false;
- if(!flags) {
- internal_error(
- true,
- "REPLICATION SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED",
- rrdhost_hostname(host), rrdset_id(st)
- );
- is_error = true;
- }
- if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
- internal_error(
- true,
- "REPLICATION SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished",
- rrdhost_hostname(host), rrdset_id(st)
- );
- is_error = true;
- }
- if(is_error)
- errors++;
- else
- ok++;
- }
- rrdset_foreach_done(st);
- internal_error(errors,
- "REPLICATION SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished",
- rrdhost_hostname(host), ok, errors);
- return errors;
- }
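- // Run the per-host verification above for all hosts and log how many
- // replication requests were executed since the last verification.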
- static void verify_all_hosts_charts_are_streaming_now(void) {
- worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY);
- size_t errors = 0;
- RRDHOST *host;
- dfe_start_read(rrdhost_root_index, host)
- errors += verify_host_charts_are_streaming_now(host);
- dfe_done(host);
- size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
- info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication",
- executed - replication_globals.main_thread.last_executed, errors);
- replication_globals.main_thread.last_executed = executed;
- }
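- // Register the worker jobs of a replication thread; the main thread (master)
- // additionally registers the statistics job and its custom metrics.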
- static void replication_initialize_workers(bool master) {
- worker_register("REPLICATION");
- worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next");
- worker_register_job_name(WORKER_JOB_QUERYING, "querying");
- worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete");
- worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart");
- worker_register_job_name(WORKER_JOB_PREPARE_QUERY, "prepare query");
- worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
- worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit");
- worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup");
- worker_register_job_name(WORKER_JOB_WAIT, "wait");
- if(master) {
- worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");
- worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
- worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests", WORKER_METRIC_ABSOLUTE);
- worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
- worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
- worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
- worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL);
- worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, "senders full", "senders", WORKER_METRIC_ABSOLUTE);
- }
- }
- #define REQUEST_OK (0)
- #define REQUEST_QUEUE_EMPTY (-1)
- #define REQUEST_CHART_NOT_FOUND (-2)
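- // Execute the next pending replication request.
- // Each replication thread keeps a small thread-local ring of requests (rqs):
- // the first loop fills the ring and prepares the queries ahead of execution,
- // the second loop picks the oldest prepared request, drops it if the sender
- // has reconnected or its buffer is full, and otherwise executes it.
- // With cancel == true, all prepared but not yet executed queries are
- // cancelled instead (used on shutdown).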
- static int replication_execute_next_pending_request(bool cancel) {
- static __thread int max_requests_ahead = 0;
- static __thread struct replication_request *rqs = NULL;
- static __thread int rqs_last_executed = 0, rqs_last_prepared = 0;
- static __thread size_t queue_rounds = 0; (void)queue_rounds;
- struct replication_request *rq;
- if(unlikely(cancel)) {
- if(rqs) {
- size_t cancelled = 0;
- do {
- if (++rqs_last_executed >= max_requests_ahead)
- rqs_last_executed = 0;
- rq = &rqs[rqs_last_executed];
- if (rq->q) {
- internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
- internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");
- replication_response_cancel_and_finalize(rq->q);
- rq->q = NULL;
- cancelled++;
- }
- rq->executed = true;
- rq->found = false;
- } while (rqs_last_executed != rqs_last_prepared);
- internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);
- }
- return REQUEST_QUEUE_EMPTY;
- }
- if(unlikely(!rqs)) {
- max_requests_ahead = get_netdata_cpus() / 2;
- if(max_requests_ahead > libuv_worker_threads * 2)
- max_requests_ahead = libuv_worker_threads * 2;
- if(max_requests_ahead < 2)
- max_requests_ahead = 2;
- rqs = callocz(max_requests_ahead, sizeof(struct replication_request));
- __atomic_add_fetch(&replication_buffers_allocated, max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
- }
- // fill the queue
- do {
- if(++rqs_last_prepared >= max_requests_ahead) {
- rqs_last_prepared = 0;
- queue_rounds++;
- }
- internal_fatal(rqs[rqs_last_prepared].q,
- "REPLAY FATAL: slot is used by query that has not been executed!");
- worker_is_busy(WORKER_JOB_FIND_NEXT);
- rqs[rqs_last_prepared] = replication_request_get_first_available();
- rq = &rqs[rqs_last_prepared];
- if(rq->found) {
- if (!rq->st) {
- worker_is_busy(WORKER_JOB_FIND_CHART);
- rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
- }
- if (rq->st && !rq->q) {
- worker_is_busy(WORKER_JOB_PREPARE_QUERY);
- rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before);
- }
- rq->executed = false;
- }
- } while(rq->found && rqs_last_prepared != rqs_last_executed);
- // pick the first usable
- do {
- if (++rqs_last_executed >= max_requests_ahead)
- rqs_last_executed = 0;
- rq = &rqs[rqs_last_executed];
- if(rq->found) {
- internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
- if (rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(rq->sender)) {
- // the sender has reconnected since this request was queued,
- // we can safely throw it away, since the parent will resend it
- replication_response_cancel_and_finalize(rq->q);
- rq->executed = true;
- rq->found = false;
- rq->q = NULL;
- }
- else if (rrdpush_sender_replication_buffer_full_get(rq->sender)) {
- // the sender buffer is full, so we can skip this request for now;
- // it has already been marked as 'preprocessed' in the dictionary,
- // and the sender will put it back in the queue when there is
- // enough room in its buffer for processing replication requests
- replication_response_cancel_and_finalize(rq->q);
- rq->executed = true;
- rq->found = false;
- rq->q = NULL;
- }
- else {
- // we can execute this,
- // delete it from the dictionary
- worker_is_busy(WORKER_JOB_DELETE_ENTRY);
- dictionary_del(rq->sender->replication.requests, string2str(rq->chart_id));
- }
- }
- else
- internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!");
- } while(!rq->found && rqs_last_executed != rqs_last_prepared);
- if(unlikely(!rq->found)) {
- worker_is_idle();
- return REQUEST_QUEUE_EMPTY;
- }
- replication_set_latest_first_time(rq->after);
- bool chart_found = replication_execute_request(rq, true);
- rq->executed = true;
- rq->found = false;
- rq->q = NULL;
- if(unlikely(!chart_found)) {
- worker_is_idle();
- return REQUEST_CHART_NOT_FOUND;
- }
- worker_is_idle();
- return REQUEST_OK;
- }
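- // Additional replication worker threads: they run the same dispatch loop as
- // the main thread, but without the statistics block, and sleep for one second
- // whenever the request queue is empty.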
- static void replication_worker_cleanup(void *ptr __maybe_unused) {
- replication_execute_next_pending_request(true);
- worker_unregister();
- }
- static void *replication_worker_thread(void *ptr) {
- replication_initialize_workers(false);
- netdata_thread_cleanup_push(replication_worker_cleanup, ptr);
- while(service_running(SERVICE_REPLICATION)) {
- if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
- sender_thread_buffer_free();
- worker_is_busy(WORKER_JOB_WAIT);
- worker_is_idle();
- sleep_usec(1 * USEC_PER_SEC);
- }
- }
- netdata_thread_cleanup_pop(1);
- return NULL;
- }
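- // Shutdown of the main replication thread: cancel this thread's inflight
- // queries, then join and free the extra worker threads.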
- static void replication_main_cleanup(void *ptr) {
- struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;
- replication_execute_next_pending_request(true);
- int threads = (int)replication_globals.main_thread.threads;
- for(int i = 0; i < threads ;i++) {
- netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL);
- freez(replication_globals.main_thread.threads_ptrs[i]);
- __atomic_sub_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
- }
- freez(replication_globals.main_thread.threads_ptrs);
- replication_globals.main_thread.threads_ptrs = NULL;
- __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
- // unregister this thread from the workers registry
- worker_unregister();
- static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
- }
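- // Main replication thread: spawns the extra worker threads according to the
- // 'replication threads' setting, then loops dispatching requests, publishing
- // statistics roughly once per update_every, and running the consistency
- // verification once replication has been idle for a while.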
- void *replication_thread_main(void *ptr __maybe_unused) {
- replication_initialize_workers(true);
- int threads = config_get_number(CONFIG_SECTION_DB, "replication threads", 1);
- if(threads < 1 || threads > MAX_REPLICATION_THREADS) {
- error("replication threads given %d is invalid, resetting to 1", threads);
- threads = 1;
- }
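- // A minimal netdata.conf sketch for the option read above (assuming
- // CONFIG_SECTION_DB maps to the [db] section):
- //
- //     [db]
- //         replication threads = 4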
- if(--threads) {
- replication_globals.main_thread.threads = threads;
- replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
- __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);
- for(int i = 0; i < threads ;i++) {
- char tag[NETDATA_THREAD_TAG_MAX + 1];
- snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2);
- replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t));
- __atomic_add_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
- netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag,
- NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL);
- }
- }
- netdata_thread_cleanup_push(replication_main_cleanup, ptr);
- // start from 100% completed
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
- long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place
- bool slow = true; // controls how long we sleep when idle - it must start as true!
- usec_t last_now_mono_ut = now_monotonic_usec();
- time_t replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; // restart from the beginning every 10 seconds
- size_t last_executed = 0;
- size_t last_sender_resets = 0;
- while(service_running(SERVICE_REPLICATION)) {
- // statistics
- usec_t now_mono_ut = now_monotonic_usec();
- if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
- last_now_mono_ut = now_mono_ut;
- worker_is_busy(WORKER_JOB_STATISTICS);
- replication_recursive_lock();
- size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
- if(last_executed != current_executed) {
- run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
- last_executed = current_executed;
- slow = false;
- }
- if(replication_reset_next_point_in_time_countdown-- == 0) {
- // every SECONDS_TO_RESET_POINT_IN_TIME seconds, make the next pass scan all the pending requests from the beginning
- replication_set_next_point_in_time(0, 0);
- // replication_globals.protected.skipped_no_room_since_last_reset = 0;
- replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
- }
- if(--run_verification_countdown == 0) {
- if (!replication_globals.unsafe.pending && !replication_globals.unsafe.pending_no_room) {
- // reset the statistics about completion percentage
- replication_globals.unsafe.first_time_t = 0;
- replication_set_latest_first_time(0);
- verify_all_hosts_charts_are_streaming_now();
- run_verification_countdown = LONG_MAX;
- slow = true;
- }
- else
- run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
- }
- time_t latest_first_time_t = replication_get_latest_first_time();
- if(latest_first_time_t && replication_globals.unsafe.pending) {
- // completion percentage statistics
- time_t now = now_realtime_sec();
- time_t total = now - replication_globals.unsafe.first_time_t;
- time_t done = latest_first_time_t - replication_globals.unsafe.first_time_t;
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION,
- (NETDATA_DOUBLE) done * 100.0 / (NETDATA_DOUBLE) total);
- }
- else
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.unsafe.added);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED));
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.pending_no_room);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets);
- worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, (NETDATA_DOUBLE)replication_globals.unsafe.senders_full);
- replication_recursive_unlock();
- worker_is_idle();
- }
- if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
- worker_is_busy(WORKER_JOB_WAIT);
- replication_recursive_lock();
- // the timeout also defines how frequently we will traverse all the pending requests
- // when the outbound buffers of all senders are full
- usec_t timeout;
- if(slow) {
- // no work to be done, wait for a request to come in
- timeout = 1000 * USEC_PER_MS;
- sender_thread_buffer_free();
- }
- else if(replication_globals.unsafe.pending > 0) {
- if(replication_globals.unsafe.sender_resets == last_sender_resets)
- timeout = 1000 * USEC_PER_MS;
- else {
- // there are pending requests waiting to be executed,
- // but none could be executed at this time.
- // try again after this time.
- timeout = 100 * USEC_PER_MS;
- }
- last_sender_resets = replication_globals.unsafe.sender_resets;
- }
- else {
- // no requests pending, but there were requests recently (run_verification_countdown)
- // so, try in a short time.
- // if this is big, replicating a single chart will be slow to finish (the request/response ping-pong for just one chart)
- timeout = 10 * USEC_PER_MS;
- last_sender_resets = replication_globals.unsafe.sender_resets;
- }
- replication_recursive_unlock();
- worker_is_idle();
- sleep_usec(timeout);
- // make it scan all the pending requests next time
- replication_set_next_point_in_time(0, 0);
- replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
- continue;
- }
- }
- netdata_thread_cleanup_pop(1);
- return NULL;
- }