// SPDX-License-Identifier: GPL-3.0-or-later

#include "replication.h"
#include "Judy.h"

#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
#define MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER 25ULL
#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10ULL

#define WORKER_JOB_FIND_NEXT                            1
#define WORKER_JOB_QUERYING                             2
#define WORKER_JOB_DELETE_ENTRY                         3
#define WORKER_JOB_FIND_CHART                           4
#define WORKER_JOB_PREPARE_QUERY                        5
#define WORKER_JOB_CHECK_CONSISTENCY                    6
#define WORKER_JOB_BUFFER_COMMIT                        7
#define WORKER_JOB_CLEANUP                              8
#define WORKER_JOB_WAIT                                 9

// master thread worker jobs
#define WORKER_JOB_STATISTICS                           10
#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS      11
#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM       12
#define WORKER_JOB_CUSTOM_METRIC_COMPLETION            13
#define WORKER_JOB_CUSTOM_METRIC_ADDED                 14
#define WORKER_JOB_CUSTOM_METRIC_DONE                  15
#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS         16
#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL           17

#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30
#define SECONDS_TO_RESET_POINT_IN_TIME 10

static struct replication_query_statistics replication_queries = {
    .spinlock = NETDATA_SPINLOCK_INITIALIZER,
    .queries_started = 0,
    .queries_finished = 0,
    .points_read = 0,
    .points_generated = 0,
};

struct replication_query_statistics replication_get_query_statistics(void) {
    spinlock_lock(&replication_queries.spinlock);
    struct replication_query_statistics ret = replication_queries;
    spinlock_unlock(&replication_queries.spinlock);
    return ret;
}

size_t replication_buffers_allocated = 0;

size_t replication_allocated_buffers(void) {
    return __atomic_load_n(&replication_buffers_allocated, __ATOMIC_RELAXED);
}

// ----------------------------------------------------------------------------
// sending replication replies

struct replication_dimension {
    STORAGE_POINT sp;
    struct storage_engine_query_handle handle;
    bool enabled;
    bool skip;

    DICTIONARY *dict;
    const DICTIONARY_ITEM *rda;
    RRDDIM *rd;
};

struct replication_query {
    RRDSET *st;

    struct {
        time_t first_entry_t;
        time_t last_entry_t;
    } db;

    struct {                        // what the parent requested
        time_t after;
        time_t before;
        bool enable_streaming;
    } request;

    struct {                        // what the child will do
        time_t after;
        time_t before;
        bool enable_streaming;

        bool locked_data_collection;
        bool execute;
        bool interrupted;
        STREAM_CAPABILITIES capabilities;
    } query;

    time_t wall_clock_time;

    size_t points_read;
    size_t points_generated;

    STORAGE_ENGINE_BACKEND backend;
    struct replication_request *rq;

    size_t dimensions;
    struct replication_dimension data[];
};
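// Lifecycle of a replication reply on the child:
// replication_response_prepare() sanitizes the parent's request,
// replication_query_prepare() opens one storage engine query per exposed
// dimension, replication_query_execute() streams the points into the sender
// buffer, and replication_query_finalize() releases the handles and updates
// the global statistics.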
static struct replication_query *replication_query_prepare(
        RRDSET *st,
        time_t db_first_entry,
        time_t db_last_entry,
        time_t requested_after,
        time_t requested_before,
        bool requested_enable_streaming,
        time_t query_after,
        time_t query_before,
        bool query_enable_streaming,
        time_t wall_clock_time,
        STREAM_CAPABILITIES capabilities
) {
    size_t dimensions = rrdset_number_of_dimensions(st);
    struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
    __atomic_add_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);

    q->dimensions = dimensions;
    q->st = st;

    q->db.first_entry_t = db_first_entry;
    q->db.last_entry_t = db_last_entry;

    q->request.after = requested_after;
    q->request.before = requested_before;
    q->request.enable_streaming = requested_enable_streaming;

    q->query.after = query_after;
    q->query.before = query_before;
    q->query.enable_streaming = query_enable_streaming;
    q->query.capabilities = capabilities;

    q->wall_clock_time = wall_clock_time;

    if (!q->dimensions || !q->query.after || !q->query.before) {
        q->query.execute = false;
        q->dimensions = 0;
        return q;
    }

    if (q->query.enable_streaming) {
        spinlock_lock(&st->data_collection_lock);
        q->query.locked_data_collection = true;

        if (st->last_updated.tv_sec > q->query.before) {
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
            internal_error(true,
                           "STREAM_SENDER REPLAY: 'host:%s/chart:%s' "
                           "has start_streaming = true, "
                           "adjusting replication before timestamp from %llu to %llu",
                           rrdhost_hostname(st->rrdhost), rrdset_id(st),
                           (unsigned long long) q->query.before,
                           (unsigned long long) st->last_updated.tv_sec
            );
#endif
            q->query.before = MIN(st->last_updated.tv_sec, wall_clock_time);
        }
    }

    q->backend = st->rrdhost->db[0].eng->backend;

    // prepare our array of dimensions
    size_t count = 0;
    RRDDIM *rd;
    rrddim_foreach_read(rd, st) {
        if (unlikely(!rd || !rd_dfe.item || !rrddim_check_exposed(rd)))
            continue;

        if (unlikely(rd_dfe.counter >= q->dimensions)) {
            internal_error(true,
                           "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
                           rrdhost_hostname(st->rrdhost), rrdset_id(st));
            break;
        }

        struct replication_dimension *d = &q->data[rd_dfe.counter];

        d->dict = rd_dfe.dict;
        d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
        d->rd = rd;

        storage_engine_query_init(q->backend, rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
                                  q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
        d->enabled = true;
        d->skip = false;
        count++;
    }
    rrddim_foreach_done(rd);

    if (!count) {
        // no data for this chart
        q->query.execute = false;

        if (q->query.locked_data_collection) {
            spinlock_unlock(&st->data_collection_lock);
            q->query.locked_data_collection = false;
        }
    }
    else {
        // we have data for this chart
        q->query.execute = true;
    }

    return q;
}
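// Appends the chart's collection state to the reply: one
// PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE line per exposed dimension (last
// collected time in usec, last collected/calculated/stored values), closed by
// a PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE line, so the parent can resume
// collection exactly where the child left off.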
static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STREAM_CAPABILITIES capabilities) {
    NUMBER_ENCODING encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
    RRDDIM *rd;
    rrddim_foreach_read(rd, st) {
        if (!rrddim_check_exposed(rd)) continue;

        buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '",
                           sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2);
        buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
        buffer_fast_strcat(wb, "' ", 2);
        buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC +
                                                  (usec_t) rd->collector.last_collected_time.tv_usec);
        buffer_fast_strcat(wb, " ", 1);
        buffer_print_int64_encoded(wb, encoding, rd->collector.last_collected_value);
        buffer_fast_strcat(wb, " ", 1);
        buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_calculated_value);
        buffer_fast_strcat(wb, " ", 1);
        buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_stored_value);
        buffer_fast_strcat(wb, "\n", 1);
    }
    rrddim_foreach_done(rd);

    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " ", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE) - 1 + 1);
    buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec);
    buffer_fast_strcat(wb, "\n", 1);
}
static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) {
    size_t dimensions = q->dimensions;

    if (wb && q->query.enable_streaming)
        replication_send_chart_collection_state(wb, q->st, q->query.capabilities);

    if (q->query.locked_data_collection) {
        spinlock_unlock(&q->st->data_collection_lock);
        q->query.locked_data_collection = false;
    }

    // release all the dictionary items acquired
    // finalize the queries
    size_t queries = 0;
    for (size_t i = 0; i < dimensions; i++) {
        struct replication_dimension *d = &q->data[i];
        if (unlikely(!d->enabled)) continue;

        storage_engine_query_finalize(&d->handle);
        dictionary_acquired_item_release(d->dict, d->rda);

        // update global statistics
        queries++;
    }

    if (executed) {
        spinlock_lock(&replication_queries.spinlock);
        replication_queries.queries_started += queries;
        replication_queries.queries_finished += queries;
        replication_queries.points_read += q->points_read;
        replication_queries.points_generated += q->points_generated;

        if (q->st && q->st->rrdhost->sender) {
            struct sender_state *s = q->st->rrdhost->sender;
            s->replication.latest_completed_before_t = q->query.before;
        }
        spinlock_unlock(&replication_queries.spinlock);
    }

    __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
    freez(q);
}
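// Extends query.before to the smallest "optimal" end time reported by the
// storage engine across all dimensions, so that a request drains whole
// database pages instead of splitting them across consecutive requests - but
// only within reasonable bounds (up to a page, and never past the chart's
// last update or the wall clock time).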
static void replication_query_align_to_optimal_before(struct replication_query *q) {
    if (!q->query.execute || q->query.enable_streaming)
        return;

    size_t dimensions = q->dimensions;
    time_t expanded_before = 0;

    for (size_t i = 0; i < dimensions; i++) {
        struct replication_dimension *d = &q->data[i];
        if (unlikely(!d->enabled)) continue;

        time_t new_before = storage_engine_align_to_optimal_before(&d->handle);
        if (!expanded_before || new_before < expanded_before)
            expanded_before = new_before;
    }

    if (expanded_before > q->query.before &&                                // it is later than the original
        (expanded_before - q->query.before) / q->st->update_every < 1024 && // it is reasonable (up to a page)
        expanded_before < q->st->last_updated.tv_sec &&                     // it is not the chart's last updated time
        expanded_before < q->wall_clock_time)                               // it is not later than the wall clock time
        q->query.before = expanded_before;
}
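// The core of the child-side reply: advances a 'now' cursor from after+1 to
// before, emitting one REPLAY_BEGIN block (with a REPLAY_SET line per
// dimension) for every aligned point. If the sender buffer grows beyond
// max_msg_size, the query is interrupted and query.before is rewound to the
// last point that fit, so the remainder can be requested again.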
static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) {
    replication_query_align_to_optimal_before(q);

    NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
    time_t after = q->query.after;
    time_t before = q->query.before;
    size_t dimensions = q->dimensions;
    time_t wall_clock_time = q->wall_clock_time;

    bool finished_with_gap = false;
    size_t points_read = 0, points_generated = 0;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    time_t actual_after = 0, actual_before = 0;
#endif

    time_t now = after + 1;
    time_t last_end_time_in_buffer = 0;
    while (now <= before) {
        time_t min_start_time = 0, max_start_time = 0, min_end_time = 0, max_end_time = 0, min_update_every = 0, max_update_every = 0;
        for (size_t i = 0; i < dimensions; i++) {
            struct replication_dimension *d = &q->data[i];
            if (unlikely(!d->enabled || d->skip)) continue;

            // fetch the first valid point for the dimension
            int max_skip = 1000;
            while (d->sp.end_time_s < now && !storage_engine_query_is_finished(&d->handle) && max_skip-- >= 0) {
                d->sp = storage_engine_query_next_metric(&d->handle);
                points_read++;
            }

            if (max_skip <= 0) {
                d->skip = true;

                error_limit_static_global_var(erl, 1, 0);
                error_limit(&erl,
                            "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query "
                            "beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
                            rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
                            (unsigned long long) now);

                continue;
            }

            if (unlikely(d->sp.end_time_s < now || d->sp.end_time_s < d->sp.start_time_s))
                // this dimension does not provide any data
                continue;

            time_t update_every = d->sp.end_time_s - d->sp.start_time_s;
            if (unlikely(!update_every))
                update_every = q->st->update_every;

            if (unlikely(!min_update_every))
                min_update_every = update_every;

            if (unlikely(!min_start_time))
                min_start_time = d->sp.start_time_s;

            if (unlikely(!min_end_time))
                min_end_time = d->sp.end_time_s;

            min_update_every = MIN(min_update_every, update_every);
            max_update_every = MAX(max_update_every, update_every);

            min_start_time = MIN(min_start_time, d->sp.start_time_s);
            max_start_time = MAX(max_start_time, d->sp.start_time_s);

            min_end_time = MIN(min_end_time, d->sp.end_time_s);
            max_end_time = MAX(max_end_time, d->sp.end_time_s);
        }

        if (unlikely(min_update_every != max_update_every ||
                     min_start_time != max_start_time)) {

            time_t fix_min_start_time;
            if (last_end_time_in_buffer &&
                last_end_time_in_buffer >= min_start_time &&
                last_end_time_in_buffer <= max_start_time) {
                fix_min_start_time = last_end_time_in_buffer;
            }
            else
                fix_min_start_time = min_end_time - min_update_every;

#ifdef NETDATA_INTERNAL_CHECKS
            error_limit_static_global_var(erl, 1, 0);
            error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' "
                              "misaligned dimensions, "
                              "update every (min: %ld, max: %ld), "
                              "start time (min: %ld, max: %ld), "
                              "end time (min %ld, max %ld), "
                              "now %ld, last end time sent %ld, "
                              "min start time is fixed to %ld",
                        rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
                        min_update_every, max_update_every,
                        min_start_time, max_start_time,
                        min_end_time, max_end_time,
                        now, last_end_time_in_buffer,
                        fix_min_start_time
            );
#endif

            min_start_time = fix_min_start_time;
        }

        if (likely(min_start_time <= now && min_end_time >= now)) {
            // we have a valid point

            if (unlikely(min_end_time == min_start_time))
                min_start_time = min_end_time - q->st->update_every;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
            if (unlikely(!actual_after))
                actual_after = min_end_time;

            actual_before = min_end_time;
#endif

            if (buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) {
                q->query.before = last_end_time_in_buffer;
                q->query.enable_streaming = false;

                internal_error(true, "REPLICATION: current buffer size %zu is more than the "
                                     "max message size %zu for chart '%s' of host '%s'. "
                                     "Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.",
                               buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost),
                               q->request.after, q->request.before, q->request.enable_streaming ? "true" : "false",
                               q->query.after, q->query.before, q->query.enable_streaming ? "true" : "false");

                q->query.interrupted = true;
                break;
            }
            last_end_time_in_buffer = min_end_time;

            buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' ", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 4);
            buffer_print_uint64_encoded(wb, encoding, min_start_time);
            buffer_fast_strcat(wb, " ", 1);
            buffer_print_uint64_encoded(wb, encoding, min_end_time);
            buffer_fast_strcat(wb, " ", 1);
            buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
            buffer_fast_strcat(wb, "\n", 1);

            // output the replay values for this time
            for (size_t i = 0; i < dimensions; i++) {
                struct replication_dimension *d = &q->data[i];
                if (unlikely(!d->enabled)) continue;

                if (likely(d->sp.start_time_s <= min_end_time &&
                           d->sp.end_time_s >= min_end_time &&
                           !storage_point_is_unset(d->sp) &&
                           !storage_point_is_gap(d->sp))) {

                    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"", sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1 + 2);
                    buffer_fast_strcat(wb, rrddim_id(d->rd), string_strlen(d->rd->id));
                    buffer_fast_strcat(wb, "\" ", 2);
                    buffer_print_netdata_double_encoded(wb, encoding, d->sp.sum);
                    buffer_fast_strcat(wb, " ", 1);
                    buffer_print_sn_flags(wb, d->sp.flags, q->query.capabilities & STREAM_CAP_INTERPOLATED);
                    buffer_fast_strcat(wb, "\n", 1);

                    points_generated++;
                }
            }

            now = min_end_time + 1;
        }
        else if (unlikely(min_end_time < now))
            // the query does not progress
            break;

        else {
            // we have gap - all points are in the future
            now = min_start_time;

            if (min_start_time > before && !points_generated) {
                before = q->query.before = min_start_time - 1;
                finished_with_gap = true;
                break;
            }
        }
    }

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    if (actual_after) {
        char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1];
        log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
        log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
        internal_error(true,
                       "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
                       rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
                       (unsigned long long) actual_after, actual_after_buf, (unsigned long long) actual_before, actual_before_buf,
                       (unsigned long long) after, (long long) (actual_after - after), (unsigned long long) before, (long long) (actual_before - before));
    }
    else
        internal_error(true,
                       "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
                       rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
                       (unsigned long long) after, (unsigned long long) before);
#endif // NETDATA_LOG_REPLICATION_REQUESTS

    q->points_read += points_read;
    q->points_generated += points_generated;

    if (last_end_time_in_buffer < before - q->st->update_every)
        finished_with_gap = true;

    return finished_with_gap;
}
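// Sanitizes the parent's request before preparing the query: flips reversed
// timestamps, drops windows that lie entirely in the future, clamps the
// window to our own retention, and decides whether streaming should be
// enabled right after this reply.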
static struct replication_query *replication_response_prepare(
        RRDSET *st,
        bool requested_enable_streaming,
        time_t requested_after,
        time_t requested_before,
        STREAM_CAPABILITIES capabilities
) {
    time_t wall_clock_time = now_realtime_sec();

    if (requested_after > requested_before) {
        // flip them
        time_t t = requested_before;
        requested_before = requested_after;
        requested_after = t;
    }

    if (requested_after > wall_clock_time) {
        requested_after = 0;
        requested_before = 0;
        requested_enable_streaming = true;
    }

    if (requested_before > wall_clock_time) {
        requested_before = wall_clock_time;
        requested_enable_streaming = true;
    }

    time_t query_after = requested_after;
    time_t query_before = requested_before;
    bool query_enable_streaming = requested_enable_streaming;

    time_t db_first_entry = 0, db_last_entry = 0;
    rrdset_get_retention_of_tier_for_collected_chart(
        st, &db_first_entry, &db_last_entry, wall_clock_time, 0);

    if (requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) {
        // no data requested - just enable streaming
        ;
    }
    else {
        if (query_after < db_first_entry)
            query_after = db_first_entry;

        if (query_before > db_last_entry)
            query_before = db_last_entry;

        // if the parent asked us to start streaming, then fill the rest with the data that we have
        if (requested_enable_streaming)
            query_before = db_last_entry;

        if (query_after > query_before) {
            time_t tmp = query_before;
            query_before = query_after;
            query_after = tmp;
        }

        query_enable_streaming = (requested_enable_streaming ||
                                  query_before == db_last_entry ||
                                  !requested_after ||
                                  !requested_before) ? true : false;
    }

    return replication_query_prepare(
        st,
        db_first_entry, db_last_entry,
        requested_after, requested_before, requested_enable_streaming,
        query_after, query_before, query_enable_streaming,
        wall_clock_time, capabilities);
}
void replication_response_cancel_and_finalize(struct replication_query *q) {
    replication_query_finalize(NULL, q, false);
}

static bool sender_is_still_connected_for_this_request(struct replication_request *rq);

bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) {
    NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
    struct replication_request *rq = q->rq;
    RRDSET *st = q->st;
    RRDHOST *host = st->rrdhost;

    // we might want to optimize this by filling a temporary buffer
    // and copying the result to the host's buffer in order to avoid
    // holding the host's buffer lock for too long
    BUFFER *wb = sender_start(host->sender);

    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 2);
    buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
    buffer_fast_strcat(wb, "'\n", 2);

    // buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));

    bool locked_data_collection = q->query.locked_data_collection;
    q->query.locked_data_collection = false;

    bool finished_with_gap = false;
    if (q->query.execute)
        finished_with_gap = replication_query_execute(wb, q, max_msg_size);

    time_t after = q->request.after;
    time_t before = q->query.before;
    bool enable_streaming = q->query.enable_streaming;

    replication_query_finalize(wb, q, q->query.execute);
    q = NULL; // IMPORTANT: q is invalid now

    // get a fresh retention to send to the parent
    time_t wall_clock_time = now_realtime_sec();
    time_t db_first_entry, db_last_entry;
    rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);

    // end with first/last entries we have, and the first start time and
    // last end time of the data we sent
    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1);
    buffer_print_int64_encoded(wb, encoding, st->update_every);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, encoding, db_first_entry);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, encoding, db_last_entry);

    // note: the lengths must match the literals exactly (" true " is 6 bytes,
    // " false " is 7) - copying a fixed 7 bytes would append the terminating
    // NUL of " true " into the stream
    if (enable_streaming)
        buffer_fast_strcat(wb, " true ", 6);
    else
        buffer_fast_strcat(wb, " false ", 7);

    buffer_print_uint64_encoded(wb, encoding, after);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, encoding, before);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
    buffer_fast_strcat(wb, "\n", 1);

    worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
    sender_commit(host->sender, wb, STREAM_TRAFFIC_TYPE_REPLICATION);
    worker_is_busy(WORKER_JOB_CLEANUP);

    if (enable_streaming) {
        if (sender_is_still_connected_for_this_request(rq)) {
            // enable normal streaming if we have to
            // but only if the sender buffer has not been flushed since we started

            if (rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
                rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
                rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
                rrdhost_sender_replicating_charts_minus_one(st->rrdhost);

                if (!finished_with_gap)
                    st->upstream_resync_time_s = 0;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
                internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
                               rrdhost_hostname(st->rrdhost), rrdset_id(st));
#endif
            }
            else
                internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating",
                               rrdhost_hostname(st->rrdhost), rrdset_id(st));
        }
    }

    if (locked_data_collection)
        spinlock_unlock(&st->data_collection_lock);

    return enable_streaming;
}
// ----------------------------------------------------------------------------
// sending replication requests

struct replication_request_details {
    struct {
        send_command callback;
        void *data;
    } caller;

    RRDHOST *host;
    RRDSET *st;

    struct {
        time_t first_entry_t;       // the first entry time the child has
        time_t last_entry_t;        // the last entry time the child has
        time_t wall_clock_time;     // the current time of the child
        bool fixed_last_entry;      // when set we set the last entry to wall clock time
    } child_db;

    struct {
        time_t first_entry_t;       // the first entry time we have
        time_t last_entry_t;        // the last entry time we have
        time_t wall_clock_time;     // the current local world clock time
    } local_db;

    struct {
        time_t from;                // the starting time of the entire gap we have
        time_t to;                  // the ending time of the entire gap we have
    } gap;

    struct {
        time_t after;               // the start time we requested previously from this child
        time_t before;              // the end time we requested previously from this child
    } last_request;

    struct {
        time_t after;               // the start time of this replication request - the child will add 1 second
        time_t before;              // the end time of this replication request
        bool start_streaming;       // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before'
    } wanted;
};
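// replicate_chart_request() below fills this structure in stages: it derives
// the gap (gap.from/gap.to) from the child's retention, our local retention
// and the previous request, then narrows it down to the 'wanted' window that
// fits within one rrdpush_replication_step.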
static void replicate_log_request(struct replication_request_details *r, const char *msg) {
#ifdef NETDATA_INTERNAL_CHECKS
    internal_error(true,
#else
    error_limit_static_global_var(erl, 1, 0);
    error_limit(&erl,
#endif
                "REPLAY ERROR: 'host:%s/chart:%s' child sent: "
                "db from %ld to %ld%s, wall clock time %ld, "
                "last request from %ld to %ld, "
                "issue: %s - "
                "sending replication request from %ld to %ld, start streaming %s",
                rrdhost_hostname(r->st->rrdhost), rrdset_id(r->st),
                r->child_db.first_entry_t,
                r->child_db.last_entry_t, r->child_db.fixed_last_entry ? " (fixed)" : "",
                r->child_db.wall_clock_time,
                r->last_request.after,
                r->last_request.before,
                msg,
                r->wanted.after,
                r->wanted.before,
                r->wanted.start_streaming ? "true" : "false");
}
static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg, bool log) {
    RRDSET *st = r->st;

    if (log)
        replicate_log_request(r, msg);

    if (st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t))
        st->rrdhost->receiver->replication_first_time_t = r->wanted.after;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    st->replay.log_next_data_collection = true;

    char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = "";

    if (r->wanted.after)
        log_date(wanted_after_buf, LOG_DATE_LENGTH, r->wanted.after);

    if (r->wanted.before)
        log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before);

    internal_error(true,
                   "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
                   "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld, now %ld] gap[%ld - %ld %s] %s"
                   , rrdhost_hostname(r->host), rrdset_id(r->st)
                   , r->wanted.after, wanted_after_buf
                   , r->wanted.before, wanted_before_buf
                   , r->wanted.start_streaming ? "YES" : "NO"
                   , msg
                   , r->last_request.after, r->last_request.before
                   , r->child_db.first_entry_t, r->child_db.last_entry_t
                   , r->child_db.wall_clock_time, (r->child_db.wall_clock_time == r->local_db.wall_clock_time) ? "SAME" : (r->child_db.wall_clock_time < r->local_db.wall_clock_time) ? "BEHIND" : "AHEAD"
                   , r->local_db.first_entry_t, r->local_db.last_entry_t
                   , r->local_db.wall_clock_time
                   , r->gap.from, r->gap.to
                   , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
                   , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
    );

    st->replay.start_streaming = r->wanted.start_streaming;
    st->replay.after = r->wanted.after;
    st->replay.before = r->wanted.before;
#endif // NETDATA_LOG_REPLICATION_REQUESTS

    char buffer[2048 + 1];
    snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
              rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
              (unsigned long long) r->wanted.after, (unsigned long long) r->wanted.before);

    ssize_t ret = r->caller.callback(buffer, r->caller.data);
    if (ret < 0) {
        netdata_log_error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %zd)",
                          rrdhost_hostname(r->host), rrdset_id(r->st), ret);
        return false;
    }

    return true;
}
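// Runs at the parent: computes the next window to request from the child, or
// sends an empty request with start_streaming=true when nothing (more) can be
// replicated.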
bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
                             time_t child_first_entry, time_t child_last_entry, time_t child_wall_clock_time,
                             time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
{
    struct replication_request_details r = {
        .caller = {
            .callback = callback,
            .data = callback_data,
        },

        .host = host,
        .st = st,

        .child_db = {
            .first_entry_t = child_first_entry,
            .last_entry_t = child_last_entry,
            .wall_clock_time = child_wall_clock_time,
            .fixed_last_entry = false,
        },

        .local_db = {
            .first_entry_t = 0,
            .last_entry_t = 0,
            .wall_clock_time = now_realtime_sec(),
        },

        .last_request = {
            .after = prev_first_entry_wanted,
            .before = prev_last_entry_wanted,
        },

        .wanted = {
            .after = 0,
            .before = 0,
            .start_streaming = true,
        },
    };

    if (r.child_db.last_entry_t > r.child_db.wall_clock_time) {
        replicate_log_request(&r, "child's db last entry > child's wall clock time");
        r.child_db.last_entry_t = r.child_db.wall_clock_time;
        r.child_db.fixed_last_entry = true;
    }

    rrdset_get_retention_of_tier_for_collected_chart(r.st, &r.local_db.first_entry_t, &r.local_db.last_entry_t, r.local_db.wall_clock_time, 0);

    // let's find the GAP we have
    if (!r.last_request.after || !r.last_request.before) {
        // there is no previous request

        if (r.local_db.last_entry_t)
            // we have some data, let's continue from the last point we have
            r.gap.from = r.local_db.last_entry_t;
        else
            // we don't have any data, the gap is the max timeframe we are allowed to replicate
            r.gap.from = r.local_db.wall_clock_time - r.host->rrdpush_seconds_to_replicate;
    }
    else {
        // we had sent a request - let's continue at the point we left it
        // for this we don't take into account the actual data in our db
        // because the child may also have gaps, and we need to get over it
        r.gap.from = r.last_request.before;
    }

    // we want all the data up to now
    r.gap.to = r.local_db.wall_clock_time;

    // The gap is now r.gap.from -> r.gap.to

    if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION)))
        return send_replay_chart_cmd(&r, "empty replication request, replication is disabled", false);

    if (unlikely(!rrdset_number_of_dimensions(st)))
        return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions", false);

    if (unlikely(!r.child_db.first_entry_t || !r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, child has no stored data", false);

    if (unlikely(r.child_db.first_entry_t < 0 || r.child_db.last_entry_t < 0))
        return send_replay_chart_cmd(&r, "empty replication request, child db timestamps are invalid", true);

    if (unlikely(r.child_db.first_entry_t > r.child_db.wall_clock_time))
        return send_replay_chart_cmd(&r, "empty replication request, child db first entry is after its wall clock time", true);

    if (unlikely(r.child_db.first_entry_t > r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)", true);

    if (unlikely(r.local_db.last_entry_t > r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one", false);

    // let's find what the child can provide to fill that gap

    if (r.child_db.first_entry_t > r.gap.from)
        // the child does not have all the data - let's get what it has
        r.wanted.after = r.child_db.first_entry_t;
    else
        // ok, the child can fill the entire gap we have
        r.wanted.after = r.gap.from;

    if (r.gap.to - r.wanted.after > host->rrdpush_replication_step)
        // the duration is too big for one request - let's take the first step
        r.wanted.before = r.wanted.after + host->rrdpush_replication_step;
    else
        // wow, we can do it in one request
        r.wanted.before = r.gap.to;

    // don't ask from the child more than it has
    if (r.wanted.before > r.child_db.last_entry_t)
        r.wanted.before = r.child_db.last_entry_t;

    if (r.wanted.after > r.wanted.before) {
        r.wanted.after = 0;
        r.wanted.before = 0;
        r.wanted.start_streaming = true;
        return send_replay_chart_cmd(&r, "empty replication request, wanted after computed bigger than wanted before", true);
    }

    // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child
    r.wanted.start_streaming = (r.local_db.wall_clock_time - r.wanted.after <= host->rrdpush_replication_step ||
                                r.wanted.before >= r.child_db.last_entry_t ||
                                r.wanted.before >= r.child_db.wall_clock_time ||
                                r.wanted.before >= r.local_db.wall_clock_time);

    // the wanted timeframe is now r.wanted.after -> r.wanted.before
    // send it
    return send_replay_chart_cmd(&r, "OK", false);
}
// ----------------------------------------------------------------------------
// replication thread

// replication request in sender DICTIONARY
// used for de-duplicating the requests
struct replication_request {
    struct sender_state *sender;    // the sender we should put the reply at
    STRING *chart_id;               // the chart of the request
    time_t after;                   // the start time of the query (maybe zero) key for sorting (JudyL)
    time_t before;                  // the end time of the query (maybe zero)

    usec_t sender_last_flush_ut;    // the timestamp of the sender, at the time we indexed this request
    Word_t unique_id;               // auto-increment, later requests have bigger
    bool start_streaming;           // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming
    bool indexed_in_judy;           // true when the request is indexed in judy
    bool not_indexed_buffer_full;   // true when the request is not indexed because the sender is full
    bool not_indexed_preprocessing; // true when the request is not indexed, but it is pending in preprocessing

    // prepare ahead members - preprocessing
    bool found;                     // used as a result boolean for the find call
    bool executed;                  // used to detect if we have skipped requests while preprocessing
    RRDSET *st;                     // caching of the chart during preprocessing
    struct replication_query *q;    // the preprocessing query initialization
};

// replication sort entry in JudyL array
// used for sorting all requests, across all nodes
struct replication_sort_entry {
    struct replication_request *rq;

    size_t unique_id;               // used as a key to identify the sort entry - we never access its contents
};
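// The pending queue is a two-level JudyL index: the outer array is keyed by
// the request's 'after' timestamp and each entry points to an inner JudyL
// keyed by unique_id. This keeps all requests, across all senders, sorted by
// start time, with insertion order as the tie-breaker.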
#define MAX_REPLICATION_THREADS 20 // + 1 for the main thread

// the global variables for the replication thread
static struct replication_thread {
    ARAL *aral_rse;

    SPINLOCK spinlock;

    struct {
        size_t pending;             // number of requests pending in the queue

        // statistics
        size_t added;               // number of requests added to the queue
        size_t removed;             // number of requests removed from the queue
        size_t pending_no_room;     // number of requests skipped, because the sender has no room for responses
        size_t senders_full;        // number of times the sender buffer was found full
        size_t sender_resets;       // number of times a sender reset our last position in the queue
        time_t first_time_t;        // the minimum 'after' we encountered

        struct {
            Word_t after;
            Word_t unique_id;
            Pvoid_t JudyL_array;
        } queue;

    } unsafe;                       // protected from replication_recursive_lock()

    struct {
        Word_t unique_id;           // the last unique id we gave to a request (auto-increment, starting from 1)
        size_t executed;            // the number of replication requests executed
        size_t latest_first_time;   // the 'after' timestamp of the last request we executed
        size_t memory;              // the total memory allocated by replication
    } atomic;                       // access should be with atomic operations

    struct {
        size_t last_executed;       // caching of the atomic.executed to report number of requests executed since last time
        netdata_thread_t **threads_ptrs;
        size_t threads;
    } main_thread;                  // access is allowed only by the main thread

} replication_globals = {
    .aral_rse = NULL,
    .spinlock = NETDATA_SPINLOCK_INITIALIZER,
    .unsafe = {
        .pending = 0,

        .added = 0,
        .removed = 0,
        .pending_no_room = 0,
        .sender_resets = 0,
        .senders_full = 0,

        .first_time_t = 0,

        .queue = {
            .after = 0,
            .unique_id = 0,
            .JudyL_array = NULL,
        },
    },
    .atomic = {
        .unique_id = 0,
        .executed = 0,
        .latest_first_time = 0,
        .memory = 0,
    },
    .main_thread = {
        .last_executed = 0,
        .threads = 0,
        .threads_ptrs = NULL,
    },
};

size_t replication_allocated_memory(void) {
    return __atomic_load_n(&replication_globals.atomic.memory, __ATOMIC_RELAXED);
}

#define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED)
#define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED)
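// The replication lock must be acquirable by code paths that may already hold
// it, so a thread-local recursion counter makes the spinlock re-entrant per
// thread: only the first 'L' and the matching last 'U' touch the actual
// spinlock.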
static inline bool replication_recursive_lock_mode(char mode) {
    static __thread int recursions = 0;

    if (mode == 'L') { // (L)ock
        if (++recursions == 1)
            spinlock_lock(&replication_globals.spinlock);
    }
    else if (mode == 'U') { // (U)nlock
        if (--recursions == 0)
            spinlock_unlock(&replication_globals.spinlock);
    }
    else if (mode == 'C') { // (C)heck
        if (recursions > 0)
            return true;
        else
            return false;
    }
    else
        fatal("REPLICATION: unknown lock mode '%c'", mode);

#ifdef NETDATA_INTERNAL_CHECKS
    if (recursions < 0)
        fatal("REPLICATION: recursions is %d", recursions);
#endif

    return true;
}

#define replication_recursive_lock() replication_recursive_lock_mode('L')
#define replication_recursive_unlock() replication_recursive_lock_mode('U')
#define fatal_when_replication_is_not_locked_for_me() do { \
    if(!replication_recursive_lock_mode('C')) \
        fatal("REPLICATION: reached %s, but replication is not locked by this thread.", __FUNCTION__); \
} while(0)

void replication_set_next_point_in_time(time_t after, size_t unique_id) {
    replication_recursive_lock();
    replication_globals.unsafe.queue.after = after;
    replication_globals.unsafe.queue.unique_id = unique_id;
    replication_recursive_unlock();
}
// ----------------------------------------------------------------------------
// replication sort entry management

static inline struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
    struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse);
    __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);

    rrdpush_sender_pending_replication_requests_plus_one(rq->sender);

    // copy the request
    rse->rq = rq;
    rse->unique_id = __atomic_add_fetch(&replication_globals.atomic.unique_id, 1, __ATOMIC_SEQ_CST);

    // save the unique id into the request, to be able to delete it later
    rq->unique_id = rse->unique_id;
    rq->indexed_in_judy = false;
    rq->not_indexed_buffer_full = false;
    rq->not_indexed_preprocessing = false;
    return rse;
}

static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
    aral_freez(replication_globals.aral_rse, rse);
    __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
}
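// Judy does not report per-operation allocation sizes, so the add/del paths
// below sample JudyLMemUsed() before and after each mutation and account the
// difference to replication_globals.atomic.memory.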
static void replication_sort_entry_add(struct replication_request *rq) {
    if (unlikely(rrdpush_sender_replication_buffer_full_get(rq->sender))) {
        rq->indexed_in_judy = false;
        rq->not_indexed_buffer_full = true;
        rq->not_indexed_preprocessing = false;
        replication_recursive_lock();
        replication_globals.unsafe.pending_no_room++;
        replication_recursive_unlock();
        return;
    }

    // cache this, because it will be changed
    bool decrement_no_room = rq->not_indexed_buffer_full;

    struct replication_sort_entry *rse = replication_sort_entry_create(rq);

    replication_recursive_lock();

    if (decrement_no_room)
        replication_globals.unsafe.pending_no_room--;

//    if(rq->after < (time_t)replication_globals.protected.queue.after &&
//        rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED &&
//       !replication_globals.protected.skipped_no_room_since_last_reset) {
//
//        // make it find this request first
//        replication_set_next_point_in_time(rq->after, rq->unique_id);
//    }

    replication_globals.unsafe.added++;
    replication_globals.unsafe.pending++;

    Pvoid_t *inner_judy_ptr;

    // find the outer judy entry, using after as key
    size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
    inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
    size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
    if (unlikely(!inner_judy_ptr || inner_judy_ptr == PJERR))
        fatal("REPLICATION: corrupted outer judyL");

    // add it to the inner judy, using unique_id as key
    size_t mem_before_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
    Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
    size_t mem_after_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
    if (unlikely(!item || item == PJERR))
        fatal("REPLICATION: corrupted inner judyL");
    *item = rse;
    rq->indexed_in_judy = true;
    rq->not_indexed_buffer_full = false;
    rq->not_indexed_preprocessing = false;

    if (!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t)
        replication_globals.unsafe.first_time_t = rq->after;

    replication_recursive_unlock();

    __atomic_add_fetch(&replication_globals.atomic.memory, (mem_after_inner_judyl - mem_before_inner_judyl) + (mem_after_outer_judyl - mem_before_outer_judyl), __ATOMIC_RELAXED);
}
static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) {
    fatal_when_replication_is_not_locked_for_me();

    bool inner_judy_deleted = false;

    replication_globals.unsafe.removed++;
    replication_globals.unsafe.pending--;

    rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender);

    rse->rq->indexed_in_judy = false;
    rse->rq->not_indexed_preprocessing = preprocessing;

    size_t memory_saved = 0;

    // delete it from the inner judy
    size_t mem_before_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
    JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
    size_t mem_after_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
    memory_saved = mem_before_inner_judyl - mem_after_inner_judyl;

    // if no items left, delete it from the outer judy
    if (**inner_judy_ppptr == NULL) {
        size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
        JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0);
        size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
        memory_saved += mem_before_outer_judyl - mem_after_outer_judyl;
        inner_judy_deleted = true;
    }

    // free memory
    replication_sort_entry_destroy(rse);

    __atomic_sub_fetch(&replication_globals.atomic.memory, memory_saved, __ATOMIC_RELAXED);

    return inner_judy_deleted;
}

static void replication_sort_entry_del(struct replication_request *rq, bool buffer_full) {
    Pvoid_t *inner_judy_pptr;
    struct replication_sort_entry *rse_to_delete = NULL;

    replication_recursive_lock();
    if (rq->indexed_in_judy) {
        inner_judy_pptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, rq->after, PJE0);
        if (inner_judy_pptr) {
            Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0);
            if (our_item_pptr) {
                rse_to_delete = *our_item_pptr;
                replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr, false);

                if (buffer_full) {
                    replication_globals.unsafe.pending_no_room++;
                    rq->not_indexed_buffer_full = true;
                }
            }
        }

        if (!rse_to_delete)
            fatal("REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.",
                  rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after);
    }

    replication_recursive_unlock();
}
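// Returns a copy of the next request to execute, resuming the scan from the
// saved position in unsafe.queue (after/unique_id); if nothing is found
// there, a second round restarts from the beginning of the index. The entry
// is unlinked and the copy carries a dup'ed chart_id, which the caller owns.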
static struct replication_request replication_request_get_first_available() {
    Pvoid_t *inner_judy_pptr;

    replication_recursive_lock();

    struct replication_request rq_to_return = (struct replication_request){ .found = false };

    if (unlikely(!replication_globals.unsafe.queue.after || !replication_globals.unsafe.queue.unique_id)) {
        replication_globals.unsafe.queue.after = 0;
        replication_globals.unsafe.queue.unique_id = 0;
    }

    Word_t started_after = replication_globals.unsafe.queue.after;

    size_t round = 0;
    while (!rq_to_return.found) {
        round++;

        if (round > 2)
            break;

        if (round == 2) {
            if (started_after == 0)
                break;

            replication_globals.unsafe.queue.after = 0;
            replication_globals.unsafe.queue.unique_id = 0;
        }

        bool find_same_after = true;
        while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstThenNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, &find_same_after))) {
            Pvoid_t *our_item_pptr;

            if (unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after))
                break;

            while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) {
                struct replication_sort_entry *rse = *our_item_pptr;
                struct replication_request *rq = rse->rq;

                // copy the request to return it
                rq_to_return = *rq;
                rq_to_return.chart_id = string_dup(rq_to_return.chart_id);

                // set the return result to found
                rq_to_return.found = true;

                if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr, true))
                    // we removed the item from the outer JudyL
                    break;
            }

            // prepare for the next iteration on the outer loop
            replication_globals.unsafe.queue.unique_id = 0;
        }
    }

    replication_recursive_unlock();
    return rq_to_return;
}
// ----------------------------------------------------------------------------
// replication request management
static void replication_request_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) {
    struct sender_state *s = sender_state; (void)s;
    struct replication_request *rq = value;

    // IMPORTANT:
    // We use the react callback instead of the insert callback,
    // because we want the item to be atomically visible to our
    // replication thread immediately after it is added.
    // If we did this in the insert callback, the item would not be
    // guaranteed to be atomically visible to others yet, so the
    // replication thread could see the replication sort entry, but
    // fail to find the dictionary item related to it.

    replication_sort_entry_add(rq);

    // this request is about a unique chart for this sender
    rrdpush_sender_replicating_charts_plus_one(s);
}
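
// Called by the dictionary when replication_add_request() is given a chart
// that already has a pending request. Depending on the state of the existing
// request, the new time window either replaces it, re-indexes it, or is
// ignored. Returning false is taken here to mean the existing value object
// stays in place (the exact return semantics belong to the dictionary), so
// the new copy's chart_id is always released before returning.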
static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) {
    struct sender_state *s = sender_state; (void)s;
    struct replication_request *rq = old_value; (void)rq;
    struct replication_request *rq_new = new_value;

    replication_recursive_lock();

    if(!rq->indexed_in_judy && rq->not_indexed_buffer_full && !rq->not_indexed_preprocessing) {
        // we can replace this command
        internal_error(
                true,
                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
                (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
                (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");

        rq->after = rq_new->after;
        rq->before = rq_new->before;
        rq->start_streaming = rq_new->start_streaming;
    }
    else if(!rq->indexed_in_judy && !rq->not_indexed_preprocessing) {
        replication_sort_entry_add(rq);
        internal_error(
                true,
                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
                (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
                (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
    }
    else {
        internal_error(
                true,
                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host),
                dictionary_acquired_item_name(item),
                (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
                (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
    }

    replication_recursive_unlock();

    string_freez(rq_new->chart_id);
    return false;
}
static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) {
    struct replication_request *rq = value;

    // this request is about a unique chart for this sender
    rrdpush_sender_replicating_charts_minus_one(rq->sender);

    if(rq->indexed_in_judy)
        replication_sort_entry_del(rq, false);

    else if(rq->not_indexed_buffer_full) {
        replication_recursive_lock();
        replication_globals.unsafe.pending_no_room--;
        replication_recursive_unlock();
    }

    string_freez(rq->chart_id);
}
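
// A sender's flush time changes whenever its connection is re-established,
// so a request whose stored flush time no longer matches the sender's was
// queued for a previous connection and is stale.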
static bool sender_is_still_connected_for_this_request(struct replication_request *rq) {
    return rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender);
}
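
// Execute one replication request end-to-end: resolve the chart if needed,
// prepare the response query if the pipeline has not prepared it already,
// then run the query and stream the result. Returns false only when the
// chart cannot be found; the cleanup path cancels any prepared query.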
static bool replication_execute_request(struct replication_request *rq, bool workers) {
    bool ret = false;

    if(!rq->st) {
        if(likely(workers))
            worker_is_busy(WORKER_JOB_FIND_CHART);

        rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
    }

    if(!rq->st) {
        internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found",
                       rrdhost_hostname(rq->sender->host), string2str(rq->chart_id));

        goto cleanup;
    }

    netdata_thread_disable_cancelability();

    if(!rq->q) {
        if(likely(workers))
            worker_is_busy(WORKER_JOB_PREPARE_QUERY);

        rq->q = replication_response_prepare(
                rq->st,
                rq->start_streaming,
                rq->after,
                rq->before,
                rq->sender->capabilities);
    }

    if(likely(workers))
        worker_is_busy(WORKER_JOB_QUERYING);

    // send the replication data
    rq->q->rq = rq;
    replication_response_execute_and_finalize(
            rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL));

    rq->q = NULL;
    netdata_thread_enable_cancelability();

    __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED);

    ret = true;

cleanup:
    if(rq->q) {
        replication_response_cancel_and_finalize(rq->q);
        rq->q = NULL;
    }

    string_freez(rq->chart_id);
    worker_is_idle();
    return ret;
}
// ----------------------------------------------------------------------------
// public API
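
// Queue a replication request for one chart of a sender, or execute it
// inline when it asks to start streaming and the sender's buffer has room.
//
// A minimal usage sketch (hypothetical values, for illustration only):
//
//     // replicate the last hour of data, then switch to streaming
//     replication_add_request(s, "system.cpu", now - 3600, now, true);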
void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming) {
    struct replication_request rq = {
            .sender = sender,
            .chart_id = string_strdupz(chart_id),
            .after = after,
            .before = before,
            .start_streaming = start_streaming,
            .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
            .indexed_in_judy = false,
            .not_indexed_buffer_full = false,
            .not_indexed_preprocessing = false,
    };

    if(!sender->replication.oldest_request_after_t || rq.after < sender->replication.oldest_request_after_t)
        sender->replication.oldest_request_after_t = rq.after;

    if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
        replication_execute_request(&rq, false);

    else
        dictionary_set(sender->replication.requests, chart_id, &rq, sizeof(struct replication_request));
}
void replication_sender_delete_pending_requests(struct sender_state *sender) {
    // allow the dictionary destructor to go faster on locks
    dictionary_flush(sender->replication.requests);
}
void replication_init_sender(struct sender_state *sender) {
    sender->replication.requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
                                                              NULL, sizeof(struct replication_request));

    dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender);
    dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender);
    dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender);
}
void replication_cleanup_sender(struct sender_state *sender) {
    // allow the dictionary destructor to go faster on locks
    replication_recursive_lock();
    dictionary_destroy(sender->replication.requests);
    replication_recursive_unlock();
}
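
// Track the sender's buffer usage and toggle replication with hysteresis:
// above MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED all indexed requests of this
// sender are parked; they are re-indexed only after usage drops below
// MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED. The gap between the two thresholds
// avoids thrashing around a single cut-off point.
//
// Worked example of the percentage (illustrative numbers only): with
// max_size = 1048576 (1 MiB) and 921600 bytes (900 KiB) in use,
// available = 126976, so percentage = (1048576 - 126976) * 100 / 1048576 = 87.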
void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
    size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
    size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size;

    if(unlikely(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !rrdpush_sender_replication_buffer_full_get(s))) {
        rrdpush_sender_replication_buffer_full_set(s, true);

        struct replication_request *rq;
        dfe_start_read(s->replication.requests, rq) {
            if(rq->indexed_in_judy)
                replication_sort_entry_del(rq, true);
        }
        dfe_done(rq);

        replication_recursive_lock();
        replication_globals.unsafe.senders_full++;
        replication_recursive_unlock();
    }
    else if(unlikely(percentage < MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED && rrdpush_sender_replication_buffer_full_get(s))) {
        rrdpush_sender_replication_buffer_full_set(s, false);

        struct replication_request *rq;
        dfe_start_read(s->replication.requests, rq) {
            if(!rq->indexed_in_judy && (rq->not_indexed_buffer_full || rq->not_indexed_preprocessing))
                replication_sort_entry_add(rq);
        }
        dfe_done(rq);

        replication_recursive_lock();
        replication_globals.unsafe.senders_full--;
        replication_globals.unsafe.sender_resets++;
        // replication_set_next_point_in_time(0, 0);
        replication_recursive_unlock();
    }

    rrdpush_sender_set_buffer_used_percent(s, percentage);
}
// ----------------------------------------------------------------------------
// replication thread
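
// Consistency check, run after replication goes idle: every chart of the
// host must be flagged REPLICATION FINISHED and not IN PROGRESS. Returns
// the number of charts violating this, so the caller can log a summary.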
static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
    internal_error(
            host->sender &&
            !rrdpush_sender_pending_replication_requests(host->sender) &&
            dictionary_entries(host->sender->replication.requests) != 0,
            "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
            rrdhost_hostname(host),
            rrdpush_sender_pending_replication_requests(host->sender),
            dictionary_entries(host->sender->replication.requests)
            );

    size_t ok = 0;
    size_t errors = 0;

    RRDSET *st;
    rrdset_foreach_read(st, host) {
        RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_FINISHED);

        bool is_error = false;

        if(!flags) {
            internal_error(
                    true,
                    "REPLICATION SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED",
                    rrdhost_hostname(host), rrdset_id(st)
            );
            is_error = true;
        }

        if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
            internal_error(
                    true,
                    "REPLICATION SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished",
                    rrdhost_hostname(host), rrdset_id(st)
            );
            is_error = true;
        }

        if(is_error)
            errors++;
        else
            ok++;
    }
    rrdset_foreach_done(st);

    internal_error(errors,
                   "REPLICATION SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished",
                   rrdhost_hostname(host), ok, errors);

    return errors;
}
static void verify_all_hosts_charts_are_streaming_now(void) {
    worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY);

    size_t errors = 0;
    RRDHOST *host;
    dfe_start_read(rrdhost_root_index, host)
        errors += verify_host_charts_are_streaming_now(host);
    dfe_done(host);

    size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
    netdata_log_info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication",
                     executed - replication_globals.main_thread.last_executed, errors);
    replication_globals.main_thread.last_executed = executed;
}
static void replication_initialize_workers(bool master) {
    worker_register("REPLICATION");
    worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next");
    worker_register_job_name(WORKER_JOB_QUERYING, "querying");
    worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete");
    worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart");
    worker_register_job_name(WORKER_JOB_PREPARE_QUERY, "prepare query");
    worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
    worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit");
    worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup");
    worker_register_job_name(WORKER_JOB_WAIT, "wait");

    if(master) {
        worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, "senders full", "senders", WORKER_METRIC_ABSOLUTE);
    }
}
#define REQUEST_OK (0)
#define REQUEST_QUEUE_EMPTY (-1)
#define REQUEST_CHART_NOT_FOUND (-2)
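
// Per-thread pipeline state. Requests are prepared ahead of execution in a
// small ring of slots: rqs_last_prepared runs ahead of rqs_last_executed, so
// while one response is being executed, the queries of the next few requests
// have already been prepared. A sketch of the ring (indices wrap around):
//
//     rqs[0] .. rqs[max_requests_ahead - 1]
//                ^ rqs_last_executed   ^ rqs_last_prepared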
static __thread struct replication_thread_pipeline {
    int max_requests_ahead;
    struct replication_request *rqs;
    int rqs_last_executed, rqs_last_prepared;
    size_t queue_rounds;
} rtp = {
        .max_requests_ahead = 0,
        .rqs = NULL,
        .rqs_last_executed = 0,
        .rqs_last_prepared = 0,
        .queue_rounds = 0,
};
static void replication_pipeline_cancel_and_cleanup(void) {
    if(!rtp.rqs)
        return;

    struct replication_request *rq;
    size_t cancelled = 0;

    do {
        if (++rtp.rqs_last_executed >= rtp.max_requests_ahead)
            rtp.rqs_last_executed = 0;

        rq = &rtp.rqs[rtp.rqs_last_executed];

        if (rq->q) {
            internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
            internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");

            replication_response_cancel_and_finalize(rq->q);
            rq->q = NULL;
            cancelled++;
        }

        rq->executed = true;
        rq->found = false;

    } while (rtp.rqs_last_executed != rtp.rqs_last_prepared);

    internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);

    freez(rtp.rqs);
    rtp.rqs = NULL;
    rtp.max_requests_ahead = 0;
    rtp.rqs_last_executed = 0;
    rtp.rqs_last_prepared = 0;
    rtp.queue_rounds = 0;
}
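
// One pipeline step, in two phases: first fill all free slots with newly
// picked requests and prepare their queries; then advance the execute cursor
// to the first slot holding a still-valid request, cancelling stale ones
// (sender reconnected) and parked ones (sender buffer full) on the way.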
static int replication_pipeline_execute_next(void) {
    struct replication_request *rq;

    if(unlikely(!rtp.rqs)) {
        rtp.max_requests_ahead = (int)get_netdata_cpus() / 2;

        if(rtp.max_requests_ahead > libuv_worker_threads * 2)
            rtp.max_requests_ahead = libuv_worker_threads * 2;

        if(rtp.max_requests_ahead < 2)
            rtp.max_requests_ahead = 2;

        rtp.rqs = callocz(rtp.max_requests_ahead, sizeof(struct replication_request));
        __atomic_add_fetch(&replication_buffers_allocated, rtp.max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
    }

    // fill the queue
    do {
        if(++rtp.rqs_last_prepared >= rtp.max_requests_ahead) {
            rtp.rqs_last_prepared = 0;
            rtp.queue_rounds++;
        }

        internal_fatal(rtp.rqs[rtp.rqs_last_prepared].q,
                       "REPLAY FATAL: slot is used by query that has not been executed!");

        worker_is_busy(WORKER_JOB_FIND_NEXT);
        rtp.rqs[rtp.rqs_last_prepared] = replication_request_get_first_available();
        rq = &rtp.rqs[rtp.rqs_last_prepared];

        if(rq->found) {
            if (!rq->st) {
                worker_is_busy(WORKER_JOB_FIND_CHART);
                rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
            }

            if (rq->st && !rq->q) {
                worker_is_busy(WORKER_JOB_PREPARE_QUERY);
                rq->q = replication_response_prepare(
                        rq->st,
                        rq->start_streaming,
                        rq->after,
                        rq->before,
                        rq->sender->capabilities);
            }

            rq->executed = false;
        }

    } while(rq->found && rtp.rqs_last_prepared != rtp.rqs_last_executed);

    // pick the first usable
    do {
        if (++rtp.rqs_last_executed >= rtp.max_requests_ahead)
            rtp.rqs_last_executed = 0;

        rq = &rtp.rqs[rtp.rqs_last_executed];

        if(rq->found) {
            internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");

            if (rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(rq->sender)) {
                // the sender has reconnected since this request was queued,
                // we can safely throw it away, since the parent will resend it
                replication_response_cancel_and_finalize(rq->q);
                rq->executed = true;
                rq->found = false;
                rq->q = NULL;
            }
            else if (rrdpush_sender_replication_buffer_full_get(rq->sender)) {
                // the sender buffer is full, so we can ignore this request;
                // it has already been marked as 'preprocessed' in the dictionary,
                // and the sender will re-index it when there is enough room
                // in its buffer to process replication requests
                replication_response_cancel_and_finalize(rq->q);
                rq->executed = true;
                rq->found = false;
                rq->q = NULL;
            }
            else {
                // we can execute this,
                // delete it from the dictionary
                worker_is_busy(WORKER_JOB_DELETE_ENTRY);
                dictionary_del(rq->sender->replication.requests, string2str(rq->chart_id));
            }
        }
        else
            internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!");

    } while(!rq->found && rtp.rqs_last_executed != rtp.rqs_last_prepared);

    if(unlikely(!rq->found)) {
        worker_is_idle();
        return REQUEST_QUEUE_EMPTY;
    }

    replication_set_latest_first_time(rq->after);

    bool chart_found = replication_execute_request(rq, true);
    rq->executed = true;
    rq->found = false;
    rq->q = NULL;

    if(unlikely(!chart_found)) {
        worker_is_idle();
        return REQUEST_CHART_NOT_FOUND;
    }

    worker_is_idle();
    return REQUEST_OK;
}
static void replication_worker_cleanup(void *ptr __maybe_unused) {
    replication_pipeline_cancel_and_cleanup();
    worker_unregister();
}
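
// Body of the additional replication threads: keep draining the pipeline;
// when the queue is empty, release this thread's sender buffer and sleep
// for a second before trying again.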
static void *replication_worker_thread(void *ptr) {
    replication_initialize_workers(false);

    netdata_thread_cleanup_push(replication_worker_cleanup, ptr);

    while(service_running(SERVICE_REPLICATION)) {
        if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
            sender_thread_buffer_free();
            worker_is_busy(WORKER_JOB_WAIT);
            worker_is_idle();
            sleep_usec(1 * USEC_PER_SEC);
        }
    }

    netdata_thread_cleanup_pop(1);
    return NULL;
}
static void replication_main_cleanup(void *ptr) {
    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

    replication_pipeline_cancel_and_cleanup();

    int threads = (int)replication_globals.main_thread.threads;
    for(int i = 0; i < threads ;i++) {
        netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL);
        freez(replication_globals.main_thread.threads_ptrs[i]);
        __atomic_sub_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
    }
    freez(replication_globals.main_thread.threads_ptrs);
    replication_globals.main_thread.threads_ptrs = NULL;
    __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);

    aral_destroy(replication_globals.aral_rse);
    replication_globals.aral_rse = NULL;

    // custom code
    worker_unregister();

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
void replication_initialize(void) {
    replication_globals.aral_rse = aral_create("rse", sizeof(struct replication_sort_entry),
                                               0, 65536, aral_by_size_statistics(),
                                               NULL, NULL, false, false);
}
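
// The main replication thread. It reads "replication threads" from the
// CONFIG_SECTION_DB section, spawns any extra workers (tagged REPLAY[2]
// onwards), then runs the same pipeline itself, while also publishing the
// worker statistics once per update interval and adapting its sleep time
// to the amount of pending work.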
void *replication_thread_main(void *ptr __maybe_unused) {
    replication_initialize_workers(true);

    int threads = config_get_number(CONFIG_SECTION_DB, "replication threads", 1);
    if(threads < 1 || threads > MAX_REPLICATION_THREADS) {
        netdata_log_error("replication threads given %d is invalid, resetting to 1", threads);
        threads = 1;
    }

    if(--threads) {
        replication_globals.main_thread.threads = threads;
        replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
        __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);

        for(int i = 0; i < threads ;i++) {
            char tag[NETDATA_THREAD_TAG_MAX + 1];
            snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2);
            replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t));
            __atomic_add_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
            netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag,
                                  NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL);
        }
    }

    netdata_thread_cleanup_push(replication_main_cleanup, ptr);

    // start from 100% completed
    worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);

    long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place
    bool slow = true; // control the time we sleep - it has to start with true!
    usec_t last_now_mono_ut = now_monotonic_usec();
    time_t replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; // restart from the beginning every 10 seconds

    size_t last_executed = 0;
    size_t last_sender_resets = 0;

    while(service_running(SERVICE_REPLICATION)) {

        // statistics
        usec_t now_mono_ut = now_monotonic_usec();
        if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
            last_now_mono_ut = now_mono_ut;

            worker_is_busy(WORKER_JOB_STATISTICS);
            replication_recursive_lock();

            size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
            if(last_executed != current_executed) {
                run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
                last_executed = current_executed;
                slow = false;
            }

            if(replication_reset_next_point_in_time_countdown-- == 0) {
                // once per second, make it scan all the pending requests next time
                replication_set_next_point_in_time(0, 0);
                // replication_globals.protected.skipped_no_room_since_last_reset = 0;
                replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
            }

            if(--run_verification_countdown == 0) {
                if (!replication_globals.unsafe.pending && !replication_globals.unsafe.pending_no_room) {
                    // reset the statistics about completion percentage
                    replication_globals.unsafe.first_time_t = 0;
                    replication_set_latest_first_time(0);

                    verify_all_hosts_charts_are_streaming_now();

                    run_verification_countdown = LONG_MAX;
                    slow = true;
                }
                else
                    run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
            }

            time_t latest_first_time_t = replication_get_latest_first_time();
            if(latest_first_time_t && replication_globals.unsafe.pending) {
                // completion percentage statistics
                time_t now = now_realtime_sec();
                time_t total = now - replication_globals.unsafe.first_time_t;
                time_t done = latest_first_time_t - replication_globals.unsafe.first_time_t;
                worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION,
                                  (NETDATA_DOUBLE)done * 100.0 / (NETDATA_DOUBLE)total);
            }
            else
                worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);

            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.unsafe.added);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED));
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.pending_no_room);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, (NETDATA_DOUBLE)replication_globals.unsafe.senders_full);

            replication_recursive_unlock();
            worker_is_idle();
        }

        if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
            worker_is_busy(WORKER_JOB_WAIT);
            replication_recursive_lock();
            // the timeout also defines how frequently we will traverse all the pending requests
            // when the outbound buffers of all senders are full
            usec_t timeout;
            if(slow) {
                // no work to be done, wait for a request to come in
                timeout = 1000 * USEC_PER_MS;
                sender_thread_buffer_free();
            }

            else if(replication_globals.unsafe.pending > 0) {
                if(replication_globals.unsafe.sender_resets == last_sender_resets)
                    timeout = 1000 * USEC_PER_MS;

                else {
                    // there are pending requests waiting to be executed,
                    // but none could be executed at this time.
                    // try again after this time.
                    timeout = 100 * USEC_PER_MS;
                }

                last_sender_resets = replication_globals.unsafe.sender_resets;
            }

            else {
                // no requests pending, but there were requests recently (run_verification_countdown)
                // so, try in a short time.
                // if this is big, one chart replicating will be slow to finish (ping - pong just one chart)
                timeout = 10 * USEC_PER_MS;
                last_sender_resets = replication_globals.unsafe.sender_resets;
            }

            replication_recursive_unlock();

            worker_is_idle();
            sleep_usec(timeout);

            // make it scan all the pending requests next time
            replication_set_next_point_in_time(0, 0);
            replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;

            continue;
        }
    }

    netdata_thread_cleanup_pop(1);
    return NULL;
}