// SPDX-License-Identifier: GPL-3.0-or-later

#include "replication.h"
#include "Judy.h"

#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 20
#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10
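
// NOTE: these two thresholds implement hysteresis on the sender buffer
// (see replication_recalculate_buffer_used_ratio_unsafe() below):
// replication requests stop being served for a sender once its buffer is
// more than 20% full, and resume only after it drains below 10%.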

// ----------------------------------------------------------------------------
// sending replication replies
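
// Query the tier-0 database of every exposed dimension of 'st' for the
// window (after, before] and append the samples to 'wb' as REPLAY_BEGIN /
// REPLAY_SET groups, one group per aligned sample interval.
// Returns the (possibly adjusted) 'before' timestamp that was used.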
static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) {
    size_t dimensions = rrdset_number_of_dimensions(st);
    struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops;

    struct {
        DICTIONARY *dict;
        const DICTIONARY_ITEM *rda;
        RRDDIM *rd;
        struct storage_engine_query_handle handle;
        STORAGE_POINT sp;
        bool enabled;
    } data[dimensions];

    memset(data, 0, sizeof(data));

    if(enable_streaming && st->last_updated.tv_sec > before) {
        internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' has start_streaming = true, adjusting replication before timestamp from %llu to %llu",
                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
                       (unsigned long long)before,
                       (unsigned long long)st->last_updated.tv_sec
        );
        before = st->last_updated.tv_sec;
    }

    // prepare our array of dimensions
    {
        RRDDIM *rd;
        rrddim_foreach_read(rd, st) {
            if (rd_dfe.counter >= dimensions) {
                internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
                               rrdhost_hostname(st->rrdhost), rrdset_id(st));
                break;
            }

            if(rd->exposed) {
                data[rd_dfe.counter].dict = rd_dfe.dict;
                data[rd_dfe.counter].rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
                data[rd_dfe.counter].rd = rd;

                ops->init(rd->tiers[0]->db_metric_handle, &data[rd_dfe.counter].handle, after, before);

                data[rd_dfe.counter].enabled = true;
            }
            else
                data[rd_dfe.counter].enabled = false;
        }
        rrddim_foreach_done(rd);
    }

    time_t now = after + 1, actual_after = 0, actual_before = 0; (void)actual_before;
    while(now <= before) {
        time_t min_start_time = 0, min_end_time = 0;
        for (size_t i = 0; i < dimensions && data[i].rd; i++) {
            if(!data[i].enabled) continue;

            // fetch the first valid point for the dimension
            int max_skip = 100;
            while(data[i].sp.end_time < now && !ops->is_finished(&data[i].handle) && max_skip-- > 0)
                data[i].sp = ops->next_metric(&data[i].handle);

            internal_error(max_skip <= 0,
                           "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu",
                           rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(data[i].rd), (unsigned long long) now);

            if(data[i].sp.end_time < now)
                continue;

            if(!min_start_time) {
                min_start_time = data[i].sp.start_time;
                min_end_time = data[i].sp.end_time;
            }
            else {
                min_start_time = MIN(min_start_time, data[i].sp.start_time);
                min_end_time = MIN(min_end_time, data[i].sp.end_time);
            }
        }

        if(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1) {
            internal_error(true,
                           "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
                           rrdhost_hostname(st->rrdhost), rrdset_id(st),
                           (unsigned long long)min_start_time,
                           (unsigned long long)min_end_time,
                           (unsigned long long)wall_clock_time);
            break;
        }

        if(min_end_time < now) {
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
            internal_error(true,
                           "STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
                           rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now);
#endif // NETDATA_LOG_REPLICATION_REQUESTS
            break;
        }

        if(min_end_time <= min_start_time)
            min_start_time = min_end_time - st->update_every;

        if(!actual_after) {
            actual_after = min_end_time;
            actual_before = min_end_time;
        }
        else
            actual_before = min_end_time;

        buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n"
                       , (unsigned long long)min_start_time
                       , (unsigned long long)min_end_time
                       , (unsigned long long)wall_clock_time
        );

        // output the replay values for this time
        for (size_t i = 0; i < dimensions && data[i].rd; i++) {
            if(!data[i].enabled) continue;

            if(data[i].sp.start_time <= min_end_time && data[i].sp.end_time >= min_end_time)
                buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
                               rrddim_id(data[i].rd), data[i].sp.sum, data[i].sp.flags & SN_FLAG_RESET ? "R" : "");
            else
                buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n",
                               rrddim_id(data[i].rd));
        }

        now = min_end_time + 1;
    }

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    if(actual_after) {
        char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1];
        log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
        log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
        internal_error(true,
                       "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
                       (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
                       (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
    }
    else
        internal_error(true,
                       "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
                       (unsigned long long)after, (unsigned long long)before);
#endif // NETDATA_LOG_REPLICATION_REQUESTS

    // release all the dictionary items acquired
    // finalize the queries
    for(size_t i = 0; i < dimensions && data[i].rda ;i++) {
        if(!data[i].enabled) continue;

        ops->finalize(&data[i].handle);
        dictionary_acquired_item_release(data[i].dict, data[i].rda);
    }

    return before;
}
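
// Append the current collection state of every exposed dimension (last
// collected / calculated / stored values) and of the chart itself, so the
// parent can resume data collection exactly where the child stands.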
static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) {
    RRDDIM *rd;
    rrddim_foreach_read(rd, st) {
        if(!rd->exposed) continue;

        buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n",
                       rrddim_id(rd),
                       (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec,
                       rd->last_collected_value,
                       rd->last_calculated_value,
                       rd->last_stored_value
        );
    }
    rrddim_foreach_done(rd);

    buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n",
                   (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec,
                   (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec
    );
}
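
// Executed on the child: validate the requested window against local
// retention, run the query and stream the reply to the parent.
// The reply has this general shape (an illustrative sketch, not verbatim
// protocol output - the actual keywords are the PLUGINSD_KEYWORD_REPLAY_*
// macros used below):
//
//    REPLAY_BEGIN "chart_id"
//    REPLAY_BEGIN '' <start> <end> <wall_clock>
//    REPLAY_SET "dim_id" <value> "<flags>"
//    ...
//    REPLAY_RRDDIM_STATE / REPLAY_RRDSET_STATE   (only when streaming is enabled)
//    REPLAY_END <update_every> <first_t> <last_t> <start_streaming> <after> <before> <wall_clock>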
bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, time_t after, time_t before) {
    time_t query_after = after;
    time_t query_before = before;
    time_t now = now_realtime_sec();
    time_t tolerance = 2;   // sometimes, between the time we get this value and the time we check it,
                            // a data collection has been made,
                            // so we allow this tolerance when detecting invalid timestamps

    // find the first entry we have
    time_t first_entry_local = rrdset_first_entry_t(st);
    if(first_entry_local > now + tolerance) {
        internal_error(true,
                       "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db first time %llu is in the future (now is %llu)",
                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
                       (unsigned long long)first_entry_local, (unsigned long long)now);
        first_entry_local = now;
    }

    if (query_after < first_entry_local)
        query_after = first_entry_local;

    // find the latest entry we have
    time_t last_entry_local = st->last_updated.tv_sec;
    if(!last_entry_local) {
        internal_error(true,
                       "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' RRDSET reports last updated time zero.",
                       rrdhost_hostname(st->rrdhost), rrdset_id(st));
        last_entry_local = rrdset_last_entry_t(st);
        if(!last_entry_local) {
            internal_error(true,
                           "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db reports last time zero.",
                           rrdhost_hostname(st->rrdhost), rrdset_id(st));
            last_entry_local = now;
        }
    }

    if(last_entry_local > now + tolerance) {
        internal_error(true,
                       "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
                       (unsigned long long)last_entry_local, (unsigned long long)now);
        last_entry_local = now;
    }

    if (query_before > last_entry_local)
        query_before = last_entry_local;

    // if the parent asked us to start streaming, then fill the rest with the data that we have
    if (start_streaming)
        query_before = last_entry_local;

    if (query_after > query_before) {
        time_t tmp = query_before;
        query_before = query_after;
        query_after = tmp;
    }

    bool enable_streaming = start_streaming || query_before == last_entry_local || !after || !before;

    // we might want to optimize this by filling a temporary buffer
    // and copying the result to the host's buffer in order to avoid
    // holding the host's buffer lock for too long
    BUFFER *wb = sender_start(host->sender);

    buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));

    if(after != 0 && before != 0)
        before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming, now);
    else {
        after = 0;
        before = 0;
        enable_streaming = true;
    }

    // get the world clock time again
    time_t world_clock_time = now_realtime_sec();
    if(enable_streaming) {
        if(now < world_clock_time) {
            // we needed time to execute this request,
            // so the parent will need to replicate more data
            enable_streaming = false;
        }
        else
            replicate_chart_collection_state(wb, st);
    }

    // end with first/last entries we have, and the first start time and
    // last end time of the data we sent
    buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n",

                   // current chart update every
                   (int)st->update_every

                   // child first db time, child end db time
                   , (unsigned long long)first_entry_local, (unsigned long long)last_entry_local

                   // start streaming boolean
                   , enable_streaming ? "true" : "false"

                   // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true)
                   , (unsigned long long)after, (unsigned long long)before

                   // child world clock time
                   , (unsigned long long)world_clock_time
    );

    sender_commit(host->sender, wb);

    return enable_streaming;
}

// ----------------------------------------------------------------------------
// sending replication requests

struct replication_request_details {
    struct {
        send_command callback;
        void *data;
    } caller;

    RRDHOST *host;
    RRDSET *st;

    struct {
        time_t first_entry_t;               // the first entry time the child has
        time_t last_entry_t;                // the last entry time the child has
        time_t world_time_t;                // the current time of the child
    } child_db;

    struct {
        time_t first_entry_t;               // the first entry time we have
        time_t last_entry_t;                // the last entry time we have
        bool last_entry_t_adjusted_to_now;  // true, if the last entry time was in the future and we fixed it
        time_t now;                         // the current local world clock time
    } local_db;

    struct {
        time_t from;                        // the starting time of the entire gap we have
        time_t to;                          // the ending time of the entire gap we have
    } gap;

    struct {
        time_t after;                       // the start time we requested previously from this child
        time_t before;                      // the end time we requested previously from this child
    } last_request;

    struct {
        time_t after;                       // the start time of this replication request - the child will add 1 second
        time_t before;                      // the end time of this replication request
        bool start_streaming;               // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before'
    } wanted;
};

static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg __maybe_unused) {
    RRDSET *st = r->st;

    if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t))
        st->rrdhost->receiver->replication_first_time_t = r->wanted.after;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    st->replay.log_next_data_collection = true;

    char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = "";

    if(r->wanted.after)
        log_date(wanted_after_buf, LOG_DATE_LENGTH, r->wanted.after);

    if(r->wanted.before)
        log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before);

    internal_error(true,
                   "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
                   "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld %s, now %ld] gap[%ld - %ld %s] %s"
                   , rrdhost_hostname(r->host), rrdset_id(r->st)
                   , r->wanted.after, wanted_after_buf
                   , r->wanted.before, wanted_before_buf
                   , r->wanted.start_streaming ? "YES" : "NO"
                   , msg
                   , r->last_request.after, r->last_request.before
                   , r->child_db.first_entry_t, r->child_db.last_entry_t
                   , r->child_db.world_time_t, (r->child_db.world_time_t == r->local_db.now) ? "SAME" : (r->child_db.world_time_t < r->local_db.now) ? "BEHIND" : "AHEAD"
                   , r->local_db.first_entry_t, r->local_db.last_entry_t
                   , r->local_db.last_entry_t_adjusted_to_now ? "FIXED" : "RAW", r->local_db.now
                   , r->gap.from, r->gap.to
                   , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
                   , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
    );

    st->replay.start_streaming = r->wanted.start_streaming;
    st->replay.after = r->wanted.after;
    st->replay.before = r->wanted.before;
#endif // NETDATA_LOG_REPLICATION_REQUESTS

    char buffer[2048 + 1];
    snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
              rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
              (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before);

    int ret = r->caller.callback(buffer, r->caller.data);
    if (ret < 0) {
        error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)",
              rrdhost_hostname(r->host), rrdset_id(r->st), ret);
        return false;
    }

    return true;
}
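
// Executed on the parent: figure out the gap we have for this chart and
// decide what to ask from the child, one rrdpush_replication_step at a
// time. A worked example with hypothetical numbers: if our local db ends
// 600s ago, there is no previous request, the child has data up to now,
// and the step is 3600s, then gap = [now - 600, now], wanted.after =
// now - 600, wanted.before = now (the whole gap fits in one step, then
// clamped to the child's last entry), and start_streaming becomes true
// because the request reaches the child's last entry.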
bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
                             time_t first_entry_child, time_t last_entry_child, time_t child_world_time,
                             time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
{
    struct replication_request_details r = {
            .caller = {
                    .callback = callback,
                    .data = callback_data,
            },

            .host = host,
            .st = st,

            .child_db = {
                    .first_entry_t = first_entry_child,
                    .last_entry_t = last_entry_child,
                    .world_time_t = child_world_time,
            },

            .local_db = {
                    .first_entry_t = rrdset_first_entry_t(st),
                    .last_entry_t = rrdset_last_entry_t(st),
                    .last_entry_t_adjusted_to_now = false,
                    .now = now_realtime_sec(),
            },

            .last_request = {
                    .after = prev_first_entry_wanted,
                    .before = prev_last_entry_wanted,
            },

            .wanted = {
                    .after = 0,
                    .before = 0,
                    .start_streaming = true,
            },
    };

    // check our local database retention
    if(r.local_db.last_entry_t > r.local_db.now) {
        r.local_db.last_entry_t = r.local_db.now;
        r.local_db.last_entry_t_adjusted_to_now = true;
    }

    // let's find the GAP we have
    if(!r.last_request.after || !r.last_request.before) {
        // there is no previous request

        if(r.local_db.last_entry_t)
            // we have some data, let's continue from the last point we have
            r.gap.from = r.local_db.last_entry_t;
        else
            // we don't have any data, the gap is the max timeframe we are allowed to replicate
            r.gap.from = r.local_db.now - r.host->rrdpush_seconds_to_replicate;
    }
    else {
        // we had sent a request - let's continue at the point we left it
        // for this we don't take into account the actual data in our db
        // because the child may also have gaps, and we need to get over them
        r.gap.from = r.last_request.before;
    }

    // we want all the data up to now
    r.gap.to = r.local_db.now;

    // the gap is now r.gap.from -> r.gap.to

    if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION)))
        return send_replay_chart_cmd(&r, "empty replication request, replication is disabled");

    if (unlikely(!r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, child has no stored data");

    if (unlikely(!rrdset_number_of_dimensions(st)))
        return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions");

    if (r.child_db.first_entry_t <= 0)
        return send_replay_chart_cmd(&r, "empty replication request, first entry of the child db is invalid");

    if (r.child_db.first_entry_t > r.child_db.last_entry_t)
        return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)");

    if (r.local_db.last_entry_t > r.child_db.last_entry_t)
        return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one");

    // let's find what the child can provide to fill that gap

    if(r.child_db.first_entry_t > r.gap.from)
        // the child does not have all the data - let's get what it has
        r.wanted.after = r.child_db.first_entry_t;
    else
        // ok, the child can fill the entire gap we have
        r.wanted.after = r.gap.from;

    if(r.gap.to - r.wanted.after > host->rrdpush_replication_step)
        // the duration is too big for one request - let's take the first step
        r.wanted.before = r.wanted.after + host->rrdpush_replication_step;
    else
        // wow, we can do it in one request
        r.wanted.before = r.gap.to;

    // don't ask from the child more than it has
    if(r.wanted.before > r.child_db.last_entry_t)
        r.wanted.before = r.child_db.last_entry_t;

    if(r.wanted.after > r.wanted.before)
        r.wanted.after = r.wanted.before;

    // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child
    r.wanted.start_streaming = (r.local_db.now - r.wanted.after <= host->rrdpush_replication_step || r.wanted.before == r.child_db.last_entry_t);

    // the wanted timeframe is now r.wanted.after -> r.wanted.before
    // send it
    return send_replay_chart_cmd(&r, "OK");
}

// ----------------------------------------------------------------------------
// replication thread

// replication request in the sender DICTIONARY
// used for de-duplicating the requests
struct replication_request {
    struct sender_state *sender;    // the sender we should put the reply at
    STRING *chart_id;               // the chart of the request
    time_t after;                   // the start time of the query (maybe zero) - key for sorting (JudyL)
    time_t before;                  // the end time of the query (maybe zero)
    bool start_streaming;           // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming

    usec_t sender_last_flush_ut;    // the timestamp of the sender, at the time we indexed this request
    Word_t unique_id;               // auto-increment, later requests get bigger ids
    bool found;                     // used as a result boolean for the find call
    bool indexed_in_judy;           // true when the request is indexed in judy
};

// replication sort entry in the JudyL array
// used for sorting all requests, across all nodes
struct replication_sort_entry {
    struct replication_request *rq;

    size_t unique_id;               // used as a key to identify the sort entry - we never access its contents
};

// the global variables for the replication thread
static struct replication_thread {
    netdata_mutex_t mutex;

    size_t pending;
    size_t added;
    size_t executed;
    size_t removed;
    size_t last_executed;
    time_t first_time_t;
    Word_t next_unique_id;
    struct replication_request *requests;

    Word_t last_after;
    Word_t last_unique_id;

    size_t skipped_not_connected;
    size_t skipped_no_room;
    size_t sender_resets;
    size_t waits;

    Pvoid_t JudyL_array;
} replication_globals = {
        .mutex = NETDATA_MUTEX_INITIALIZER,
        .pending = 0,
        .added = 0,
        .executed = 0,
        .last_executed = 0,
        .first_time_t = 0,
        .next_unique_id = 1,
        .skipped_no_room = 0,
        .skipped_not_connected = 0,
        .sender_resets = 0,
        .waits = 0,
        .requests = NULL,
        .JudyL_array = NULL,
};

static __thread int replication_recursive_mutex_recursions = 0;

static void replication_recursive_lock() {
    if(++replication_recursive_mutex_recursions == 1)
        netdata_mutex_lock(&replication_globals.mutex);

#ifdef NETDATA_INTERNAL_CHECKS
    if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2)
        fatal("REPLICATION: recursions is %d", replication_recursive_mutex_recursions);
#endif
}

static void replication_recursive_unlock() {
    if(--replication_recursive_mutex_recursions == 0)
        netdata_mutex_unlock(&replication_globals.mutex);

#ifdef NETDATA_INTERNAL_CHECKS
    if(replication_recursive_mutex_recursions < 0 || replication_recursive_mutex_recursions > 2)
        fatal("REPLICATION: recursions is %d", replication_recursive_mutex_recursions);
#endif
}

// ----------------------------------------------------------------------------
// replication sort entry management
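
// The pending requests are indexed in a two-level JudyL array: the outer
// array is keyed by the request's 'after' timestamp and the inner one by
// its auto-increment 'unique_id'. Walking both in ascending order therefore
// serves the oldest gaps first, in the order the requests arrived.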
static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
    struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));

    rrdpush_sender_pending_replication_requests_plus_one(rq->sender);

    // link the request (the sort entry only keeps a pointer to it)
    rse->rq = rq;
    rse->unique_id = replication_globals.next_unique_id++;

    // save the unique id into the request, to be able to delete it later
    rq->unique_id = rse->unique_id;
    rq->indexed_in_judy = false;

    return rse;
}

static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
    freez(rse);
}

static struct replication_sort_entry *replication_sort_entry_add(struct replication_request *rq) {
    replication_recursive_lock();

    struct replication_sort_entry *rse = replication_sort_entry_create(rq);

    if(rq->after < (time_t)replication_globals.last_after) {
        // make it find this request first
        replication_globals.last_after = rq->after;
        replication_globals.last_unique_id = rq->unique_id;
    }

    replication_globals.added++;
    replication_globals.pending++;

    Pvoid_t *inner_judy_ptr;

    // find the outer judy entry, using after as key
    inner_judy_ptr = JudyLGet(replication_globals.JudyL_array, (Word_t) rq->after, PJE0);
    if(!inner_judy_ptr)
        inner_judy_ptr = JudyLIns(&replication_globals.JudyL_array, (Word_t) rq->after, PJE0);

    // add it to the inner judy, using unique_id as key
    Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
    *item = rse;
    rq->indexed_in_judy = true;

    if(!replication_globals.first_time_t || rq->after < replication_globals.first_time_t)
        replication_globals.first_time_t = rq->after;

    replication_recursive_unlock();

    return rse;
}

static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr) {
    bool inner_judy_deleted = false;

    replication_globals.removed++;
    replication_globals.pending--;

    rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender);

    rse->rq->indexed_in_judy = false;

    // delete it from the inner judy
    JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);

    // if no items left, delete it from the outer judy
    if(**inner_judy_ppptr == NULL) {
        JudyLDel(&replication_globals.JudyL_array, rse->rq->after, PJE0);
        inner_judy_deleted = true;
    }

    // free memory
    replication_sort_entry_destroy(rse);

    return inner_judy_deleted;
}

static void replication_sort_entry_del(struct replication_request *rq) {
    Pvoid_t *inner_judy_pptr;
    struct replication_sort_entry *rse_to_delete = NULL;

    replication_recursive_lock();
    if(rq->indexed_in_judy) {
        inner_judy_pptr = JudyLGet(replication_globals.JudyL_array, rq->after, PJE0);
        if (inner_judy_pptr) {
            Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0);
            if (our_item_pptr) {
                rse_to_delete = *our_item_pptr;
                replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr);
            }
        }

        if (!rse_to_delete)
            fatal("REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.",
                  rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after);
    }
    replication_recursive_unlock();
}

static inline PPvoid_t JudyLFirstOrNext(Pcvoid_t PArray, Word_t *PIndex, bool first) {
    if(unlikely(first))
        return JudyLFirst(PArray, PIndex, PJE0);

    return JudyLNext(PArray, PIndex, PJE0);
}
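
// Pick the next request to execute. Iteration resumes from
// (last_after, last_unique_id), so repeated calls walk the two-level index
// without restarting. Requests whose sender is disconnected, or has been
// flushed since the request was indexed, are dropped; senders whose buffer
// is above MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED are skipped for now.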
static struct replication_request replication_request_get_first_available() {
    Pvoid_t *inner_judy_pptr;

    replication_recursive_lock();

    struct replication_request rq = (struct replication_request){ .found = false };

    if(unlikely(!replication_globals.last_after || !replication_globals.last_unique_id)) {
        replication_globals.last_after = 0;
        replication_globals.last_unique_id = 0;
    }

    bool find_same_after = true;
    while(!rq.found && (inner_judy_pptr = JudyLFirstOrNext(replication_globals.JudyL_array, &replication_globals.last_after, find_same_after))) {
        Pvoid_t *our_item_pptr;

        while(!rq.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.last_unique_id, PJE0))) {
            struct replication_sort_entry *rse = *our_item_pptr;
            struct sender_state *s = rse->rq->sender;

            bool sender_is_connected =
                    rrdhost_flag_check(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED);

            bool sender_has_been_flushed_since_this_request =
                    rse->rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(s);

            bool sender_has_room_to_spare =
                    s->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED;

            if(unlikely(!sender_is_connected || sender_has_been_flushed_since_this_request)) {
                replication_globals.skipped_not_connected++;
                if(replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
                    break;
            }

            else if(sender_has_room_to_spare) {
                // copy the request to return it
                rq = *rse->rq;
                rq.chart_id = string_dup(rq.chart_id);

                // set the return result to found
                rq.found = true;

                if(replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr))
                    break;
            }

            else
                replication_globals.skipped_no_room++;
        }

        // call JudyLNext from now on
        find_same_after = false;

        // prepare for the next iteration on the outer loop
        replication_globals.last_unique_id = 0;
    }

    replication_recursive_unlock();
    return rq;
}

// ----------------------------------------------------------------------------
// replication request management

static void replication_request_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) {
    struct sender_state *s = sender_state; (void)s;
    struct replication_request *rq = value;

    // IMPORTANT:
    // We use the react callback instead of the insert callback,
    // because we want the item to be atomically visible
    // to our replication thread, immediately after.
    // If we did this in the insert callback, the item would not be guaranteed
    // to be atomically visible to others, so the replication thread
    // might see the replication sort entry, but fail to find the dictionary item
    // related to it.

    replication_sort_entry_add(rq);

    // this request is about a unique chart for this sender
    rrdpush_sender_replicating_charts_plus_one(s);
}

static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) {
    struct sender_state *s = sender_state; (void)s;
    struct replication_request *rq = old_value; (void)rq;
    struct replication_request *rq_new = new_value;

    replication_recursive_lock();

    if(!rq->indexed_in_judy) {
        replication_sort_entry_add(rq);
        internal_error(
                true,
                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
                (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
                (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
    }
    else {
        internal_error(
                true,
                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host),
                dictionary_acquired_item_name(item),
                (unsigned long long) rq->after, (unsigned long long) rq->before, rq->start_streaming ? "true" : "false",
                (unsigned long long) rq_new->after, (unsigned long long) rq_new->before, rq_new->start_streaming ? "true" : "false");
    }

    replication_recursive_unlock();

    string_freez(rq_new->chart_id);

    return false;
}

static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) {
    struct replication_request *rq = value;

    // this request is about a unique chart for this sender
    rrdpush_sender_replicating_charts_minus_one(rq->sender);

    if(rq->indexed_in_judy)
        replication_sort_entry_del(rq);

    string_freez(rq->chart_id);
}

// ----------------------------------------------------------------------------
// public API
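
// replication_add_request() is called by the sender when a REPLAY request
// for a chart arrives from the parent. The request is stored in the
// sender's dictionary, keyed by chart id (so duplicates are merged by the
// conflict callback), and the react callback indexes it in the global
// JudyL array for the replication thread to pick up.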
void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming) {
    struct replication_request rq = {
            .sender = sender,
            .chart_id = string_strdupz(chart_id),
            .after = after,
            .before = before,
            .start_streaming = start_streaming,
            .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
    };

    dictionary_set(sender->replication_requests, chart_id, &rq, sizeof(struct replication_request));
}

void replication_sender_delete_pending_requests(struct sender_state *sender) {
    // allow the dictionary destructor to go faster on locks
    replication_recursive_lock();
    dictionary_flush(sender->replication_requests);
    replication_recursive_unlock();
}

void replication_init_sender(struct sender_state *sender) {
    sender->replication_requests = dictionary_create(DICT_OPTION_DONT_OVERWRITE_VALUE);
    dictionary_register_react_callback(sender->replication_requests, replication_request_react_callback, sender);
    dictionary_register_conflict_callback(sender->replication_requests, replication_request_conflict_callback, sender);
    dictionary_register_delete_callback(sender->replication_requests, replication_request_delete_callback, sender);
}

void replication_cleanup_sender(struct sender_state *sender) {
    // allow the dictionary destructor to go faster on locks
    replication_recursive_lock();
    dictionary_destroy(sender->replication_requests);
    replication_recursive_unlock();
}
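
// Called while the sender buffer is locked: track the buffer fill
// percentage and, when a sender that had crossed the upper threshold
// drains below the lower one, reset the iteration cursor so the
// replication thread rescans all pending requests (this is the hysteresis
// described next to the MAX/MIN percentage defines at the top).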
void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
    size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
    size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size;

    if(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
        s->replication_reached_max = true;

    if(s->replication_reached_max &&
       percentage <= MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED) {
        s->replication_reached_max = false;
        replication_recursive_lock();
        replication_globals.last_after = 0;
        replication_globals.last_unique_id = 0;
        replication_globals.sender_resets++;
        replication_recursive_unlock();
    }

    s->buffer_used_percentage = percentage;
}

// ----------------------------------------------------------------------------
// replication thread

static void replication_main_cleanup(void *ptr) {
    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

    // custom code
    worker_unregister();

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}

#define WORKER_JOB_FIND_NEXT                            1
#define WORKER_JOB_QUERYING                             2
#define WORKER_JOB_DELETE_ENTRY                         3
#define WORKER_JOB_FIND_CHART                           4
#define WORKER_JOB_STATISTICS                           5
#define WORKER_JOB_ACTIVATE_ENABLE_STREAMING            6
#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS       7
#define WORKER_JOB_CUSTOM_METRIC_COMPLETION             8
#define WORKER_JOB_CUSTOM_METRIC_ADDED                  9
#define WORKER_JOB_CUSTOM_METRIC_DONE                   10
#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED  11
#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM        12
#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS          13
#define WORKER_JOB_CUSTOM_METRIC_WAITS                  14
#define WORKER_JOB_CHECK_CONSISTENCY                    15

#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 10

static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
    if(host->sender) {
        size_t pending_requests = host->sender->replication_pending_requests;
        size_t dict_entries = dictionary_entries(host->sender->replication_requests);

        internal_error(
                !pending_requests && dict_entries,
                "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
                rrdhost_hostname(host), pending_requests, dict_entries);
    }

    size_t ok = 0;
    size_t errors = 0;

    RRDSET *st;
    rrdset_foreach_read(st, host) {
        RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_FINISHED);

        bool is_error = false;

        if(!flags) {
            internal_error(
                    true,
                    "REPLICATION SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED",
                    rrdhost_hostname(host), rrdset_id(st)
            );
            is_error = true;
        }

        if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
            internal_error(
                    true,
                    "REPLICATION SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished",
                    rrdhost_hostname(host), rrdset_id(st)
            );
            is_error = true;
        }

        if(is_error)
            errors++;
        else
            ok++;
    }
    rrdset_foreach_done(st);

    internal_error(errors,
                   "REPLICATION SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished",
                   rrdhost_hostname(host), ok, errors);

    return errors;
}

static void verify_all_hosts_charts_are_streaming_now(void) {
#ifdef NETDATA_INTERNAL_CHECKS
    worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY);

    size_t errors = 0;
    RRDHOST *host;
    dfe_start_reentrant(rrdhost_root_index, host)
        errors += verify_host_charts_are_streaming_now(host);
    dfe_done(host);

    size_t executed = replication_globals.executed;
    internal_error(true, "REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication", executed - replication_globals.last_executed, errors);
    replication_globals.last_executed = executed;
#else
    ;
#endif
}
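
// The replication thread: repeatedly picks the oldest pending request,
// deletes it from its sender's dictionary, executes the query via
// replicate_chart_response() and, when the response enabled streaming
// (and the sender buffer was not flushed meanwhile), marks the chart's
// replication as finished so normal streaming can take over.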
void *replication_thread_main(void *ptr __maybe_unused) {
    netdata_thread_cleanup_push(replication_main_cleanup, ptr);

    worker_register("REPLICATION");

    worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next");
    worker_register_job_name(WORKER_JOB_QUERYING, "querying");
    worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete");
    worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart");
    worker_register_job_name(WORKER_JOB_ACTIVATE_ENABLE_STREAMING, "enable streaming");
    worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
    worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");

    worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
    worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
    worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
    worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
    worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, "not connected requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
    worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
    worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL);
    worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, "waits", "waits/s", WORKER_METRIC_INCREMENTAL_TOTAL);

    // start from 100% completed
    worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);

    time_t latest_first_time_t = 0;
    long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place
    usec_t last_now_mono_ut = now_monotonic_usec();

    while(!netdata_exit) {

        // statistics
        usec_t now_mono_ut = now_monotonic_usec();
        if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
            last_now_mono_ut = now_mono_ut;

            if(!replication_globals.pending && run_verification_countdown-- == 0) {
                replication_globals.first_time_t = 0; // reset the statistics about completion percentage
                verify_all_hosts_charts_are_streaming_now();
            }

            worker_is_busy(WORKER_JOB_STATISTICS);

            if(latest_first_time_t && replication_globals.pending) {
                // completion percentage statistics
                time_t now = now_realtime_sec();
                time_t total = now - replication_globals.first_time_t;
                time_t done = latest_first_time_t - replication_globals.first_time_t;
                worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION,
                                  (NETDATA_DOUBLE) done * 100.0 / (NETDATA_DOUBLE) total);
            }
            else
                worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);

            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.pending);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.added);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)replication_globals.executed);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NOT_CONNECTED, (NETDATA_DOUBLE)replication_globals.skipped_not_connected);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.skipped_no_room);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.sender_resets);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_WAITS, (NETDATA_DOUBLE)replication_globals.waits);
        }

        worker_is_busy(WORKER_JOB_FIND_NEXT);
        struct replication_request rq = replication_request_get_first_available();

        if(unlikely(!rq.found)) {
            // make it scan all the pending requests next time
            replication_globals.last_after = 0;
            replication_globals.last_unique_id = 0;

            replication_globals.waits++;

            worker_is_idle();
            sleep_usec(((replication_globals.pending) ? 10 : 1000) * USEC_PER_MS);
            continue;
        }

        run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;

        // delete the request from the dictionary
        worker_is_busy(WORKER_JOB_DELETE_ENTRY);
        if(!dictionary_del(rq.sender->replication_requests, string2str(rq.chart_id)))
            error("REPLAY ERROR: 'host:%s/chart:%s' failed to be deleted from sender pending charts index",
                  rrdhost_hostname(rq.sender->host), string2str(rq.chart_id));

        worker_is_busy(WORKER_JOB_FIND_CHART);
        RRDSET *st = rrdset_find(rq.sender->host, string2str(rq.chart_id));
        if(!st) {
            internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found",
                           rrdhost_hostname(rq.sender->host), string2str(rq.chart_id));
            continue;
        }

        worker_is_busy(WORKER_JOB_QUERYING);

        latest_first_time_t = rq.after;

        if(rq.after < rq.sender->replication_first_time || !rq.sender->replication_first_time)
            rq.sender->replication_first_time = rq.after;

        if(rq.before < rq.sender->replication_current_time || !rq.sender->replication_current_time)
            rq.sender->replication_current_time = rq.before;

        netdata_thread_disable_cancelability();

        // send the replication data
        bool start_streaming = replicate_chart_response(
                st->rrdhost, st, rq.start_streaming, rq.after, rq.before);

        netdata_thread_enable_cancelability();

        replication_globals.executed++;

        if(start_streaming && rq.sender_last_flush_ut == rrdpush_sender_get_flush_time(rq.sender)) {
            worker_is_busy(WORKER_JOB_ACTIVATE_ENABLE_STREAMING);

            // enable normal streaming if we have to,
            // but only if the sender buffer has not been flushed since we started

            if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
                rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
                rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
                rrdhost_sender_replicating_charts_minus_one(st->rrdhost);

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
                internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
                               rrdhost_hostname(st->rrdhost), rrdset_id(st));
#endif
            }
            else
                internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating",
                               rrdhost_hostname(st->rrdhost), string2str(rq.chart_id));
        }

        string_freez(rq.chart_id);
    }

    netdata_thread_cleanup_pop(1);
    return NULL;
}