// SPDX-License-Identifier: GPL-3.0-or-later

#include "replication.h"
#include "Judy.h"

#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
#define MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER 25ULL
#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10ULL

#define WORKER_JOB_FIND_NEXT 1
#define WORKER_JOB_QUERYING 2
#define WORKER_JOB_DELETE_ENTRY 3
#define WORKER_JOB_FIND_CHART 4
#define WORKER_JOB_PREPARE_QUERY 5
#define WORKER_JOB_CHECK_CONSISTENCY 6
#define WORKER_JOB_BUFFER_COMMIT 7
#define WORKER_JOB_CLEANUP 8
#define WORKER_JOB_WAIT 9

// master thread worker jobs
#define WORKER_JOB_STATISTICS 10
#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 11
#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12
#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 13
#define WORKER_JOB_CUSTOM_METRIC_ADDED 14
#define WORKER_JOB_CUSTOM_METRIC_DONE 15
#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16
#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 17

#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30
#define SECONDS_TO_RESET_POINT_IN_TIME 10

static struct replication_query_statistics replication_queries = {
    .spinlock = NETDATA_SPINLOCK_INITIALIZER,
    .queries_started = 0,
    .queries_finished = 0,
    .points_read = 0,
    .points_generated = 0,
};

struct replication_query_statistics replication_get_query_statistics(void) {
    spinlock_lock(&replication_queries.spinlock);
    struct replication_query_statistics ret = replication_queries;
    spinlock_unlock(&replication_queries.spinlock);
    return ret;
}

size_t replication_buffers_allocated = 0;

size_t replication_allocated_buffers(void) {
    return __atomic_load_n(&replication_buffers_allocated, __ATOMIC_RELAXED);
}
// ----------------------------------------------------------------------------
// sending replication replies

struct replication_dimension {
    STORAGE_POINT sp;
    struct storage_engine_query_handle handle;
    bool enabled;
    bool skip;

    DICTIONARY *dict;
    const DICTIONARY_ITEM *rda;
    RRDDIM *rd;
};

struct replication_query {
    RRDSET *st;

    struct {
        time_t first_entry_t;
        time_t last_entry_t;
    } db;

    struct {                        // what the parent requested
        time_t after;
        time_t before;
        bool enable_streaming;
    } request;

    struct {                        // what the child will do
        time_t after;
        time_t before;
        bool enable_streaming;

        bool locked_data_collection;
        bool execute;
        bool interrupted;
        STREAM_CAPABILITIES capabilities;
    } query;

    time_t wall_clock_time;

    size_t points_read;
    size_t points_generated;

    STORAGE_ENGINE_BACKEND backend;
    struct replication_request *rq;

    size_t dimensions;
    struct replication_dimension data[];
};
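
// Lifecycle of a replication reply, as implemented below:
// replication_response_prepare() sanitizes the requested window and allocates
// a struct replication_query with one slot per dimension, and
// replication_response_execute_and_finalize() runs the query, appends the
// reply to the sender buffer, and releases every acquired resource.
// A prepared query that will not be executed must be released with
// replication_response_cancel_and_finalize(), otherwise the acquired
// dictionary items and storage engine query handles would leak.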
static struct replication_query *replication_query_prepare(
    RRDSET *st,
    time_t db_first_entry,
    time_t db_last_entry,
    time_t requested_after,
    time_t requested_before,
    bool requested_enable_streaming,
    time_t query_after,
    time_t query_before,
    bool query_enable_streaming,
    time_t wall_clock_time,
    STREAM_CAPABILITIES capabilities
) {
    size_t dimensions = rrdset_number_of_dimensions(st);
    struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
    __atomic_add_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);

    q->dimensions = dimensions;
    q->st = st;

    q->db.first_entry_t = db_first_entry;
    q->db.last_entry_t = db_last_entry;

    q->request.after = requested_after;
    q->request.before = requested_before;
    q->request.enable_streaming = requested_enable_streaming;

    q->query.after = query_after;
    q->query.before = query_before;
    q->query.enable_streaming = query_enable_streaming;
    q->query.capabilities = capabilities;

    q->wall_clock_time = wall_clock_time;

    if (!q->dimensions || !q->query.after || !q->query.before) {
        q->query.execute = false;
        q->dimensions = 0;
        return q;
    }

    if (q->query.enable_streaming) {
        spinlock_lock(&st->data_collection_lock);
        q->query.locked_data_collection = true;

        if (st->last_updated.tv_sec > q->query.before) {
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
            internal_error(true,
                           "STREAM_SENDER REPLAY: 'host:%s/chart:%s' "
                           "has start_streaming = true, "
                           "adjusting replication before timestamp from %llu to %llu",
                           rrdhost_hostname(st->rrdhost), rrdset_id(st),
                           (unsigned long long) q->query.before,
                           (unsigned long long) st->last_updated.tv_sec
            );
#endif
            q->query.before = MIN(st->last_updated.tv_sec, wall_clock_time);
        }
    }

    q->backend = st->rrdhost->db[0].eng->seb;

    // prepare our array of dimensions
    size_t count = 0;
    RRDDIM *rd;
    rrddim_foreach_read(rd, st) {
        if (unlikely(!rd || !rd_dfe.item || !rrddim_check_upstream_exposed(rd)))
            continue;

        if (unlikely(rd_dfe.counter >= q->dimensions)) {
            internal_error(true,
                           "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
                           rrdhost_hostname(st->rrdhost), rrdset_id(st));
            break;
        }

        struct replication_dimension *d = &q->data[rd_dfe.counter];

        d->dict = rd_dfe.dict;
        d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
        d->rd = rd;

        storage_engine_query_init(q->backend, rd->tiers[0].smh, &d->handle, q->query.after, q->query.before,
                                  q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
        d->enabled = true;
        d->skip = false;
        count++;
    }
    rrddim_foreach_done(rd);

    if (!count) {
        // no data for this chart
        q->query.execute = false;

        if (q->query.locked_data_collection) {
            spinlock_unlock(&st->data_collection_lock);
            q->query.locked_data_collection = false;
        }
    }
    else {
        // we have data for this chart
        q->query.execute = true;
    }

    return q;
}
static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STREAM_CAPABILITIES capabilities) {
    bool with_slots = (capabilities & STREAM_CAP_SLOTS) ? true : false;
    NUMBER_ENCODING integer_encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;

    RRDDIM *rd;
    rrddim_foreach_read(rd, st) {
        if (!rrddim_check_upstream_exposed(rd)) continue;

        buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1);

        if (with_slots) {
            buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
            buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
        }

        buffer_fast_strcat(wb, " '", 2);
        buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
        buffer_fast_strcat(wb, "' ", 2);
        buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC +
                                                          (usec_t) rd->collector.last_collected_time.tv_usec);
        buffer_fast_strcat(wb, " ", 1);
        buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value);
        buffer_fast_strcat(wb, " ", 1);
        buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_calculated_value);
        buffer_fast_strcat(wb, " ", 1);
        buffer_print_netdata_double_encoded(wb, integer_encoding, rd->collector.last_stored_value);
        buffer_fast_strcat(wb, "\n", 1);
    }
    rrddim_foreach_done(rd);

    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " ", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE) - 1 + 1);
    buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, integer_encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec);
    buffer_fast_strcat(wb, "\n", 1);
}
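
// Illustrative output of the function above, assuming the keyword macros
// expand to "RDSTATE" and "RSSTATE" (the exact strings are defined elsewhere):
// one RDSTATE line per exposed dimension, then one RSSTATE line for the chart:
//   RDSTATE 'dim-id' <last_collected_ut> <last_collected_value> <last_calculated_value> <last_stored_value>
//   RSSTATE <last_collected_ut> <last_updated_ut>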
static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) {
    size_t dimensions = q->dimensions;

    if (wb && q->query.enable_streaming)
        replication_send_chart_collection_state(wb, q->st, q->query.capabilities);

    if (q->query.locked_data_collection) {
        spinlock_unlock(&q->st->data_collection_lock);
        q->query.locked_data_collection = false;
    }

    // release all the dictionary items acquired
    // finalize the queries
    size_t queries = 0;

    for (size_t i = 0; i < dimensions; i++) {
        struct replication_dimension *d = &q->data[i];
        if (unlikely(!d->enabled)) continue;

        storage_engine_query_finalize(&d->handle);
        dictionary_acquired_item_release(d->dict, d->rda);

        // update global statistics
        queries++;
    }

    if (executed) {
        spinlock_lock(&replication_queries.spinlock);
        replication_queries.queries_started += queries;
        replication_queries.queries_finished += queries;
        replication_queries.points_read += q->points_read;
        replication_queries.points_generated += q->points_generated;

        if (q->st && q->st->rrdhost->sender) {
            struct sender_state *s = q->st->rrdhost->sender;
            s->replication.latest_completed_before_t = q->query.before;
        }
        spinlock_unlock(&replication_queries.spinlock);
    }

    __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
    freez(q);
}
static void replication_query_align_to_optimal_before(struct replication_query *q) {
    if (!q->query.execute || q->query.enable_streaming)
        return;

    size_t dimensions = q->dimensions;
    time_t expanded_before = 0;

    for (size_t i = 0; i < dimensions; i++) {
        struct replication_dimension *d = &q->data[i];
        if (unlikely(!d->enabled)) continue;

        time_t new_before = storage_engine_align_to_optimal_before(&d->handle);
        if (!expanded_before || new_before < expanded_before)
            expanded_before = new_before;
    }

    if (expanded_before > q->query.before &&                                 // it is later than the original
        (expanded_before - q->query.before) / q->st->update_every < 1024 && // it is reasonable (up to a page)
        expanded_before < q->st->last_updated.tv_sec &&                     // it is not the chart's last updated time
        expanded_before < q->wall_clock_time)                               // it is not later than the wall clock time
        q->query.before = expanded_before;
}
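
// Note: expanding 'before' to the storage engine's optimal point (presumably a
// page boundary) lets one request drain whole pages instead of splitting them
// across consecutive replication requests. The guards above keep the expansion
// conservative: at most about 1024 slots, never past the chart's last update,
// and never past the wall clock time.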
static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) {
    replication_query_align_to_optimal_before(q);

    bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false;
    NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
    time_t after = q->query.after;
    time_t before = q->query.before;
    size_t dimensions = q->dimensions;
    time_t wall_clock_time = q->wall_clock_time;
    bool finished_with_gap = false;
    size_t points_read = 0, points_generated = 0;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    time_t actual_after = 0, actual_before = 0;
#endif

    time_t now = after + 1;
    time_t last_end_time_in_buffer = 0;
    while (now <= before) {
        time_t min_start_time = 0, max_start_time = 0, min_end_time = 0, max_end_time = 0, min_update_every = 0, max_update_every = 0;
        for (size_t i = 0; i < dimensions; i++) {
            struct replication_dimension *d = &q->data[i];
            if (unlikely(!d->enabled || d->skip)) continue;

            // fetch the first valid point for the dimension
            int max_skip = 1000;
            while (d->sp.end_time_s < now && !storage_engine_query_is_finished(&d->handle) && max_skip-- >= 0) {
                d->sp = storage_engine_query_next_metric(&d->handle);
                points_read++;
            }

            if (max_skip <= 0) {
                d->skip = true;

                nd_log_limit_static_global_var(erl, 1, 0);
                nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
                             "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query "
                             "beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
                             rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
                             (unsigned long long) now);
                continue;
            }

            if (unlikely(d->sp.end_time_s < now || d->sp.end_time_s < d->sp.start_time_s))
                // this dimension does not provide any data
                continue;

            time_t update_every = d->sp.end_time_s - d->sp.start_time_s;
            if (unlikely(!update_every))
                update_every = q->st->update_every;

            if (unlikely(!min_update_every))
                min_update_every = update_every;

            if (unlikely(!min_start_time))
                min_start_time = d->sp.start_time_s;

            if (unlikely(!min_end_time))
                min_end_time = d->sp.end_time_s;

            min_update_every = MIN(min_update_every, update_every);
            max_update_every = MAX(max_update_every, update_every);

            min_start_time = MIN(min_start_time, d->sp.start_time_s);
            max_start_time = MAX(max_start_time, d->sp.start_time_s);

            min_end_time = MIN(min_end_time, d->sp.end_time_s);
            max_end_time = MAX(max_end_time, d->sp.end_time_s);
        }

        if (unlikely(min_update_every != max_update_every ||
                     min_start_time != max_start_time)) {

            time_t fix_min_start_time;
            if (last_end_time_in_buffer &&
                last_end_time_in_buffer >= min_start_time &&
                last_end_time_in_buffer <= max_start_time) {
                fix_min_start_time = last_end_time_in_buffer;
            }
            else
                fix_min_start_time = min_end_time - min_update_every;

#ifdef NETDATA_INTERNAL_CHECKS
            nd_log_limit_static_global_var(erl, 1, 0);
            nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
                         "REPLAY WARNING: 'host:%s/chart:%s' "
                         "misaligned dimensions, "
                         "update every (min: %ld, max: %ld), "
                         "start time (min: %ld, max: %ld), "
                         "end time (min %ld, max %ld), "
                         "now %ld, last end time sent %ld, "
                         "min start time is fixed to %ld",
                         rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
                         min_update_every, max_update_every,
                         min_start_time, max_start_time,
                         min_end_time, max_end_time,
                         now, last_end_time_in_buffer,
                         fix_min_start_time
            );
#endif

            min_start_time = fix_min_start_time;
        }

        if (likely(min_start_time <= now && min_end_time >= now)) {
            // we have a valid point

            if (unlikely(min_end_time == min_start_time))
                min_start_time = min_end_time - q->st->update_every;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
            if (unlikely(!actual_after))
                actual_after = min_end_time;

            actual_before = min_end_time;
#endif

            if (buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) {
                q->query.before = last_end_time_in_buffer;
                q->query.enable_streaming = false;

                internal_error(true, "REPLICATION: current buffer size %zu is more than the "
                                     "max message size %zu for chart '%s' of host '%s'. "
                                     "Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.",
                               buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost),
                               q->request.after, q->request.before, q->request.enable_streaming ? "true" : "false",
                               q->query.after, q->query.before, q->query.enable_streaming ? "true" : "false");

                q->query.interrupted = true;
                break;
            }
            last_end_time_in_buffer = min_end_time;

            buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1);
            if (with_slots) {
                buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
                buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot);
            }
            buffer_fast_strcat(wb, " '' ", 4);
            buffer_print_uint64_encoded(wb, integer_encoding, min_start_time);
            buffer_fast_strcat(wb, " ", 1);
            buffer_print_uint64_encoded(wb, integer_encoding, min_end_time);
            buffer_fast_strcat(wb, " ", 1);
            buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time);
            buffer_fast_strcat(wb, "\n", 1);

            // output the replay values for this time
            for (size_t i = 0; i < dimensions; i++) {
                struct replication_dimension *d = &q->data[i];
                if (unlikely(!d->enabled)) continue;

                if (likely(d->sp.start_time_s <= min_end_time &&
                           d->sp.end_time_s >= min_end_time &&
                           !storage_point_is_unset(d->sp) &&
                           !storage_point_is_gap(d->sp))) {

                    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET, sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1);
                    if (with_slots) {
                        buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
                        buffer_print_uint64_encoded(wb, integer_encoding, d->rd->rrdpush.sender.dim_slot);
                    }
                    buffer_fast_strcat(wb, " \"", 2);
                    buffer_fast_strcat(wb, rrddim_id(d->rd), string_strlen(d->rd->id));
                    buffer_fast_strcat(wb, "\" ", 2);
                    buffer_print_netdata_double_encoded(wb, integer_encoding, d->sp.sum);
                    buffer_fast_strcat(wb, " ", 1);
                    buffer_print_sn_flags(wb, d->sp.flags, q->query.capabilities & STREAM_CAP_INTERPOLATED);
                    buffer_fast_strcat(wb, "\n", 1);

                    points_generated++;
                }
            }

            now = min_end_time + 1;
        }
        else if (unlikely(min_end_time < now))
            // the query does not progress
            break;
        else {
            // we have a gap - all points are in the future
            now = min_start_time;

            if (min_start_time > before && !points_generated) {
                before = q->query.before = min_start_time - 1;
                finished_with_gap = true;
                break;
            }
        }
    }

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    if (actual_after) {
        char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1];
        log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
        log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
        internal_error(true,
                       "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
                       rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
                       (unsigned long long) actual_after, actual_after_buf, (unsigned long long) actual_before, actual_before_buf,
                       (unsigned long long) after, (long long) (actual_after - after), (unsigned long long) before, (long long) (actual_before - before));
    }
    else
        internal_error(true,
                       "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
                       rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
                       (unsigned long long) after, (unsigned long long) before);
#endif // NETDATA_LOG_REPLICATION_REQUESTS

    q->points_read += points_read;
    q->points_generated += points_generated;

    if (last_end_time_in_buffer < before - q->st->update_every)
        finished_with_gap = true;

    return finished_with_gap;
}
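
// Illustrative payload produced by the loop above, assuming the keyword macros
// expand to "RBEGIN" and "RSET" and decimal encoding was negotiated; one group
// is appended per replayed time slot:
//   RBEGIN '' <start_time> <end_time> <wall_clock_time>
//   RSET "dim1" <value> <flags>
//   RSET "dim2" <value> <flags>
// (replication_response_execute_and_finalize() below wraps this payload with
// its own chart-level RBEGIN header and the closing PLUGINSD_KEYWORD_REPLAY_END line)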
static struct replication_query *replication_response_prepare(
    RRDSET *st,
    bool requested_enable_streaming,
    time_t requested_after,
    time_t requested_before,
    STREAM_CAPABILITIES capabilities
) {
    time_t wall_clock_time = now_realtime_sec();

    if (requested_after > requested_before) {
        // flip them
        time_t t = requested_before;
        requested_before = requested_after;
        requested_after = t;
    }

    if (requested_after > wall_clock_time) {
        requested_after = 0;
        requested_before = 0;
        requested_enable_streaming = true;
    }

    if (requested_before > wall_clock_time) {
        requested_before = wall_clock_time;
        requested_enable_streaming = true;
    }

    time_t query_after = requested_after;
    time_t query_before = requested_before;
    bool query_enable_streaming = requested_enable_streaming;

    time_t db_first_entry = 0, db_last_entry = 0;
    rrdset_get_retention_of_tier_for_collected_chart(
        st, &db_first_entry, &db_last_entry, wall_clock_time, 0);

    if (requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) {
        // no data requested - just enable streaming
        ;
    }
    else {
        if (query_after < db_first_entry)
            query_after = db_first_entry;

        if (query_before > db_last_entry)
            query_before = db_last_entry;

        // if the parent asked us to start streaming, then fill the rest with the data that we have
        if (requested_enable_streaming)
            query_before = db_last_entry;

        if (query_after > query_before) {
            time_t tmp = query_before;
            query_before = query_after;
            query_after = tmp;
        }

        query_enable_streaming = (requested_enable_streaming ||
                                  query_before == db_last_entry ||
                                  !requested_after ||
                                  !requested_before) ? true : false;
    }

    return replication_query_prepare(
        st,
        db_first_entry, db_last_entry,
        requested_after, requested_before, requested_enable_streaming,
        query_after, query_before, query_enable_streaming,
        wall_clock_time, capabilities);
}
void replication_response_cancel_and_finalize(struct replication_query *q) {
    replication_query_finalize(NULL, q, false);
}

static bool sender_is_still_connected_for_this_request(struct replication_request *rq);

bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) {
    bool with_slots = (q->query.capabilities & STREAM_CAP_SLOTS) ? true : false;
    NUMBER_ENCODING integer_encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
    struct replication_request *rq = q->rq;
    RRDSET *st = q->st;
    RRDHOST *host = st->rrdhost;

    // we might want to optimize this by filling a temporary buffer
    // and copying the result to the host's buffer in order to avoid
    // holding the host's buffer lock for too long
    BUFFER *wb = sender_start(host->sender);

    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN, sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1);

    if (with_slots) {
        buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
        buffer_print_uint64_encoded(wb, integer_encoding, q->st->rrdpush.sender.chart_slot);
    }

    buffer_fast_strcat(wb, " '", 2);
    buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
    buffer_fast_strcat(wb, "'\n", 2);

    bool locked_data_collection = q->query.locked_data_collection;
    q->query.locked_data_collection = false;

    bool finished_with_gap = false;
    if (q->query.execute)
        finished_with_gap = replication_query_execute(wb, q, max_msg_size);

    time_t after = q->request.after;
    time_t before = q->query.before;
    bool enable_streaming = q->query.enable_streaming;

    replication_query_finalize(wb, q, q->query.execute);
    q = NULL; // IMPORTANT: q is invalid now

    // get a fresh retention to send to the parent
    time_t wall_clock_time = now_realtime_sec();
    time_t db_first_entry, db_last_entry;
    rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);

    // end with the first/last entries we have, and the first start time and
    // last end time of the data we sent
    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1);
    buffer_print_int64_encoded(wb, integer_encoding, st->update_every);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, integer_encoding, db_first_entry);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, integer_encoding, db_last_entry);

    buffer_fast_strcat(wb, enable_streaming ? " true " : " false ", 7);

    buffer_print_uint64_encoded(wb, integer_encoding, after);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, integer_encoding, before);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time);
    buffer_fast_strcat(wb, "\n", 1);

    worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
    sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_REPLICATION);
    worker_is_busy(WORKER_JOB_CLEANUP);

    if (enable_streaming) {
        if (sender_is_still_connected_for_this_request(rq)) {
            // enable normal streaming if we have to,
            // but only if the sender buffer has not been flushed since we started
            if (rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
                rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
                rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
                rrdhost_sender_replicating_charts_minus_one(st->rrdhost);

                if (!finished_with_gap)
                    st->rrdpush.sender.resync_time_s = 0;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
                internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
                               rrdhost_hostname(st->rrdhost), rrdset_id(st));
#endif
            }
            else
                internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in the process of replicating",
                               rrdhost_hostname(st->rrdhost), rrdset_id(st));
        }
    }

    if (locked_data_collection)
        spinlock_unlock(&st->data_collection_lock);

    return enable_streaming;
}
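
// Note: the value returned above is the query's 'enable_streaming' flag, i.e.
// whether this reply ended with a start-streaming indication; the caller can
// use it to tell whether the chart has switched over to normal streaming.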
// ----------------------------------------------------------------------------
// sending replication requests

struct replication_request_details {
    struct {
        send_command callback;
        void *data;
    } caller;

    RRDHOST *host;
    RRDSET *st;

    struct {
        time_t first_entry_t;           // the first entry time the child has
        time_t last_entry_t;            // the last entry time the child has
        time_t wall_clock_time;         // the current time of the child
        bool fixed_last_entry;          // when set we set the last entry to wall clock time
    } child_db;

    struct {
        time_t first_entry_t;           // the first entry time we have
        time_t last_entry_t;            // the last entry time we have
        time_t wall_clock_time;         // the current local wall clock time
    } local_db;

    struct {
        time_t from;                    // the starting time of the entire gap we have
        time_t to;                      // the ending time of the entire gap we have
    } gap;

    struct {
        time_t after;                   // the start time we requested previously from this child
        time_t before;                  // the end time we requested previously from this child
    } last_request;

    struct {
        time_t after;                   // the start time of this replication request - the child will add 1 second
        time_t before;                  // the end time of this replication request
        bool start_streaming;           // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before'
    } wanted;
};

static void replicate_log_request(struct replication_request_details *r, const char *msg) {
#ifdef NETDATA_INTERNAL_CHECKS
    internal_error(true,
#else
    nd_log_limit_static_global_var(erl, 1, 0);
    nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
#endif
                 "REPLAY ERROR: 'host:%s/chart:%s' child sent: "
                 "db from %ld to %ld%s, wall clock time %ld, "
                 "last request from %ld to %ld, "
                 "issue: %s - "
                 "sending replication request from %ld to %ld, start streaming %s",
                 rrdhost_hostname(r->st->rrdhost), rrdset_id(r->st),
                 r->child_db.first_entry_t,
                 r->child_db.last_entry_t, r->child_db.fixed_last_entry ? " (fixed)" : "",
                 r->child_db.wall_clock_time,
                 r->last_request.after,
                 r->last_request.before,
                 msg,
                 r->wanted.after,
                 r->wanted.before,
                 r->wanted.start_streaming ? "true" : "false");
}
static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg, bool log) {
    RRDSET *st = r->st;

    if (log)
        replicate_log_request(r, msg);

    if (st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t))
        st->rrdhost->receiver->replication_first_time_t = r->wanted.after;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    st->replay.log_next_data_collection = true;

    char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = "";

    if (r->wanted.after)
        log_date(wanted_after_buf, LOG_DATE_LENGTH, r->wanted.after);

    if (r->wanted.before)
        log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before);

    internal_error(true,
                   "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
                   "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld, now %ld] gap[%ld - %ld %s] %s"
                   , rrdhost_hostname(r->host), rrdset_id(r->st)
                   , r->wanted.after, wanted_after_buf
                   , r->wanted.before, wanted_before_buf
                   , r->wanted.start_streaming ? "YES" : "NO"
                   , msg
                   , r->last_request.after, r->last_request.before
                   , r->child_db.first_entry_t, r->child_db.last_entry_t
                   , r->child_db.wall_clock_time, (r->child_db.wall_clock_time == r->local_db.wall_clock_time) ? "SAME" : (r->child_db.wall_clock_time < r->local_db.wall_clock_time) ? "BEHIND" : "AHEAD"
                   , r->local_db.first_entry_t, r->local_db.last_entry_t
                   , r->local_db.wall_clock_time
                   , r->gap.from, r->gap.to
                   , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
                   , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
    );

    st->replay.start_streaming = r->wanted.start_streaming;
    st->replay.after = r->wanted.after;
    st->replay.before = r->wanted.before;
#endif // NETDATA_LOG_REPLICATION_REQUESTS

    char buffer[2048 + 1];
    snprintfz(buffer, sizeof(buffer) - 1, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
              rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
              (unsigned long long) r->wanted.after, (unsigned long long) r->wanted.before);

    ssize_t ret = r->caller.callback(buffer, r->caller.data);
    if (ret < 0) {
        netdata_log_error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %zd)",
                          rrdhost_hostname(r->host), rrdset_id(r->st), ret);
        return false;
    }

    return true;
}
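
// Illustrative command emitted by the snprintfz() above, assuming
// PLUGINSD_KEYWORD_REPLAY_CHART expands to "REPLAY_CHART" and using a
// hypothetical chart id:
//   REPLAY_CHART "disk.io" "false" 1700000000 1700000600
// i.e. chart id, start_streaming flag, wanted.after, wanted.before.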
bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
                             time_t child_first_entry, time_t child_last_entry, time_t child_wall_clock_time,
                             time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
{
    struct replication_request_details r = {
        .caller = {
            .callback = callback,
            .data = callback_data,
        },

        .host = host,
        .st = st,

        .child_db = {
            .first_entry_t = child_first_entry,
            .last_entry_t = child_last_entry,
            .wall_clock_time = child_wall_clock_time,
            .fixed_last_entry = false,
        },

        .local_db = {
            .first_entry_t = 0,
            .last_entry_t = 0,
            .wall_clock_time = now_realtime_sec(),
        },

        .last_request = {
            .after = prev_first_entry_wanted,
            .before = prev_last_entry_wanted,
        },

        .wanted = {
            .after = 0,
            .before = 0,
            .start_streaming = true,
        },
    };

    if (r.child_db.last_entry_t > r.child_db.wall_clock_time) {
        replicate_log_request(&r, "child's db last entry > child's wall clock time");
        r.child_db.last_entry_t = r.child_db.wall_clock_time;
        r.child_db.fixed_last_entry = true;
    }

    rrdset_get_retention_of_tier_for_collected_chart(r.st, &r.local_db.first_entry_t, &r.local_db.last_entry_t, r.local_db.wall_clock_time, 0);

    // let's find the GAP we have
    if (!r.last_request.after || !r.last_request.before) {
        // there is no previous request

        if (r.local_db.last_entry_t)
            // we have some data, let's continue from the last point we have
            r.gap.from = r.local_db.last_entry_t;
        else
            // we don't have any data, the gap is the max timeframe we are allowed to replicate
            r.gap.from = r.local_db.wall_clock_time - r.host->rrdpush_seconds_to_replicate;
    }
    else {
        // we had sent a request - let's continue from the point we left it;
        // for this we don't take into account the actual data in our db,
        // because the child may also have gaps, and we need to get over them
        r.gap.from = r.last_request.before;
    }

    // we want all the data up to now
    r.gap.to = r.local_db.wall_clock_time;

    // the gap is now r.gap.from -> r.gap.to

    if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION)))
        return send_replay_chart_cmd(&r, "empty replication request, replication is disabled", false);

    if (unlikely(!rrdset_number_of_dimensions(st)))
        return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions", false);

    if (unlikely(!r.child_db.first_entry_t || !r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, child has no stored data", false);

    if (unlikely(r.child_db.first_entry_t < 0 || r.child_db.last_entry_t < 0))
        return send_replay_chart_cmd(&r, "empty replication request, child db timestamps are invalid", true);

    if (unlikely(r.child_db.first_entry_t > r.child_db.wall_clock_time))
        return send_replay_chart_cmd(&r, "empty replication request, child db first entry is after its wall clock time", true);

    if (unlikely(r.child_db.first_entry_t > r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)", true);

    if (unlikely(r.local_db.last_entry_t > r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child's", false);

    // let's find what the child can provide to fill that gap

    if (r.child_db.first_entry_t > r.gap.from)
        // the child does not have all the data - let's get what it has
        r.wanted.after = r.child_db.first_entry_t;
    else
        // ok, the child can fill the entire gap we have
        r.wanted.after = r.gap.from;

    if (r.gap.to - r.wanted.after > host->rrdpush_replication_step)
        // the duration is too big for one request - let's take the first step
        r.wanted.before = r.wanted.after + host->rrdpush_replication_step;
    else
        // wow, we can do it in one request
        r.wanted.before = r.gap.to;

    // don't ask from the child more than it has
    if (r.wanted.before > r.child_db.last_entry_t)
        r.wanted.before = r.child_db.last_entry_t;

    if (r.wanted.after > r.wanted.before) {
        r.wanted.after = 0;
        r.wanted.before = 0;
        r.wanted.start_streaming = true;
        return send_replay_chart_cmd(&r, "empty replication request, computed 'after' is later than computed 'before'", true);
    }

    // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child
    r.wanted.start_streaming = (r.local_db.wall_clock_time - r.wanted.after <= host->rrdpush_replication_step ||
                                r.wanted.before >= r.child_db.last_entry_t ||
                                r.wanted.before >= r.child_db.wall_clock_time ||
                                r.wanted.before >= r.local_db.wall_clock_time);

    // the wanted timeframe is now r.wanted.after -> r.wanted.before
    // send it
    return send_replay_chart_cmd(&r, "OK", false);
}
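
// Worked example with hypothetical timestamps: the parent has local retention
// up to t=1000 and wall clock t=1900, and the child reports retention
// [100, 1890]. With no previous request, gap = [1000, 1900]. The child has
// everything from t=1000, so wanted.after = 1000; with
// rrdpush_replication_step = 600 the first request becomes [1000, 1600] and
// start_streaming stays false. The next request then continues from t=1600.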
// ----------------------------------------------------------------------------
// replication thread

// replication request in sender DICTIONARY
// used for de-duplicating the requests
struct replication_request {
    struct sender_state *sender;        // the sender we should put the reply at
    STRING *chart_id;                   // the chart of the request
    time_t after;                       // the start time of the query (maybe zero) - key for sorting (JudyL)
    time_t before;                      // the end time of the query (maybe zero)

    usec_t sender_last_flush_ut;        // the timestamp of the sender, at the time we indexed this request
    Word_t unique_id;                   // auto-increment, later requests have a bigger id
    bool start_streaming;               // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming
    bool indexed_in_judy;               // true when the request is indexed in judy
    bool not_indexed_buffer_full;       // true when the request is not indexed because the sender is full
    bool not_indexed_preprocessing;     // true when the request is not indexed, but it is pending in preprocessing

    // prepare-ahead members - preprocessing
    bool found;                         // used as a result boolean for the find call
    bool executed;                      // used to detect if we have skipped requests while preprocessing
    RRDSET *st;                         // caching of the chart during preprocessing
    struct replication_query *q;        // the query prepared for this request during preprocessing
};

// replication sort entry in JudyL array
// used for sorting all requests, across all nodes
struct replication_sort_entry {
    struct replication_request *rq;

    size_t unique_id;                   // used as a key to identify the sort entry - we never access its contents
};
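
// The pending queue is a two-level JudyL index:
//   outer JudyL: key = rq->after  -> inner JudyL
//   inner JudyL: key = unique_id  -> struct replication_sort_entry *
// so iterating it returns requests ordered by their 'after' time first
// (oldest data first), and by arrival order within the same 'after'.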
#define MAX_REPLICATION_THREADS 20 // + 1 for the main thread

// the global variables for the replication thread
static struct replication_thread {
    ARAL *aral_rse;

    SPINLOCK spinlock;

    struct {
        size_t pending;                 // number of requests pending in the queue

        // statistics
        size_t added;                   // number of requests added to the queue
        size_t removed;                 // number of requests removed from the queue
        size_t pending_no_room;         // number of requests skipped, because the sender has no room for responses
        size_t senders_full;            // number of times a sender's buffer was found full
        size_t sender_resets;           // number of times a sender reset our last position in the queue
        time_t first_time_t;            // the minimum 'after' we encountered

        struct {
            Word_t after;
            Word_t unique_id;
            Pvoid_t JudyL_array;
        } queue;

    } unsafe;                           // protected by replication_recursive_lock()

    struct {
        Word_t unique_id;               // the last unique id we gave to a request (auto-increment, starting from 1)
        size_t executed;                // the number of replication requests executed
        size_t latest_first_time;       // the 'after' timestamp of the last request we executed
        size_t memory;                  // the total memory allocated by replication
    } atomic;                           // access should be with atomic operations

    struct {
        size_t last_executed;           // caching of atomic.executed, to report the number of requests executed since last time
        netdata_thread_t **threads_ptrs;
        size_t threads;
    } main_thread;                      // access is allowed only by the main thread

} replication_globals = {
    .aral_rse = NULL,
    .spinlock = NETDATA_SPINLOCK_INITIALIZER,
    .unsafe = {
        .pending = 0,

        .added = 0,
        .removed = 0,
        .pending_no_room = 0,
        .sender_resets = 0,
        .senders_full = 0,

        .first_time_t = 0,

        .queue = {
            .after = 0,
            .unique_id = 0,
            .JudyL_array = NULL,
        },
    },
    .atomic = {
        .unique_id = 0,
        .executed = 0,
        .latest_first_time = 0,
        .memory = 0,
    },
    .main_thread = {
        .last_executed = 0,
        .threads = 0,
        .threads_ptrs = NULL,
    },
};
size_t replication_allocated_memory(void) {
    return __atomic_load_n(&replication_globals.atomic.memory, __ATOMIC_RELAXED);
}

#define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED)
#define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED)

static inline bool replication_recursive_lock_mode(char mode) {
    static __thread int recursions = 0;

    if (mode == 'L') { // (L)ock
        if (++recursions == 1)
            spinlock_lock(&replication_globals.spinlock);
    }
    else if (mode == 'U') { // (U)nlock
        if (--recursions == 0)
            spinlock_unlock(&replication_globals.spinlock);
    }
    else if (mode == 'C') { // (C)heck
        if (recursions > 0)
            return true;
        else
            return false;
    }
    else
        fatal("REPLICATION: unknown lock mode '%c'", mode);

#ifdef NETDATA_INTERNAL_CHECKS
    if (recursions < 0)
        fatal("REPLICATION: recursions is %d", recursions);
#endif

    return true;
}

#define replication_recursive_lock() replication_recursive_lock_mode('L')
#define replication_recursive_unlock() replication_recursive_lock_mode('U')
#define fatal_when_replication_is_not_locked_for_me() do { \
    if(!replication_recursive_lock_mode('C')) \
        fatal("REPLICATION: reached %s, but replication is not locked by this thread.", __FUNCTION__); \
} while(0)
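
// Note: the recursion counter above is thread-local, so the spinlock is taken
// only by the outermost lock() call of each thread and released by the
// matching outermost unlock(); nested lock()/unlock() pairs on the same thread
// only adjust the counter.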
void replication_set_next_point_in_time(time_t after, size_t unique_id) {
    replication_recursive_lock();
    replication_globals.unsafe.queue.after = after;
    replication_globals.unsafe.queue.unique_id = unique_id;
    replication_recursive_unlock();
}
// ----------------------------------------------------------------------------
// replication sort entry management

static inline struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
    struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse);
    __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);

    rrdpush_sender_pending_replication_requests_plus_one(rq->sender);

    // copy the request
    rse->rq = rq;
    rse->unique_id = __atomic_add_fetch(&replication_globals.atomic.unique_id, 1, __ATOMIC_SEQ_CST);

    // save the unique id into the request, to be able to delete it later
    rq->unique_id = rse->unique_id;
    rq->indexed_in_judy = false;
    rq->not_indexed_buffer_full = false;
    rq->not_indexed_preprocessing = false;
    return rse;
}

static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
    aral_freez(replication_globals.aral_rse, rse);
    __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
}
static void replication_sort_entry_add(struct replication_request *rq) {
    if (unlikely(rrdpush_sender_replication_buffer_full_get(rq->sender))) {
        rq->indexed_in_judy = false;
        rq->not_indexed_buffer_full = true;
        rq->not_indexed_preprocessing = false;
        replication_recursive_lock();
        replication_globals.unsafe.pending_no_room++;
        replication_recursive_unlock();
        return;
    }

    // cache this, because it will be changed
    bool decrement_no_room = rq->not_indexed_buffer_full;

    struct replication_sort_entry *rse = replication_sort_entry_create(rq);

    replication_recursive_lock();

    if (decrement_no_room)
        replication_globals.unsafe.pending_no_room--;

//    if(rq->after < (time_t)replication_globals.protected.queue.after &&
//       rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED &&
//       !replication_globals.protected.skipped_no_room_since_last_reset) {
//
//        // make it find this request first
//        replication_set_next_point_in_time(rq->after, rq->unique_id);
//    }

    replication_globals.unsafe.added++;
    replication_globals.unsafe.pending++;

    Pvoid_t *inner_judy_ptr;

    // find the outer judy entry, using 'after' as the key
    size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
    inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
    size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
    if (unlikely(!inner_judy_ptr || inner_judy_ptr == PJERR))
        fatal("REPLICATION: corrupted outer judyL");

    // add it to the inner judy, using unique_id as the key
    size_t mem_before_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
    Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
    size_t mem_after_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
    if (unlikely(!item || item == PJERR))
        fatal("REPLICATION: corrupted inner judyL");

    *item = rse;
    rq->indexed_in_judy = true;
    rq->not_indexed_buffer_full = false;
    rq->not_indexed_preprocessing = false;

    if (!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t)
        replication_globals.unsafe.first_time_t = rq->after;

    replication_recursive_unlock();

    __atomic_add_fetch(&replication_globals.atomic.memory, (mem_after_inner_judyl - mem_before_inner_judyl) + (mem_after_outer_judyl - mem_before_outer_judyl), __ATOMIC_RELAXED);
}
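
// Memory accounting note: JudyLIns() may grow either level of the index, so
// replication_sort_entry_add() adds the JudyLMemUsed() deltas of both the
// outer and the inner JudyL to the atomic memory counter, and
// replication_sort_entry_unlink_and_free_unsafe() below subtracts them again
// on deletion.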
static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) {
    fatal_when_replication_is_not_locked_for_me();

    bool inner_judy_deleted = false;

    replication_globals.unsafe.removed++;
    replication_globals.unsafe.pending--;

    rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender);

    rse->rq->indexed_in_judy = false;
    rse->rq->not_indexed_preprocessing = preprocessing;

    size_t memory_saved = 0;

    // delete it from the inner judy
    size_t mem_before_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
    JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
    size_t mem_after_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
    memory_saved = mem_before_inner_judyl - mem_after_inner_judyl;

    // if no items are left, delete it from the outer judy
    if (**inner_judy_ppptr == NULL) {
        size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
        JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0);
        size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
        memory_saved += mem_before_outer_judyl - mem_after_outer_judyl;
        inner_judy_deleted = true;
    }

    // free memory
    replication_sort_entry_destroy(rse);

    __atomic_sub_fetch(&replication_globals.atomic.memory, memory_saved, __ATOMIC_RELAXED);

    return inner_judy_deleted;
}

static void replication_sort_entry_del(struct replication_request *rq, bool buffer_full) {
    Pvoid_t *inner_judy_pptr;
    struct replication_sort_entry *rse_to_delete = NULL;

    replication_recursive_lock();
    if (rq->indexed_in_judy) {
        inner_judy_pptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, rq->after, PJE0);
        if (inner_judy_pptr) {
            Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0);
            if (our_item_pptr) {
                rse_to_delete = *our_item_pptr;
                replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr, false);

                if (buffer_full) {
                    replication_globals.unsafe.pending_no_room++;
                    rq->not_indexed_buffer_full = true;
                }
            }
        }

        if (!rse_to_delete)
            fatal("REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.",
                  rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after);
    }
    replication_recursive_unlock();
}
static struct replication_request replication_request_get_first_available(void) {
    Pvoid_t *inner_judy_pptr;

    replication_recursive_lock();

    struct replication_request rq_to_return = (struct replication_request){ .found = false };

    if (unlikely(!replication_globals.unsafe.queue.after || !replication_globals.unsafe.queue.unique_id)) {
        replication_globals.unsafe.queue.after = 0;
        replication_globals.unsafe.queue.unique_id = 0;
    }

    Word_t started_after = replication_globals.unsafe.queue.after;

    size_t round = 0;
    while (!rq_to_return.found) {
        round++;

        if (round > 2)
            break;

        if (round == 2) {
            if (started_after == 0)
                break;

            replication_globals.unsafe.queue.after = 0;
            replication_globals.unsafe.queue.unique_id = 0;
        }

        bool find_same_after = true;
        while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstThenNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, &find_same_after))) {
            Pvoid_t *our_item_pptr;

            if (unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after))
                break;

            while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) {
                struct replication_sort_entry *rse = *our_item_pptr;
                struct replication_request *rq = rse->rq;

                // copy the request to return it
                rq_to_return = *rq;
                rq_to_return.chart_id = string_dup(rq_to_return.chart_id);

                // set the return result to found
                rq_to_return.found = true;

                if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr, true))
                    // we removed the item from the outer JudyL
                    break;
            }

            // prepare for the next iteration of the outer loop
            replication_globals.unsafe.queue.unique_id = 0;
        }
    }

    replication_recursive_unlock();
    return rq_to_return;
}
// ----------------------------------------------------------------------------
// replication request management

static void replication_request_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) {
    struct sender_state *s = sender_state; (void)s;
    struct replication_request *rq = value;

    // IMPORTANT:
    // We use the react instead of the insert callback
    // because we want the item to be atomically visible
    // to our replication thread, immediately after.
    // If we put this at the insert callback, the item is not guaranteed
    // to be atomically visible to others, so the replication thread
    // may see the replication sort entry, but fail to find the dictionary item
    // related to it.

    replication_sort_entry_add(rq);

    // this request is about a unique chart for this sender
    rrdpush_sender_replicating_charts_plus_one(s);
}
static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) {
    struct sender_state *s = sender_state; (void)s;
    struct replication_request *rq = old_value; (void)rq;
    struct replication_request *rq_new = new_value;

    replication_recursive_lock();

    if(!rq->indexed_in_judy && rq->not_indexed_buffer_full && !rq->not_indexed_preprocessing) {
        // we can replace this command
        internal_error(
            true,
            "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
            rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
            (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
            (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");

        rq->after = rq_new->after;
        rq->before = rq_new->before;
        rq->start_streaming = rq_new->start_streaming;
    }
    else if(!rq->indexed_in_judy && !rq->not_indexed_preprocessing) {
        replication_sort_entry_add(rq);
        internal_error(
            true,
            "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
            rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
            (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
            (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
    }
    else {
        internal_error(
            true,
            "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
            rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host),
            dictionary_acquired_item_name(item),
            (unsigned long long) rq->after, (unsigned long long) rq->before, rq->start_streaming ? "true" : "false",
            (unsigned long long) rq_new->after, (unsigned long long) rq_new->before, rq_new->start_streaming ? "true" : "false");
    }

    replication_recursive_unlock();

    string_freez(rq_new->chart_id);
    return false;
}
static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) {
    struct replication_request *rq = value;

    // this request is about a unique chart for this sender
    rrdpush_sender_replicating_charts_minus_one(rq->sender);

    if(rq->indexed_in_judy)
        replication_sort_entry_del(rq, false);

    else if(rq->not_indexed_buffer_full) {
        replication_recursive_lock();
        replication_globals.unsafe.pending_no_room--;
        replication_recursive_unlock();
    }

    string_freez(rq->chart_id);
}
static bool sender_is_still_connected_for_this_request(struct replication_request *rq) {
    return rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender);
}
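
// The sender's flush time effectively acts as a connection generation
// counter: it is captured into sender_last_flush_ut when the request is
// queued, so a mismatch here means the sender flushed (reconnected) in the
// meantime and the queued request refers to a stale connection.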
static bool replication_execute_request(struct replication_request *rq, bool workers) {
    bool ret = false;

    if(!rq->st) {
        if(likely(workers))
            worker_is_busy(WORKER_JOB_FIND_CHART);

        rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
    }

    if(!rq->st) {
        internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found",
                       rrdhost_hostname(rq->sender->host), string2str(rq->chart_id));

        goto cleanup;
    }

    netdata_thread_disable_cancelability();

    if(!rq->q) {
        if(likely(workers))
            worker_is_busy(WORKER_JOB_PREPARE_QUERY);

        rq->q = replication_response_prepare(
            rq->st,
            rq->start_streaming,
            rq->after,
            rq->before,
            rq->sender->capabilities);
    }

    if(likely(workers))
        worker_is_busy(WORKER_JOB_QUERYING);

    // send the replication data
    rq->q->rq = rq;
    replication_response_execute_and_finalize(
        rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL));

    rq->q = NULL;
    netdata_thread_enable_cancelability();

    __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED);

    ret = true;

cleanup:
    if(rq->q) {
        replication_response_cancel_and_finalize(rq->q);
        rq->q = NULL;
    }

    string_freez(rq->chart_id);
    worker_is_idle();
    return ret;
}
// ----------------------------------------------------------------------------
// public API

void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming) {
    struct replication_request rq = {
        .sender = sender,
        .chart_id = string_strdupz(chart_id),
        .after = after,
        .before = before,
        .start_streaming = start_streaming,
        .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
        .indexed_in_judy = false,
        .not_indexed_buffer_full = false,
        .not_indexed_preprocessing = false,
    };

    if(!sender->replication.oldest_request_after_t || rq.after < sender->replication.oldest_request_after_t)
        sender->replication.oldest_request_after_t = rq.after;

    if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
        replication_execute_request(&rq, false);

    else
        dictionary_set(sender->replication.requests, chart_id, &rq, sizeof(struct replication_request));
}
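
// Note the fast path above: a request that asks to enter streaming mode is
// executed synchronously on the caller's thread while the sender buffer has
// room; everything else goes through the dictionary, whose react/conflict
// callbacks index it in the sorted queue for the replication threads.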
void replication_sender_delete_pending_requests(struct sender_state *sender) {
    // allow the dictionary destructor to go faster on locks
    dictionary_flush(sender->replication.requests);
}

void replication_init_sender(struct sender_state *sender) {
    sender->replication.requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
                                                              NULL, sizeof(struct replication_request));

    dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender);
    dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender);
    dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender);
}

void replication_cleanup_sender(struct sender_state *sender) {
    // allow the dictionary destructor to go faster on locks
    replication_recursive_lock();
    dictionary_destroy(sender->replication.requests);
    replication_recursive_unlock();
}
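
// Buffer pressure hysteresis: a sender is marked 'full' when its outbound
// buffer crosses MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED (its queued requests
// are unindexed but kept in the dictionary), and it only re-enters
// replication once usage drops below MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED.
// The gap between the two thresholds avoids flapping in and out of the
// queue while usage hovers around a single cut-off.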
void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
    size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
    size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size;

    if(unlikely(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !rrdpush_sender_replication_buffer_full_get(s))) {
        rrdpush_sender_replication_buffer_full_set(s, true);

        struct replication_request *rq;
        dfe_start_read(s->replication.requests, rq) {
            if(rq->indexed_in_judy)
                replication_sort_entry_del(rq, true);
        }
        dfe_done(rq);

        replication_recursive_lock();
        replication_globals.unsafe.senders_full++;
        replication_recursive_unlock();
    }
    else if(unlikely(percentage < MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED && rrdpush_sender_replication_buffer_full_get(s))) {
        rrdpush_sender_replication_buffer_full_set(s, false);

        struct replication_request *rq;
        dfe_start_read(s->replication.requests, rq) {
            if(!rq->indexed_in_judy && (rq->not_indexed_buffer_full || rq->not_indexed_preprocessing))
                replication_sort_entry_add(rq);
        }
        dfe_done(rq);

        replication_recursive_lock();
        replication_globals.unsafe.senders_full--;
        replication_globals.unsafe.sender_resets++;
        // replication_set_next_point_in_time(0, 0);
        replication_recursive_unlock();
    }

    rrdpush_sender_set_buffer_used_percent(s, percentage);
}
// ----------------------------------------------------------------------------
// replication thread

static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
    internal_error(
        host->sender &&
        !rrdpush_sender_pending_replication_requests(host->sender) &&
        dictionary_entries(host->sender->replication.requests) != 0,
        "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
        rrdhost_hostname(host),
        rrdpush_sender_pending_replication_requests(host->sender),
        dictionary_entries(host->sender->replication.requests)
    );

    size_t ok = 0;
    size_t errors = 0;

    RRDSET *st;
    rrdset_foreach_read(st, host) {
        RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_FINISHED);

        bool is_error = false;

        if(!flags) {
            internal_error(
                true,
                "REPLICATION SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED",
                rrdhost_hostname(host), rrdset_id(st)
            );
            is_error = true;
        }

        if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
            internal_error(
                true,
                "REPLICATION SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished",
                rrdhost_hostname(host), rrdset_id(st)
            );
            is_error = true;
        }

        if(is_error)
            errors++;
        else
            ok++;
    }
    rrdset_foreach_done(st);

    internal_error(errors,
                   "REPLICATION SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished",
                   rrdhost_hostname(host), ok, errors);

    return errors;
}
static void verify_all_hosts_charts_are_streaming_now(void) {
    worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY);

    size_t errors = 0;
    RRDHOST *host;
    dfe_start_read(rrdhost_root_index, host)
        errors += verify_host_charts_are_streaming_now(host);
    dfe_done(host);

    size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
    netdata_log_info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication",
                     executed - replication_globals.main_thread.last_executed, errors);
    replication_globals.main_thread.last_executed = executed;
}
static void replication_initialize_workers(bool master) {
    worker_register("REPLICATION");
    worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next");
    worker_register_job_name(WORKER_JOB_QUERYING, "querying");
    worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete");
    worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart");
    worker_register_job_name(WORKER_JOB_PREPARE_QUERY, "prepare query");
    worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
    worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit");
    worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup");
    worker_register_job_name(WORKER_JOB_WAIT, "wait");

    if(master) {
        worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, "senders full", "senders", WORKER_METRIC_ABSOLUTE);
    }
}
#define REQUEST_OK (0)
#define REQUEST_QUEUE_EMPTY (-1)
#define REQUEST_CHART_NOT_FOUND (-2)

static __thread struct replication_thread_pipeline {
    int max_requests_ahead;
    struct replication_request *rqs;
    int rqs_last_executed, rqs_last_prepared;
    size_t queue_rounds;
} rtp = {
    .max_requests_ahead = 0,
    .rqs = NULL,
    .rqs_last_executed = 0,
    .rqs_last_prepared = 0,
    .queue_rounds = 0,
};
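
// The pipeline is a per-thread ring buffer of requests: rqs_last_prepared
// is the producer cursor (slots filled with prepared queries) and
// rqs_last_executed is the consumer cursor (slots whose responses have been
// sent). Preparing several queries ahead of the one currently transmitting
// presumably lets data for upcoming requests be loaded (e.g. by the libuv
// worker pool, given the sizing cap below) while the current response is
// being written out.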
static void replication_pipeline_cancel_and_cleanup(void) {
    if(!rtp.rqs)
        return;

    struct replication_request *rq;
    size_t cancelled = 0;

    do {
        if (++rtp.rqs_last_executed >= rtp.max_requests_ahead)
            rtp.rqs_last_executed = 0;

        rq = &rtp.rqs[rtp.rqs_last_executed];

        if (rq->q) {
            internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
            internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");

            replication_response_cancel_and_finalize(rq->q);
            rq->q = NULL;
            cancelled++;
        }

        rq->executed = true;
        rq->found = false;

    } while (rtp.rqs_last_executed != rtp.rqs_last_prepared);

    internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);

    freez(rtp.rqs);
    rtp.rqs = NULL;
    rtp.max_requests_ahead = 0;
    rtp.rqs_last_executed = 0;
    rtp.rqs_last_prepared = 0;
    rtp.queue_rounds = 0;
}
static int replication_pipeline_execute_next(void) {
    struct replication_request *rq;

    if(unlikely(!rtp.rqs)) {
        rtp.max_requests_ahead = (int)get_netdata_cpus() / 2;

        if(rtp.max_requests_ahead > libuv_worker_threads * 2)
            rtp.max_requests_ahead = libuv_worker_threads * 2;

        if(rtp.max_requests_ahead < 2)
            rtp.max_requests_ahead = 2;
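
        // net effect: max_requests_ahead = clamp(cpus / 2, 2, 2 * libuv_worker_threads);
        // e.g. with 12 CPUs and 8 libuv workers (illustrative numbers) this
        // gives min(6, 16) = 6 pipeline slots, and never fewer than 2.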
        rtp.rqs = callocz(rtp.max_requests_ahead, sizeof(struct replication_request));
        __atomic_add_fetch(&replication_buffers_allocated, rtp.max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
    }

    // fill the queue
    do {
        if(++rtp.rqs_last_prepared >= rtp.max_requests_ahead) {
            rtp.rqs_last_prepared = 0;
            rtp.queue_rounds++;
        }

        internal_fatal(rtp.rqs[rtp.rqs_last_prepared].q,
                       "REPLAY FATAL: slot is used by query that has not been executed!");

        worker_is_busy(WORKER_JOB_FIND_NEXT);
        rtp.rqs[rtp.rqs_last_prepared] = replication_request_get_first_available();
        rq = &rtp.rqs[rtp.rqs_last_prepared];

        if(rq->found) {
            if (!rq->st) {
                worker_is_busy(WORKER_JOB_FIND_CHART);
                rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
            }

            if (rq->st && !rq->q) {
                worker_is_busy(WORKER_JOB_PREPARE_QUERY);
                rq->q = replication_response_prepare(
                    rq->st,
                    rq->start_streaming,
                    rq->after,
                    rq->before,
                    rq->sender->capabilities);
            }

            rq->executed = false;
        }

    } while(rq->found && rtp.rqs_last_prepared != rtp.rqs_last_executed);
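
    // the fill loop above stops when either the queue runs dry (rq->found is
    // false) or the prepared cursor catches up with the executed cursor,
    // meaning every ring slot already holds a prepared, unexecuted request.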
    // pick the first usable
    do {
        if (++rtp.rqs_last_executed >= rtp.max_requests_ahead)
            rtp.rqs_last_executed = 0;

        rq = &rtp.rqs[rtp.rqs_last_executed];

        if(rq->found) {
            internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");

            if (rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(rq->sender)) {
                // the sender has reconnected since this request was queued,
                // we can safely throw it away, since the parent will resend it
                replication_response_cancel_and_finalize(rq->q);
                rq->executed = true;
                rq->found = false;
                rq->q = NULL;
            }
            else if (rrdpush_sender_replication_buffer_full_get(rq->sender)) {
                // the sender buffer is full, so we can ignore this request,
                // it has already been marked as 'preprocessed' in the dictionary,
                // and the sender will put it back in when there is
                // enough room in the buffer for processing replication requests
                replication_response_cancel_and_finalize(rq->q);
                rq->executed = true;
                rq->found = false;
                rq->q = NULL;
            }
            else {
                // we can execute this,
                // delete it from the dictionary
                worker_is_busy(WORKER_JOB_DELETE_ENTRY);
                dictionary_del(rq->sender->replication.requests, string2str(rq->chart_id));
            }
        }
        else
            internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!");

    } while(!rq->found && rtp.rqs_last_executed != rtp.rqs_last_prepared);

    if(unlikely(!rq->found)) {
        worker_is_idle();
        return REQUEST_QUEUE_EMPTY;
    }

    replication_set_latest_first_time(rq->after);

    bool chart_found = replication_execute_request(rq, true);
    rq->executed = true;
    rq->found = false;
    rq->q = NULL;

    if(unlikely(!chart_found)) {
        worker_is_idle();
        return REQUEST_CHART_NOT_FOUND;
    }

    worker_is_idle();
    return REQUEST_OK;
}
static void replication_worker_cleanup(void *ptr __maybe_unused) {
    replication_pipeline_cancel_and_cleanup();
    worker_unregister();
}

static void *replication_worker_thread(void *ptr) {
    replication_initialize_workers(false);

    netdata_thread_cleanup_push(replication_worker_cleanup, ptr) {
        while (service_running(SERVICE_REPLICATION)) {
            if (unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
                sender_thread_buffer_free();
                worker_is_busy(WORKER_JOB_WAIT);
                worker_is_idle();
                sleep_usec(1 * USEC_PER_SEC);
            }
        }
    }
    netdata_thread_cleanup_pop(1);
    return NULL;
}
static void replication_main_cleanup(void *ptr) {
    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

    replication_pipeline_cancel_and_cleanup();

    int threads = (int)replication_globals.main_thread.threads;
    for(int i = 0; i < threads ;i++) {
        netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL);
        freez(replication_globals.main_thread.threads_ptrs[i]);
        __atomic_sub_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
    }
    freez(replication_globals.main_thread.threads_ptrs);
    replication_globals.main_thread.threads_ptrs = NULL;
    __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t), __ATOMIC_RELAXED);

    aral_destroy(replication_globals.aral_rse);
    replication_globals.aral_rse = NULL;

    // custom code
    worker_unregister();

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
void replication_initialize(void) {
    replication_globals.aral_rse = aral_create("rse", sizeof(struct replication_sort_entry),
                                               0, 65536, aral_by_size_statistics(),
                                               NULL, NULL, false, false);
}
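
// aral_rse is an ARAL (array allocator) pool dedicated to fixed-size
// 'struct replication_sort_entry' objects; sort entries are created and
// destroyed constantly while requests flow through the queue, so pooling
// them presumably avoids per-entry malloc/free overhead on that hot path.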
void *replication_thread_main(void *ptr __maybe_unused) {
    replication_initialize_workers(true);

    int threads = config_get_number(CONFIG_SECTION_DB, "replication threads", 1);
    if(threads < 1 || threads > MAX_REPLICATION_THREADS) {
        netdata_log_error("replication threads given %d is invalid, resetting to 1", threads);
        threads = 1;
    }

    if(--threads) {
        replication_globals.main_thread.threads = threads;
        replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
        __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t), __ATOMIC_RELAXED);

        for(int i = 0; i < threads ;i++) {
            char tag[NETDATA_THREAD_TAG_MAX + 1];
            snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2);
            replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t));
            __atomic_add_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
            netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag,
                                  NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL);
        }
    }

    netdata_thread_cleanup_push(replication_main_cleanup, ptr);

    // start from 100% completed
    worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);

    long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place
    bool slow = true; // control the time we sleep - it has to start with true!
    usec_t last_now_mono_ut = now_monotonic_usec();
    time_t replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; // restart from the beginning every 10 seconds

    size_t last_executed = 0;
    size_t last_sender_resets = 0;
    while(service_running(SERVICE_REPLICATION)) {

        // statistics
        usec_t now_mono_ut = now_monotonic_usec();
        if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
            last_now_mono_ut = now_mono_ut;

            worker_is_busy(WORKER_JOB_STATISTICS);
            replication_recursive_lock();

            size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
            if(last_executed != current_executed) {
                run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
                last_executed = current_executed;
                slow = false;
            }

            if(replication_reset_next_point_in_time_countdown-- == 0) {
                // every SECONDS_TO_RESET_POINT_IN_TIME seconds,
                // make it scan all the pending requests again from the beginning
                replication_set_next_point_in_time(0, 0);
                // replication_globals.protected.skipped_no_room_since_last_reset = 0;
                replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
            }

            if(--run_verification_countdown == 0) {
                if (!replication_globals.unsafe.pending && !replication_globals.unsafe.pending_no_room) {
                    // reset the statistics about completion percentage
                    replication_globals.unsafe.first_time_t = 0;
                    replication_set_latest_first_time(0);

                    verify_all_hosts_charts_are_streaming_now();

                    run_verification_countdown = LONG_MAX;
                    slow = true;
                }
                else
                    run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
            }

            time_t latest_first_time_t = replication_get_latest_first_time();
            if(latest_first_time_t && replication_globals.unsafe.pending) {
                // completion percentage statistics
                time_t now = now_realtime_sec();
                time_t total = now - replication_globals.unsafe.first_time_t;
                time_t done = latest_first_time_t - replication_globals.unsafe.first_time_t;
                worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION,
                                  (NETDATA_DOUBLE) done * 100.0 / (NETDATA_DOUBLE) total);
            }
            else
                worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
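
            // worked example of the completion metric: if the oldest request
            // needed data starting 3600s ago (first_time_t) and the oldest
            // chart still replicating has reached 900s ago (the latest first
            // time), then done = 2700s of the total = 3600s window, i.e. 75%.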
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.unsafe.added);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED));
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.pending_no_room);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, (NETDATA_DOUBLE)replication_globals.unsafe.senders_full);

            replication_recursive_unlock();
            worker_is_idle();
        }
        if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
            worker_is_busy(WORKER_JOB_WAIT);
            replication_recursive_lock();

            // the timeout also defines how frequently we will traverse all the pending requests
            // when the outbound buffers of all senders are full
            usec_t timeout;
            if(slow) {
                // no work to be done, wait for a request to come in
                timeout = 1000 * USEC_PER_MS;
                sender_thread_buffer_free();
            }

            else if(replication_globals.unsafe.pending > 0) {
                if(replication_globals.unsafe.sender_resets == last_sender_resets)
                    timeout = 1000 * USEC_PER_MS;

                else {
                    // there are pending requests waiting to be executed,
                    // but none could be executed at this time.
                    // try again after this time.
                    timeout = 100 * USEC_PER_MS;
                }

                last_sender_resets = replication_globals.unsafe.sender_resets;
            }
            else {
                // no requests pending, but there were requests recently (run_verification_countdown)
                // so, try again in a short time.
                // if this timeout is long, replicating a single chart becomes slow to finish
                // (the ping-pong of replicating just one chart).
                timeout = 10 * USEC_PER_MS;

                last_sender_resets = replication_globals.unsafe.sender_resets;
            }

            replication_recursive_unlock();

            worker_is_idle();
            sleep_usec(timeout);

            // make it scan all the pending requests next time
            replication_set_next_point_in_time(0, 0);
            replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;

            continue;
        }
    }

    netdata_thread_cleanup_pop(1);
    return NULL;
}