// SPDX-License-Identifier: GPL-3.0-or-later

#include "replication.h"
#include "Judy.h"

#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
#define MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER 25ULL
#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10ULL

#define WORKER_JOB_FIND_NEXT 1
#define WORKER_JOB_QUERYING 2
#define WORKER_JOB_DELETE_ENTRY 3
#define WORKER_JOB_FIND_CHART 4
#define WORKER_JOB_PREPARE_QUERY 5
#define WORKER_JOB_CHECK_CONSISTENCY 6
#define WORKER_JOB_BUFFER_COMMIT 7
#define WORKER_JOB_CLEANUP 8
#define WORKER_JOB_WAIT 9

// master thread worker jobs
#define WORKER_JOB_STATISTICS 10
#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 11
#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12
#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 13
#define WORKER_JOB_CUSTOM_METRIC_ADDED 14
#define WORKER_JOB_CUSTOM_METRIC_DONE 15
#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16
#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 17

#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30
#define SECONDS_TO_RESET_POINT_IN_TIME 10

static struct replication_query_statistics replication_queries = {
    .spinlock = NETDATA_SPINLOCK_INITIALIZER,
    .queries_started = 0,
    .queries_finished = 0,
    .points_read = 0,
    .points_generated = 0,
};

struct replication_query_statistics replication_get_query_statistics(void) {
    netdata_spinlock_lock(&replication_queries.spinlock);
    struct replication_query_statistics ret = replication_queries;
    netdata_spinlock_unlock(&replication_queries.spinlock);
    return ret;
}

size_t replication_buffers_allocated = 0;

size_t replication_allocated_buffers(void) {
    return __atomic_load_n(&replication_buffers_allocated, __ATOMIC_RELAXED);
}

// ----------------------------------------------------------------------------
// sending replication replies

struct replication_dimension {
    STORAGE_POINT sp;
    struct storage_engine_query_handle handle;
    bool enabled;
    bool skip;

    DICTIONARY *dict;
    const DICTIONARY_ITEM *rda;
    RRDDIM *rd;
};

struct replication_query {
    RRDSET *st;

    struct {
        time_t first_entry_t;
        time_t last_entry_t;
    } db;

    struct {                            // what the parent requested
        time_t after;
        time_t before;
        bool enable_streaming;
    } request;

    struct {                            // what the child will do
        time_t after;
        time_t before;
        bool enable_streaming;

        bool locked_data_collection;
        bool execute;
        bool interrupted;
        STREAM_CAPABILITIES capabilities;
    } query;

    time_t wall_clock_time;

    size_t points_read;
    size_t points_generated;

    STORAGE_ENGINE_BACKEND backend;
    struct replication_request *rq;

    size_t dimensions;
    struct replication_dimension data[];
};
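
// Prepare a replication query for one chart. In outline (derived from the
// code below): allocate a replication_query with one replication_dimension
// slot per dimension of the chart, validate the requested window, and
// initialize a storage engine query for every exposed dimension. When the
// query is about to enable streaming, the chart's data_collection_lock is
// taken, so the last replicated points and the switch to normal streaming
// cannot race with the collector.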
static struct replication_query *replication_query_prepare(
        RRDSET *st,
        time_t db_first_entry,
        time_t db_last_entry,
        time_t requested_after,
        time_t requested_before,
        bool requested_enable_streaming,
        time_t query_after,
        time_t query_before,
        bool query_enable_streaming,
        time_t wall_clock_time,
        STREAM_CAPABILITIES capabilities
) {
    size_t dimensions = rrdset_number_of_dimensions(st);
    struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
    __atomic_add_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);

    q->dimensions = dimensions;
    q->st = st;

    q->db.first_entry_t = db_first_entry;
    q->db.last_entry_t = db_last_entry;

    q->request.after = requested_after;
    q->request.before = requested_before;
    q->request.enable_streaming = requested_enable_streaming;

    q->query.after = query_after;
    q->query.before = query_before;
    q->query.enable_streaming = query_enable_streaming;
    q->query.capabilities = capabilities;

    q->wall_clock_time = wall_clock_time;

    if (!q->dimensions || !q->query.after || !q->query.before) {
        q->query.execute = false;
        q->dimensions = 0;
        return q;
    }

    if(q->query.enable_streaming) {
        netdata_spinlock_lock(&st->data_collection_lock);
        q->query.locked_data_collection = true;

        if (st->last_updated.tv_sec > q->query.before) {
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
            internal_error(true,
                           "STREAM_SENDER REPLAY: 'host:%s/chart:%s' "
                           "has start_streaming = true, "
                           "adjusting replication before timestamp from %llu to %llu",
                           rrdhost_hostname(st->rrdhost), rrdset_id(st),
                           (unsigned long long) q->query.before,
                           (unsigned long long) st->last_updated.tv_sec
            );
#endif
            q->query.before = MIN(st->last_updated.tv_sec, wall_clock_time);
        }
    }

    q->backend = st->rrdhost->db[0].eng->backend;

    // prepare our array of dimensions
    size_t count = 0;
    RRDDIM *rd;
    rrddim_foreach_read(rd, st) {
        if (unlikely(!rd || !rd_dfe.item || !rd->exposed))
            continue;

        if (unlikely(rd_dfe.counter >= q->dimensions)) {
            internal_error(true,
                           "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
                           rrdhost_hostname(st->rrdhost), rrdset_id(st));
            break;
        }

        struct replication_dimension *d = &q->data[rd_dfe.counter];

        d->dict = rd_dfe.dict;
        d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
        d->rd = rd;

        storage_engine_query_init(q->backend, rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
                                  q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
        d->enabled = true;
        d->skip = false;
        count++;
    }
    rrddim_foreach_done(rd);

    if(!count) {
        // no data for this chart
        q->query.execute = false;

        if(q->query.locked_data_collection) {
            netdata_spinlock_unlock(&st->data_collection_lock);
            q->query.locked_data_collection = false;
        }
    }
    else {
        // we have data for this chart
        q->query.execute = true;
    }

    return q;
}
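
// Emit the current collection state of the chart, so the parent can resume
// collection exactly where the child is. Per the code below, the format is
// one REPLAY_RRDDIM_STATE line per exposed dimension (last collected time
// in usec, last collected value, last calculated value, last stored value),
// followed by a single REPLAY_RRDSET_STATE line carrying the chart's last
// collected and last updated timestamps in usec.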
static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STREAM_CAPABILITIES capabilities) {
    NUMBER_ENCODING encoding = (capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
    RRDDIM *rd;
    rrddim_foreach_read(rd, st) {
        if (!rd->exposed) continue;

        buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " '",
                           sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2);
        buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
        buffer_fast_strcat(wb, "' ", 2);
        buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->last_collected_time.tv_sec * USEC_PER_SEC +
                                                  (usec_t) rd->last_collected_time.tv_usec);
        buffer_fast_strcat(wb, " ", 1);
        buffer_print_int64_encoded(wb, encoding, rd->last_collected_value);
        buffer_fast_strcat(wb, " ", 1);
        buffer_print_netdata_double_encoded(wb, encoding, rd->last_calculated_value);
        buffer_fast_strcat(wb, " ", 1);
        buffer_print_netdata_double_encoded(wb, encoding, rd->last_stored_value);
        buffer_fast_strcat(wb, "\n", 1);
    }
    rrddim_foreach_done(rd);

    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " ", sizeof(PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE) - 1 + 1);
    buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, encoding, (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec);
    buffer_fast_strcat(wb, "\n", 1);
}

static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) {
    size_t dimensions = q->dimensions;

    if(wb && q->query.enable_streaming)
        replication_send_chart_collection_state(wb, q->st, q->query.capabilities);

    if(q->query.locked_data_collection) {
        netdata_spinlock_unlock(&q->st->data_collection_lock);
        q->query.locked_data_collection = false;
    }

    // release all the dictionary items acquired
    // finalize the queries
    size_t queries = 0;
    for (size_t i = 0; i < dimensions; i++) {
        struct replication_dimension *d = &q->data[i];
        if (unlikely(!d->enabled)) continue;

        storage_engine_query_finalize(&d->handle);
        dictionary_acquired_item_release(d->dict, d->rda);

        // update global statistics
        queries++;
    }

    if(executed) {
        netdata_spinlock_lock(&replication_queries.spinlock);
        replication_queries.queries_started += queries;
        replication_queries.queries_finished += queries;
        replication_queries.points_read += q->points_read;
        replication_queries.points_generated += q->points_generated;
        netdata_spinlock_unlock(&replication_queries.spinlock);
    }

    __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
    freez(q);
}

static void replication_query_align_to_optimal_before(struct replication_query *q) {
    if(!q->query.execute || q->query.enable_streaming)
        return;

    size_t dimensions = q->dimensions;
    time_t expanded_before = 0;

    for (size_t i = 0; i < dimensions; i++) {
        struct replication_dimension *d = &q->data[i];
        if(unlikely(!d->enabled)) continue;

        time_t new_before = storage_engine_align_to_optimal_before(&d->handle);
        if (!expanded_before || new_before < expanded_before)
            expanded_before = new_before;
    }

    if(expanded_before > q->query.before &&                                 // it is later than the original
       (expanded_before - q->query.before) / q->st->update_every < 1024 &&  // it is reasonable (up to a page)
       expanded_before < q->st->last_updated.tv_sec &&                      // it is not the chart's last updated time
       expanded_before < q->wall_clock_time)                                // it is not later than the wall clock time
        q->query.before = expanded_before;
}
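
// Run the prepared query and append REPLAY_BEGIN/REPLAY_SET blocks to the
// buffer. As the code below shows, the loop advances a virtual 'now' cursor
// from after+1 to before: each iteration fetches the next point of every
// dimension, aligns misaligned dimensions to a common start time, emits one
// REPLAY_BEGIN block with a REPLAY_SET line per dimension, and stops early
// if the buffer grows beyond max_msg_size - in that case the request is
// marked interrupted and is completed by a follow-up request.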
static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) {
    replication_query_align_to_optimal_before(q);

    NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
    time_t after = q->query.after;
    time_t before = q->query.before;
    size_t dimensions = q->dimensions;
    time_t wall_clock_time = q->wall_clock_time;
    bool finished_with_gap = false;
    size_t points_read = 0, points_generated = 0;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    time_t actual_after = 0, actual_before = 0;
#endif

    time_t now = after + 1;
    time_t last_end_time_in_buffer = 0;
    while(now <= before) {
        time_t min_start_time = 0, max_start_time = 0, min_end_time = 0, max_end_time = 0, min_update_every = 0, max_update_every = 0;
        for (size_t i = 0; i < dimensions; i++) {
            struct replication_dimension *d = &q->data[i];
            if(unlikely(!d->enabled || d->skip)) continue;

            // fetch the first valid point for the dimension
            int max_skip = 1000;
            while(d->sp.end_time_s < now && !storage_engine_query_is_finished(&d->handle) && max_skip-- >= 0) {
                d->sp = storage_engine_query_next_metric(&d->handle);
                points_read++;
            }

            if(max_skip <= 0) {
                d->skip = true;

                error_limit_static_global_var(erl, 1, 0);
                error_limit(&erl,
                            "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query "
                            "beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
                            rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
                            (unsigned long long) now);

                continue;
            }

            if(unlikely(d->sp.end_time_s < now || d->sp.end_time_s < d->sp.start_time_s))
                // this dimension does not provide any data
                continue;

            time_t update_every = d->sp.end_time_s - d->sp.start_time_s;
            if(unlikely(!update_every))
                update_every = q->st->update_every;

            if(unlikely(!min_update_every))
                min_update_every = update_every;

            if(unlikely(!min_start_time))
                min_start_time = d->sp.start_time_s;

            if(unlikely(!min_end_time))
                min_end_time = d->sp.end_time_s;

            min_update_every = MIN(min_update_every, update_every);
            max_update_every = MAX(max_update_every, update_every);
            min_start_time = MIN(min_start_time, d->sp.start_time_s);
            max_start_time = MAX(max_start_time, d->sp.start_time_s);
            min_end_time = MIN(min_end_time, d->sp.end_time_s);
            max_end_time = MAX(max_end_time, d->sp.end_time_s);
        }

        if (unlikely(min_update_every != max_update_every ||
                     min_start_time != max_start_time)) {

            time_t fix_min_start_time;
            if(last_end_time_in_buffer &&
               last_end_time_in_buffer >= min_start_time &&
               last_end_time_in_buffer <= max_start_time) {
                fix_min_start_time = last_end_time_in_buffer;
            }
            else
                fix_min_start_time = min_end_time - min_update_every;

#ifdef NETDATA_INTERNAL_CHECKS
            error_limit_static_global_var(erl, 1, 0);
            error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' "
                              "misaligned dimensions, "
                              "update every (min: %ld, max: %ld), "
                              "start time (min: %ld, max: %ld), "
                              "end time (min %ld, max %ld), "
                              "now %ld, last end time sent %ld, "
                              "min start time is fixed to %ld",
                        rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
                        min_update_every, max_update_every,
                        min_start_time, max_start_time,
                        min_end_time, max_end_time,
                        now, last_end_time_in_buffer,
                        fix_min_start_time
            );
#endif

            min_start_time = fix_min_start_time;
        }

        if(likely(min_start_time <= now && min_end_time >= now)) {
            // we have a valid point

            if (unlikely(min_end_time == min_start_time))
                min_start_time = min_end_time - q->st->update_every;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
            if (unlikely(!actual_after))
                actual_after = min_end_time;

            actual_before = min_end_time;
#endif

            if(buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) {
                q->query.before = last_end_time_in_buffer;
                q->query.enable_streaming = false;

                internal_error(true, "REPLICATION: current buffer size %zu is more than the "
                                     "max message size %zu for chart '%s' of host '%s'. "
                                     "Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.",
                               buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost),
                               q->request.after, q->request.before, q->request.enable_streaming ? "true" : "false",
                               q->query.after, q->query.before, q->query.enable_streaming ? "true" : "false");

                q->query.interrupted = true;

                break;
            }
            last_end_time_in_buffer = min_end_time;

            buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' ", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 4);
            buffer_print_uint64_encoded(wb, encoding, min_start_time);
            buffer_fast_strcat(wb, " ", 1);
            buffer_print_uint64_encoded(wb, encoding, min_end_time);
            buffer_fast_strcat(wb, " ", 1);
            buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
            buffer_fast_strcat(wb, "\n", 1);

            // output the replay values for this time
            for (size_t i = 0; i < dimensions; i++) {
                struct replication_dimension *d = &q->data[i];
                if (unlikely(!d->enabled)) continue;

                if (likely(d->sp.start_time_s <= min_end_time &&
                           d->sp.end_time_s >= min_end_time &&
                           !storage_point_is_unset(d->sp) &&
                           !storage_point_is_gap(d->sp))) {

                    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"", sizeof(PLUGINSD_KEYWORD_REPLAY_SET) - 1 + 2);
                    buffer_fast_strcat(wb, rrddim_id(d->rd), string_strlen(d->rd->id));
                    buffer_fast_strcat(wb, "\" ", 2);
                    buffer_print_netdata_double_encoded(wb, encoding, d->sp.sum);
                    buffer_fast_strcat(wb, " ", 1);
                    buffer_print_sn_flags(wb, d->sp.flags, q->query.capabilities & STREAM_CAP_INTERPOLATED);
                    buffer_fast_strcat(wb, "\n", 1);

                    points_generated++;
                }
            }

            now = min_end_time + 1;
        }
        else if(unlikely(min_end_time < now))
            // the query does not progress
            break;

        else {
            // we have a gap - all points are in the future
            now = min_start_time;

            if(min_start_time > before && !points_generated) {
                before = q->query.before = min_start_time - 1;
                finished_with_gap = true;
                break;
            }
        }
    }

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    if(actual_after) {
        char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1];
        log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
        log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);
        internal_error(true,
                       "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
                       rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
                       (unsigned long long)actual_after, actual_after_buf, (unsigned long long)actual_before, actual_before_buf,
                       (unsigned long long)after, (long long)(actual_after - after), (unsigned long long)before, (long long)(actual_before - before));
    }
    else
        internal_error(true,
                       "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
                       rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
                       (unsigned long long)after, (unsigned long long)before);
#endif // NETDATA_LOG_REPLICATION_REQUESTS

    q->points_read += points_read;
    q->points_generated += points_generated;

    if(last_end_time_in_buffer < before - q->st->update_every)
        finished_with_gap = true;

    return finished_with_gap;
}
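
// Build a replication query from a parent's request, after sanitizing the
// requested window: flipped timestamps are swapped, windows in the future
// are dropped, and the query is clamped to the retention the child actually
// has. A request of (after 0, before 0, streaming true) means "no data
// needed - just enable streaming".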
static struct replication_query *replication_response_prepare(
        RRDSET *st,
        bool requested_enable_streaming,
        time_t requested_after,
        time_t requested_before,
        STREAM_CAPABILITIES capabilities
) {
    time_t wall_clock_time = now_realtime_sec();

    if(requested_after > requested_before) {
        // flip them
        time_t t = requested_before;
        requested_before = requested_after;
        requested_after = t;
    }

    if(requested_after > wall_clock_time) {
        requested_after = 0;
        requested_before = 0;
        requested_enable_streaming = true;
    }

    if(requested_before > wall_clock_time) {
        requested_before = wall_clock_time;
        requested_enable_streaming = true;
    }

    time_t query_after = requested_after;
    time_t query_before = requested_before;
    bool query_enable_streaming = requested_enable_streaming;

    time_t db_first_entry = 0, db_last_entry = 0;
    rrdset_get_retention_of_tier_for_collected_chart(
        st, &db_first_entry, &db_last_entry, wall_clock_time, 0);

    if(requested_after == 0 && requested_before == 0 && requested_enable_streaming == true) {
        // no data requested - just enable streaming
        ;
    }
    else {
        if (query_after < db_first_entry)
            query_after = db_first_entry;

        if (query_before > db_last_entry)
            query_before = db_last_entry;

        // if the parent asked us to start streaming, then fill the rest with the data that we have
        if (requested_enable_streaming)
            query_before = db_last_entry;

        if (query_after > query_before) {
            time_t tmp = query_before;
            query_before = query_after;
            query_after = tmp;
        }

        query_enable_streaming = (requested_enable_streaming ||
                                  query_before == db_last_entry ||
                                  !requested_after ||
                                  !requested_before) ? true : false;
    }

    return replication_query_prepare(
        st,
        db_first_entry, db_last_entry,
        requested_after, requested_before, requested_enable_streaming,
        query_after, query_before, query_enable_streaming,
        wall_clock_time, capabilities);
}

void replication_response_cancel_and_finalize(struct replication_query *q) {
    replication_query_finalize(NULL, q, false);
}

static bool sender_is_still_connected_for_this_request(struct replication_request *rq);
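
// Execute the response and commit it to the sender buffer. Per the code
// below, the whole reply is framed as one message: REPLAY_BEGIN 'chart-id',
// then the replicated data blocks, then a REPLAY_END line carrying the
// chart's update every, the current retention (first/last entry), the
// streaming flag, and the after/before window actually covered - enough
// for the parent to decide whether another request is needed.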
bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) {
    NUMBER_ENCODING encoding = (q->query.capabilities & STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
    struct replication_request *rq = q->rq;
    RRDSET *st = q->st;
    RRDHOST *host = st->rrdhost;

    // we might want to optimize this by filling a temporary buffer
    // and copying the result to the host's buffer in order to avoid
    // holding the host's buffer lock for too long
    BUFFER *wb = sender_start(host->sender);

    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '", sizeof(PLUGINSD_KEYWORD_REPLAY_BEGIN) - 1 + 2);
    buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
    buffer_fast_strcat(wb, "'\n", 2);

//    buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));

    bool locked_data_collection = q->query.locked_data_collection;
    q->query.locked_data_collection = false;

    bool finished_with_gap = false;
    if(q->query.execute)
        finished_with_gap = replication_query_execute(wb, q, max_msg_size);

    time_t after = q->request.after;
    time_t before = q->query.before;
    bool enable_streaming = q->query.enable_streaming;

    replication_query_finalize(wb, q, q->query.execute);
    q = NULL; // IMPORTANT: q is invalid now

    // get a fresh retention to send to the parent
    time_t wall_clock_time = now_realtime_sec();
    time_t db_first_entry, db_last_entry;
    rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);

    // end with the first/last entries we have, and the first start time and
    // last end time of the data we sent
    buffer_fast_strcat(wb, PLUGINSD_KEYWORD_REPLAY_END " ", sizeof(PLUGINSD_KEYWORD_REPLAY_END) - 1 + 1);
    buffer_print_int64_encoded(wb, encoding, st->update_every);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, encoding, db_first_entry);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, encoding, db_last_entry);

    // the original passed length 7 for both literals, but " true " is only
    // 6 bytes - use the correct length for each
    if(enable_streaming)
        buffer_fast_strcat(wb, " true ", 6);
    else
        buffer_fast_strcat(wb, " false ", 7);

    buffer_print_uint64_encoded(wb, encoding, after);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, encoding, before);
    buffer_fast_strcat(wb, " ", 1);
    buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
    buffer_fast_strcat(wb, "\n", 1);

    worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
    sender_commit(host->sender, wb);
    worker_is_busy(WORKER_JOB_CLEANUP);

    if(enable_streaming) {
        if(sender_is_still_connected_for_this_request(rq)) {
            // enable normal streaming if we have to
            // but only if the sender buffer has not been flushed since we started
            if(rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
                rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
                rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
                rrdhost_sender_replicating_charts_minus_one(st->rrdhost);

                if(!finished_with_gap)
                    st->upstream_resync_time_s = 0;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
                internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
                               rrdhost_hostname(st->rrdhost), rrdset_id(st));
#endif
            }
            else
                internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating",
                               rrdhost_hostname(st->rrdhost), rrdset_id(st));
        }
    }

    if(locked_data_collection)
        netdata_spinlock_unlock(&st->data_collection_lock);

    return enable_streaming;
}

// ----------------------------------------------------------------------------
// sending replication requests

struct replication_request_details {
    struct {
        send_command callback;
        void *data;
    } caller;

    RRDHOST *host;
    RRDSET *st;

    struct {
        time_t first_entry_t;           // the first entry time the child has
        time_t last_entry_t;            // the last entry time the child has
        time_t wall_clock_time;         // the current time of the child
        bool fixed_last_entry;          // when set we set the last entry to the wall clock time
    } child_db;

    struct {
        time_t first_entry_t;           // the first entry time we have
        time_t last_entry_t;            // the last entry time we have
        time_t wall_clock_time;         // the current local wall clock time
    } local_db;

    struct {
        time_t from;                    // the starting time of the entire gap we have
        time_t to;                      // the ending time of the entire gap we have
    } gap;

    struct {
        time_t after;                   // the start time we requested previously from this child
        time_t before;                  // the end time we requested previously from this child
    } last_request;

    struct {
        time_t after;                   // the start time of this replication request - the child will add 1 second
        time_t before;                  // the end time of this replication request
        bool start_streaming;           // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before'
    } wanted;
};

static void replicate_log_request(struct replication_request_details *r, const char *msg) {
#ifdef NETDATA_INTERNAL_CHECKS
    internal_error(true,
#else
    error_limit_static_global_var(erl, 1, 0);
    error_limit(&erl,
#endif
                "REPLAY ERROR: 'host:%s/chart:%s' child sent: "
                "db from %ld to %ld%s, wall clock time %ld, "
                "last request from %ld to %ld, "
                "issue: %s - "
                "sending replication request from %ld to %ld, start streaming %s",
                rrdhost_hostname(r->st->rrdhost), rrdset_id(r->st),
                r->child_db.first_entry_t,
                r->child_db.last_entry_t, r->child_db.fixed_last_entry ? " (fixed)" : "",
                r->child_db.wall_clock_time,
                r->last_request.after,
                r->last_request.before,
                msg,
                r->wanted.after,
                r->wanted.before,
                r->wanted.start_streaming ? "true" : "false");
}
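
// Send the REPLAY_CHART command to the child. As the snprintfz() below
// shows, the wire format is:
//   REPLAY_CHART "<chart-id>" "<start_streaming>" <after> <before>
// where start_streaming is the literal string "true" or "false".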
static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg, bool log) {
    RRDSET *st = r->st;

    if(log)
        replicate_log_request(r, msg);

    if(st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t))
        st->rrdhost->receiver->replication_first_time_t = r->wanted.after;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    st->replay.log_next_data_collection = true;

    char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = "";

    if(r->wanted.after)
        log_date(wanted_after_buf, LOG_DATE_LENGTH, r->wanted.after);

    if(r->wanted.before)
        log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before);

    internal_error(true,
                   "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
                   "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld, now %ld] gap[%ld - %ld %s] %s"
                   , rrdhost_hostname(r->host), rrdset_id(r->st)
                   , r->wanted.after, wanted_after_buf
                   , r->wanted.before, wanted_before_buf
                   , r->wanted.start_streaming ? "YES" : "NO"
                   , msg
                   , r->last_request.after, r->last_request.before
                   , r->child_db.first_entry_t, r->child_db.last_entry_t
                   , r->child_db.wall_clock_time, (r->child_db.wall_clock_time == r->local_db.wall_clock_time) ? "SAME" : (r->child_db.wall_clock_time < r->local_db.wall_clock_time) ? "BEHIND" : "AHEAD"
                   , r->local_db.first_entry_t, r->local_db.last_entry_t
                   , r->local_db.wall_clock_time
                   , r->gap.from, r->gap.to
                   , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
                   , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
    );

    st->replay.start_streaming = r->wanted.start_streaming;
    st->replay.after = r->wanted.after;
    st->replay.before = r->wanted.before;
#endif // NETDATA_LOG_REPLICATION_REQUESTS

    char buffer[2048 + 1];
    snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
              rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
              (unsigned long long)r->wanted.after, (unsigned long long)r->wanted.before);

    int ret = r->caller.callback(buffer, r->caller.data);
    if (ret < 0) {
        error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)",
              rrdhost_hostname(r->host), rrdset_id(r->st), ret);
        return false;
    }

    return true;
}
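
// Called on the parent for a chart of a child, to compute the gap to be
// replicated and issue the next REPLAY_CHART command. In outline (following
// the code below): establish the gap (from the last entry we have, or from
// the previous request, up to our wall clock time), reject impossible child
// timings with an empty request, then ask for the earliest interval the
// child can provide, at most rrdpush_replication_step seconds at a time.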
bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
                             time_t child_first_entry, time_t child_last_entry, time_t child_wall_clock_time,
                             time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
{
    struct replication_request_details r = {
        .caller = {
            .callback = callback,
            .data = callback_data,
        },

        .host = host,
        .st = st,

        .child_db = {
            .first_entry_t = child_first_entry,
            .last_entry_t = child_last_entry,
            .wall_clock_time = child_wall_clock_time,
            .fixed_last_entry = false,
        },

        .local_db = {
            .first_entry_t = 0,
            .last_entry_t = 0,
            .wall_clock_time = now_realtime_sec(),
        },

        .last_request = {
            .after = prev_first_entry_wanted,
            .before = prev_last_entry_wanted,
        },

        .wanted = {
            .after = 0,
            .before = 0,
            .start_streaming = true,
        },
    };

    if(r.child_db.last_entry_t > r.child_db.wall_clock_time) {
        replicate_log_request(&r, "child's db last entry > child's wall clock time");
        r.child_db.last_entry_t = r.child_db.wall_clock_time;
        r.child_db.fixed_last_entry = true;
    }

    rrdset_get_retention_of_tier_for_collected_chart(r.st, &r.local_db.first_entry_t, &r.local_db.last_entry_t, r.local_db.wall_clock_time, 0);

    // let's find the GAP we have
    if(!r.last_request.after || !r.last_request.before) {
        // there is no previous request

        if(r.local_db.last_entry_t)
            // we have some data, let's continue from the last point we have
            r.gap.from = r.local_db.last_entry_t;
        else
            // we don't have any data, the gap is the max timeframe we are allowed to replicate
            r.gap.from = r.local_db.wall_clock_time - r.host->rrdpush_seconds_to_replicate;
    }
    else {
        // we had sent a request - let's continue at the point we left it
        // for this we don't take into account the actual data in our db
        // because the child may also have gaps, and we need to get over it
        r.gap.from = r.last_request.before;
    }

    // we want all the data up to now
    r.gap.to = r.local_db.wall_clock_time;

    // the gap is now r.gap.from -> r.gap.to

    if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION)))
        return send_replay_chart_cmd(&r, "empty replication request, replication is disabled", false);

    if (unlikely(!rrdset_number_of_dimensions(st)))
        return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions", false);

    if (unlikely(!r.child_db.first_entry_t || !r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, child has no stored data", false);

    if (unlikely(r.child_db.first_entry_t < 0 || r.child_db.last_entry_t < 0))
        return send_replay_chart_cmd(&r, "empty replication request, child db timestamps are invalid", true);

    if (unlikely(r.child_db.first_entry_t > r.child_db.wall_clock_time))
        return send_replay_chart_cmd(&r, "empty replication request, child db first entry is after its wall clock time", true);

    if (unlikely(r.child_db.first_entry_t > r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)", true);

    if (unlikely(r.local_db.last_entry_t > r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child one", false);

    // let's find what the child can provide to fill that gap

    if(r.child_db.first_entry_t > r.gap.from)
        // the child does not have all the data - let's get what it has
        r.wanted.after = r.child_db.first_entry_t;
    else
        // ok, the child can fill the entire gap we have
        r.wanted.after = r.gap.from;

    if(r.gap.to - r.wanted.after > host->rrdpush_replication_step)
        // the duration is too big for one request - let's take the first step
        r.wanted.before = r.wanted.after + host->rrdpush_replication_step;
    else
        // wow, we can do it in one request
        r.wanted.before = r.gap.to;

    // don't ask from the child more than it has
    if(r.wanted.before > r.child_db.last_entry_t)
        r.wanted.before = r.child_db.last_entry_t;

    if(r.wanted.after > r.wanted.before) {
        r.wanted.after = 0;
        r.wanted.before = 0;
        r.wanted.start_streaming = true;
        return send_replay_chart_cmd(&r, "empty replication request, wanted after computed bigger than wanted before", true);
    }

    // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child
    r.wanted.start_streaming = (r.local_db.wall_clock_time - r.wanted.after <= host->rrdpush_replication_step ||
                                r.wanted.before >= r.child_db.last_entry_t ||
                                r.wanted.before >= r.child_db.wall_clock_time ||
                                r.wanted.before >= r.local_db.wall_clock_time);

    // the wanted timeframe is now r.wanted.after -> r.wanted.before
    // send it
    return send_replay_chart_cmd(&r, "OK", false);
}

// ----------------------------------------------------------------------------
// replication thread

// replication request in sender DICTIONARY
// used for de-duplicating the requests

struct replication_request {
    struct sender_state *sender;        // the sender we should put the reply at
    STRING *chart_id;                   // the chart of the request
    time_t after;                       // the start time of the query (maybe zero) - key for sorting (JudyL)
    time_t before;                      // the end time of the query (maybe zero)

    usec_t sender_last_flush_ut;        // the timestamp of the sender, at the time we indexed this request
    Word_t unique_id;                   // auto-increment, later requests have a bigger id
    bool start_streaming;               // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming
    bool indexed_in_judy;               // true when the request is indexed in judy
    bool not_indexed_buffer_full;       // true when the request is not indexed because the sender is full
    bool not_indexed_preprocessing;     // true when the request is not indexed, but it is pending in preprocessing

    // prepare ahead members - preprocessing
    bool found;                         // used as a result boolean for the find call
    bool executed;                      // used to detect if we have skipped requests while preprocessing
    RRDSET *st;                         // caching of the chart during preprocessing
    struct replication_query *q;        // the preprocessing query initialization
};

// replication sort entry in JudyL array
// used for sorting all requests, across all nodes

struct replication_sort_entry {
    struct replication_request *rq;

    size_t unique_id;                   // used as a key to identify the sort entry - we never access its contents
};

#define MAX_REPLICATION_THREADS 20 // + 1 for the main thread

// the global variables for the replication thread
static struct replication_thread {
    ARAL *aral_rse;

    SPINLOCK spinlock;

    struct {
        size_t pending;                 // number of requests pending in the queue

        // statistics
        size_t added;                   // number of requests added to the queue
        size_t removed;                 // number of requests removed from the queue
        size_t pending_no_room;         // number of requests skipped, because the sender has no room for responses
        size_t senders_full;            // number of times a sender buffer was found full
        size_t sender_resets;           // number of times a sender reset our last position in the queue
        time_t first_time_t;            // the minimum 'after' we encountered

        struct {
            Word_t after;
            Word_t unique_id;
            Pvoid_t JudyL_array;
        } queue;

    } unsafe;                           // protected from replication_recursive_lock()

    struct {
        Word_t unique_id;               // the last unique id we gave to a request (auto-increment, starting from 1)
        size_t executed;                // the number of replication requests executed
        size_t latest_first_time;       // the 'after' timestamp of the last request we executed
        size_t memory;                  // the total memory allocated by replication
    } atomic;                           // access should be with atomic operations

    struct {
        size_t last_executed;           // caching of the atomic.executed to report the number of requests executed since last time
        netdata_thread_t **threads_ptrs;
        size_t threads;
    } main_thread;                      // access is allowed only by the main thread

} replication_globals = {
    .aral_rse = NULL,
    .spinlock = NETDATA_SPINLOCK_INITIALIZER,
    .unsafe = {
        .pending = 0,

        .added = 0,
        .removed = 0,

        .pending_no_room = 0,
        .sender_resets = 0,
        .senders_full = 0,

        .first_time_t = 0,

        .queue = {
            .after = 0,
            .unique_id = 0,
            .JudyL_array = NULL,
        },
    },
    .atomic = {
        .unique_id = 0,
        .executed = 0,
        .latest_first_time = 0,
        .memory = 0,
    },
    .main_thread = {
        .last_executed = 0,
        .threads = 0,
        .threads_ptrs = NULL,
    },
};

size_t replication_allocated_memory(void) {
    return __atomic_load_n(&replication_globals.atomic.memory, __ATOMIC_RELAXED);
}

#define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED)
#define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED)

static inline bool replication_recursive_lock_mode(char mode) {
    static __thread int recursions = 0;

    if(mode == 'L') { // (L)ock
        if(++recursions == 1)
            netdata_spinlock_lock(&replication_globals.spinlock);
    }
    else if(mode == 'U') { // (U)nlock
        if(--recursions == 0)
            netdata_spinlock_unlock(&replication_globals.spinlock);
    }
    else if(mode == 'C') { // (C)heck
        if(recursions > 0)
            return true;
        else
            return false;
    }
    else
        fatal("REPLICATION: unknown lock mode '%c'", mode);

#ifdef NETDATA_INTERNAL_CHECKS
    if(recursions < 0)
        fatal("REPLICATION: recursions is %d", recursions);
#endif

    return true;
}

#define replication_recursive_lock() replication_recursive_lock_mode('L')
#define replication_recursive_unlock() replication_recursive_lock_mode('U')
#define fatal_when_replication_is_not_locked_for_me() do { \
    if(!replication_recursive_lock_mode('C')) \
        fatal("REPLICATION: reached %s, but replication is not locked by this thread.", __FUNCTION__); \
    } while(0)
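
// The recursive lock keeps a per-thread recursion counter, so the same
// thread may take replication_recursive_lock() again while already holding
// it - only the outermost lock/unlock pair touches the spinlock. This is
// what lets functions like replication_sort_entry_unlink_and_free_unsafe()
// assert lock ownership via fatal_when_replication_is_not_locked_for_me()
// while being called from several already-locked paths.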
void replication_set_next_point_in_time(time_t after, size_t unique_id) {
    replication_recursive_lock();
    replication_globals.unsafe.queue.after = after;
    replication_globals.unsafe.queue.unique_id = unique_id;
    replication_recursive_unlock();
}

// ----------------------------------------------------------------------------
// replication sort entry management

static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
    struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse);
    __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);

    rrdpush_sender_pending_replication_requests_plus_one(rq->sender);

    // copy the request
    rse->rq = rq;
    rse->unique_id = __atomic_add_fetch(&replication_globals.atomic.unique_id, 1, __ATOMIC_SEQ_CST);

    // save the unique id into the request, to be able to delete it later
    rq->unique_id = rse->unique_id;
    rq->indexed_in_judy = false;
    rq->not_indexed_buffer_full = false;
    rq->not_indexed_preprocessing = false;
    return rse;
}

static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
    aral_freez(replication_globals.aral_rse, rse);
    __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
}
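
// The pending queue is a two-level JudyL index: the outer array is keyed by
// the request's 'after' timestamp, and each entry points to an inner JudyL
// keyed by the request's unique_id. This sorts requests by start time first
// and by arrival order within the same start time, while keeping inserts
// and deletes cheap. JudyLMemUsed() is sampled around every insert/delete
// to account the index memory in replication_globals.atomic.memory.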
static void replication_sort_entry_add(struct replication_request *rq) {
    if(rrdpush_sender_replication_buffer_full_get(rq->sender)) {
        rq->indexed_in_judy = false;
        rq->not_indexed_buffer_full = true;
        rq->not_indexed_preprocessing = false;
        replication_recursive_lock();
        replication_globals.unsafe.pending_no_room++;
        replication_recursive_unlock();
        return;
    }

    // cache this, because it will be changed
    bool decrement_no_room = rq->not_indexed_buffer_full;

    struct replication_sort_entry *rse = replication_sort_entry_create(rq);

    replication_recursive_lock();

    if(decrement_no_room)
        replication_globals.unsafe.pending_no_room--;

//    if(rq->after < (time_t)replication_globals.protected.queue.after &&
//       rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED &&
//       !replication_globals.protected.skipped_no_room_since_last_reset) {
//
//        // make it find this request first
//        replication_set_next_point_in_time(rq->after, rq->unique_id);
//    }

    replication_globals.unsafe.added++;
    replication_globals.unsafe.pending++;

    Pvoid_t *inner_judy_ptr;

    // find the outer judy entry, using after as key
    size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
    inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
    size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
    if(unlikely(!inner_judy_ptr || inner_judy_ptr == PJERR))
        fatal("REPLICATION: corrupted outer judyL");

    // add it to the inner judy, using unique_id as key
    size_t mem_before_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
    Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
    size_t mem_after_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
    if(unlikely(!item || item == PJERR))
        fatal("REPLICATION: corrupted inner judyL");

    *item = rse;
    rq->indexed_in_judy = true;
    rq->not_indexed_buffer_full = false;
    rq->not_indexed_preprocessing = false;

    if(!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t)
        replication_globals.unsafe.first_time_t = rq->after;

    replication_recursive_unlock();

    __atomic_add_fetch(&replication_globals.atomic.memory, (mem_after_inner_judyl - mem_before_inner_judyl) + (mem_after_outer_judyl - mem_before_outer_judyl), __ATOMIC_RELAXED);
}

static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) {
    fatal_when_replication_is_not_locked_for_me();

    bool inner_judy_deleted = false;

    replication_globals.unsafe.removed++;
    replication_globals.unsafe.pending--;

    rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender);

    rse->rq->indexed_in_judy = false;
    rse->rq->not_indexed_preprocessing = preprocessing;

    size_t memory_saved = 0;

    // delete it from the inner judy
    size_t mem_before_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
    JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
    size_t mem_after_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
    memory_saved = mem_before_inner_judyl - mem_after_inner_judyl;

    // if no items are left, delete it from the outer judy
    if(**inner_judy_ppptr == NULL) {
        size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
        JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0);
        size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
        memory_saved += mem_before_outer_judyl - mem_after_outer_judyl;
        inner_judy_deleted = true;
    }

    // free memory
    replication_sort_entry_destroy(rse);

    __atomic_sub_fetch(&replication_globals.atomic.memory, memory_saved, __ATOMIC_RELAXED);

    return inner_judy_deleted;
}

static void replication_sort_entry_del(struct replication_request *rq, bool buffer_full) {
    Pvoid_t *inner_judy_pptr;
    struct replication_sort_entry *rse_to_delete = NULL;

    replication_recursive_lock();
    if(rq->indexed_in_judy) {
        inner_judy_pptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, rq->after, PJE0);
        if (inner_judy_pptr) {
            Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0);
            if (our_item_pptr) {
                rse_to_delete = *our_item_pptr;
                replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr, false);

                if(buffer_full) {
                    replication_globals.unsafe.pending_no_room++;
                    rq->not_indexed_buffer_full = true;
                }
            }
        }

        if (!rse_to_delete)
            fatal("REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.",
                  rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after);
    }

    replication_recursive_unlock();
}
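
// Pick the next request to execute, resuming the scan from the position
// saved in unsafe.queue (after, unique_id). As the code below shows, the
// scan runs at most two rounds: round 1 continues from the saved position,
// and round 2 wraps around to the beginning, stopping once it reaches the
// point where round 1 started. The matched entry is copied (with its
// chart_id dup'ed) and unlinked from the queue before being returned.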
static struct replication_request replication_request_get_first_available(void) {
    Pvoid_t *inner_judy_pptr;

    replication_recursive_lock();

    struct replication_request rq_to_return = (struct replication_request){ .found = false };

    if(unlikely(!replication_globals.unsafe.queue.after || !replication_globals.unsafe.queue.unique_id)) {
        replication_globals.unsafe.queue.after = 0;
        replication_globals.unsafe.queue.unique_id = 0;
    }

    Word_t started_after = replication_globals.unsafe.queue.after;

    size_t round = 0;
    while(!rq_to_return.found) {
        round++;

        if(round > 2)
            break;

        if(round == 2) {
            if(started_after == 0)
                break;

            replication_globals.unsafe.queue.after = 0;
            replication_globals.unsafe.queue.unique_id = 0;
        }

        bool find_same_after = true;
        while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstThenNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, &find_same_after))) {
            Pvoid_t *our_item_pptr;

            if(unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after))
                break;

            while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) {
                struct replication_sort_entry *rse = *our_item_pptr;
                struct replication_request *rq = rse->rq;

                // copy the request to return it
                rq_to_return = *rq;
                rq_to_return.chart_id = string_dup(rq_to_return.chart_id);

                // set the return result to found
                rq_to_return.found = true;

                if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr, true))
                    // we removed the item from the outer JudyL
                    break;
            }

            // prepare for the next iteration of the outer loop
            replication_globals.unsafe.queue.unique_id = 0;
        }
    }

    replication_recursive_unlock();
    return rq_to_return;
}

// ----------------------------------------------------------------------------
// replication request management

static void replication_request_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) {
    struct sender_state *s = sender_state; (void)s;
    struct replication_request *rq = value;

    // IMPORTANT:
    // We use the react instead of the insert callback
    // because we want the item to be atomically visible
    // to our replication thread, immediately after.
    // If we put this at the insert callback, the item is not guaranteed
    // to be atomically visible to others, so the replication thread
    // may see the replication sort entry, but fail to find the dictionary item
    // related to it.
    replication_sort_entry_add(rq);

    // this request is about a unique chart for this sender
    rrdpush_sender_replicating_charts_plus_one(s);
}
static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) {
    struct sender_state *s = sender_state; (void)s;
    struct replication_request *rq = old_value; (void)rq;
    struct replication_request *rq_new = new_value;

    replication_recursive_lock();

    if(!rq->indexed_in_judy && rq->not_indexed_buffer_full && !rq->not_indexed_preprocessing) {
        // we can replace this command
        internal_error(
                true,
                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
                (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
                (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");

        rq->after = rq_new->after;
        rq->before = rq_new->before;
        rq->start_streaming = rq_new->start_streaming;
    }
    else if(!rq->indexed_in_judy && !rq->not_indexed_preprocessing) {
        replication_sort_entry_add(rq);

        internal_error(
                true,
                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
                (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
                (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
    }
    else {
        internal_error(
                true,
                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host),
                dictionary_acquired_item_name(item),
                (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
                (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
    }

    replication_recursive_unlock();

    string_freez(rq_new->chart_id);
    return false;
}
static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) {
    struct replication_request *rq = value;

    // this request is about a unique chart for this sender
    rrdpush_sender_replicating_charts_minus_one(rq->sender);

    if(rq->indexed_in_judy)
        replication_sort_entry_del(rq, false);

    else if(rq->not_indexed_buffer_full) {
        replication_recursive_lock();
        replication_globals.unsafe.pending_no_room--;
        replication_recursive_unlock();
    }

    string_freez(rq->chart_id);
}
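
// The sender's "flush time" acts, in effect, as a connection generation
// counter: it changes when the sender's buffer is flushed on (re)connection.
// A request whose recorded flush time no longer matches the current one was
// queued for a previous connection and can be discarded, since the parent
// will resend it.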
static bool sender_is_still_connected_for_this_request(struct replication_request *rq) {
    return rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender);
}
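
// Executes a single replication request end-to-end: resolve the chart if it
// has not been resolved yet, prepare the response query if the pipeline has
// not prepared it already, then run the query and stream the result to the
// sender. Returns false when the chart cannot be found. In all cases the
// request's 'chart_id' reference is released here.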
static bool replication_execute_request(struct replication_request *rq, bool workers) {
    bool ret = false;

    if(!rq->st) {
        if(likely(workers))
            worker_is_busy(WORKER_JOB_FIND_CHART);

        rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
    }

    if(!rq->st) {
        internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found",
                       rrdhost_hostname(rq->sender->host), string2str(rq->chart_id));

        goto cleanup;
    }

    netdata_thread_disable_cancelability();

    if(!rq->q) {
        if(likely(workers))
            worker_is_busy(WORKER_JOB_PREPARE_QUERY);

        rq->q = replication_response_prepare(
                rq->st,
                rq->start_streaming,
                rq->after,
                rq->before,
                rq->sender->capabilities);
    }

    if(likely(workers))
        worker_is_busy(WORKER_JOB_QUERYING);

    // send the replication data
    rq->q->rq = rq;
    replication_response_execute_and_finalize(
            rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL));

    rq->q = NULL;
    netdata_thread_enable_cancelability();

    __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED);

    ret = true;

cleanup:
    if(rq->q) {
        replication_response_cancel_and_finalize(rq->q);
        rq->q = NULL;
    }

    string_freez(rq->chart_id);
    worker_is_idle();
    return ret;
}
// ----------------------------------------------------------------------------
// public API
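
// Entry points used by the streaming engine. As a minimal usage sketch (the
// variable 's' and the literal values below are illustrative, not part of
// this file), a REPLAY request received by the sender for one chart would be
// queued like this:
//
//     // on receiving "REPLAY chart_id after before start_streaming"
//     // from the parent, for a connected sender 's':
//     replication_add_request(s, "system.cpu",
//                             (time_t)1700000000,   // after
//                             (time_t)1700000600,   // before
//                             false);               // start_streaming
//
// The request is either executed inline (when the parent asks to switch to
// streaming and the sender buffer is sufficiently empty) or stored in the
// sender's dictionary to be picked up by the replication threads.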
void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming) {
    struct replication_request rq = {
            .sender = sender,
            .chart_id = string_strdupz(chart_id),
            .after = after,
            .before = before,
            .start_streaming = start_streaming,
            .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
            .indexed_in_judy = false,
            .not_indexed_buffer_full = false,
            .not_indexed_preprocessing = false,
    };

    if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
        replication_execute_request(&rq, false);

    else
        dictionary_set(sender->replication.requests, chart_id, &rq, sizeof(struct replication_request));
}
void replication_sender_delete_pending_requests(struct sender_state *sender) {
    // allow the dictionary destructor to go faster on locks
    dictionary_flush(sender->replication.requests);
}
void replication_init_sender(struct sender_state *sender) {
    sender->replication.requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
                                                              NULL, sizeof(struct replication_request));

    dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender);
    dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender);
    dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender);
}
void replication_cleanup_sender(struct sender_state *sender) {
    // allow the dictionary destructor to go faster on locks
    replication_recursive_lock();
    dictionary_destroy(sender->replication.requests);
    replication_recursive_unlock();
}
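
// Tracks sender buffer utilization with hysteresis: when usage crosses
// MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED the sender is marked full and all its
// indexed requests are de-indexed (so the queue stops handing them out), and
// only when usage later drops below MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED are
// they re-indexed. Purely for illustration (the real constants are defined
// earlier in this file), if the thresholds were 90% and 10%, a sender
// oscillating around 50% usage would never be toggled either way.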
void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
    size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
    size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size;

    if(unlikely(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !rrdpush_sender_replication_buffer_full_get(s))) {
        rrdpush_sender_replication_buffer_full_set(s, true);

        struct replication_request *rq;
        dfe_start_read(s->replication.requests, rq) {
            if(rq->indexed_in_judy)
                replication_sort_entry_del(rq, true);
        }
        dfe_done(rq);

        replication_recursive_lock();
        replication_globals.unsafe.senders_full++;
        replication_recursive_unlock();
    }
    else if(unlikely(percentage < MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED && rrdpush_sender_replication_buffer_full_get(s))) {
        rrdpush_sender_replication_buffer_full_set(s, false);

        struct replication_request *rq;
        dfe_start_read(s->replication.requests, rq) {
            if(!rq->indexed_in_judy && (rq->not_indexed_buffer_full || rq->not_indexed_preprocessing))
                replication_sort_entry_add(rq);
        }
        dfe_done(rq);

        replication_recursive_lock();
        replication_globals.unsafe.senders_full--;
        replication_globals.unsafe.sender_resets++;
        // replication_set_next_point_in_time(0, 0);
        replication_recursive_unlock();
    }

    rrdpush_sender_set_buffer_used_percent(s, percentage);
}
// ----------------------------------------------------------------------------
// replication thread
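
// Consistency verification: once replication goes idle, every chart of every
// host is expected to be flagged FINISHED and not IN PROGRESS. The checks
// below report only through internal_error(), so they are effectively
// diagnostics for builds with internal checks enabled.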
static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
    internal_error(
            host->sender &&
            !rrdpush_sender_pending_replication_requests(host->sender) &&
            dictionary_entries(host->sender->replication.requests) != 0,
            "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
            rrdhost_hostname(host),
            rrdpush_sender_pending_replication_requests(host->sender),
            dictionary_entries(host->sender->replication.requests)
            );

    size_t ok = 0;
    size_t errors = 0;

    RRDSET *st;
    rrdset_foreach_read(st, host) {
        RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_FINISHED);

        bool is_error = false;

        if(!flags) {
            internal_error(
                    true,
                    "REPLICATION SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED",
                    rrdhost_hostname(host), rrdset_id(st)
            );
            is_error = true;
        }

        if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
            internal_error(
                    true,
                    "REPLICATION SUMMARY: 'host:%s/chart:%s' is IN PROGRESS although replication is finished",
                    rrdhost_hostname(host), rrdset_id(st)
            );
            is_error = true;
        }

        if(is_error)
            errors++;
        else
            ok++;
    }
    rrdset_foreach_done(st);

    internal_error(errors,
                   "REPLICATION SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished",
                   rrdhost_hostname(host), ok, errors);

    return errors;
}
static void verify_all_hosts_charts_are_streaming_now(void) {
    worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY);

    size_t errors = 0;
    RRDHOST *host;
    dfe_start_read(rrdhost_root_index, host)
        errors += verify_host_charts_are_streaming_now(host);
    dfe_done(host);

    size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
    info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication",
         executed - replication_globals.main_thread.last_executed, errors);
    replication_globals.main_thread.last_executed = executed;
}
static void replication_initialize_workers(bool master) {
    worker_register("REPLICATION");
    worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next");
    worker_register_job_name(WORKER_JOB_QUERYING, "querying");
    worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete");
    worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart");
    worker_register_job_name(WORKER_JOB_PREPARE_QUERY, "prepare query");
    worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
    worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit");
    worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup");
    worker_register_job_name(WORKER_JOB_WAIT, "wait");

    if(master) {
        worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, "senders full", "senders", WORKER_METRIC_ABSOLUTE);
    }
}
#define REQUEST_OK (0)
#define REQUEST_QUEUE_EMPTY (-1)
#define REQUEST_CHART_NOT_FOUND (-2)
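
// The REQUEST_* codes above are the return values of
// replication_pipeline_execute_next() below.
//
// Per-thread pipeline state: the slots in 'rqs' form a fixed-size ring.
// 'rqs_last_prepared' walks ahead preparing queries, while 'rqs_last_executed'
// follows behind executing them; the two indexes chasing each other around
// the ring bound the number of inflight prepared queries to
// 'max_requests_ahead'.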
static __thread struct replication_thread_pipeline {
    int max_requests_ahead;
    struct replication_request *rqs;
    int rqs_last_executed, rqs_last_prepared;
    size_t queue_rounds;
} rtp = {
        .max_requests_ahead = 0,
        .rqs = NULL,
        .rqs_last_executed = 0,
        .rqs_last_prepared = 0,
        .queue_rounds = 0,
};
static void replication_pipeline_cancel_and_cleanup(void) {
    if(!rtp.rqs)
        return;

    struct replication_request *rq;
    size_t cancelled = 0;

    do {
        if(++rtp.rqs_last_executed >= rtp.max_requests_ahead)
            rtp.rqs_last_executed = 0;

        rq = &rtp.rqs[rtp.rqs_last_executed];

        if(rq->q) {
            internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
            internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");

            replication_response_cancel_and_finalize(rq->q);
            rq->q = NULL;
            cancelled++;
        }

        rq->executed = true;
        rq->found = false;

    } while(rtp.rqs_last_executed != rtp.rqs_last_prepared);

    internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);

    freez(rtp.rqs);
    rtp.rqs = NULL;
    rtp.max_requests_ahead = 0;
    rtp.rqs_last_executed = 0;
    rtp.rqs_last_prepared = 0;
    rtp.queue_rounds = 0;
}
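
// Pipelined execution: each call first tops up the ring with prepared
// queries (preparation is what benefits from running ahead, since it lets
// the database engine work on several charts concurrently), then executes
// exactly one prepared request, skipping any whose sender has reconnected or
// whose sender buffer filled up in the meantime. Returns REQUEST_OK,
// REQUEST_QUEUE_EMPTY or REQUEST_CHART_NOT_FOUND.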
static int replication_pipeline_execute_next(void) {
    struct replication_request *rq;

    if(unlikely(!rtp.rqs)) {
        rtp.max_requests_ahead = (int)get_netdata_cpus() / 2;

        if(rtp.max_requests_ahead > libuv_worker_threads * 2)
            rtp.max_requests_ahead = libuv_worker_threads * 2;

        if(rtp.max_requests_ahead < 2)
            rtp.max_requests_ahead = 2;

        rtp.rqs = callocz(rtp.max_requests_ahead, sizeof(struct replication_request));
        __atomic_add_fetch(&replication_buffers_allocated, rtp.max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
    }

    // fill the queue
    do {
        if(++rtp.rqs_last_prepared >= rtp.max_requests_ahead) {
            rtp.rqs_last_prepared = 0;
            rtp.queue_rounds++;
        }

        internal_fatal(rtp.rqs[rtp.rqs_last_prepared].q,
                       "REPLAY FATAL: slot is used by query that has not been executed!");

        worker_is_busy(WORKER_JOB_FIND_NEXT);
        rtp.rqs[rtp.rqs_last_prepared] = replication_request_get_first_available();
        rq = &rtp.rqs[rtp.rqs_last_prepared];

        if(rq->found) {
            if(!rq->st) {
                worker_is_busy(WORKER_JOB_FIND_CHART);
                rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
            }

            if(rq->st && !rq->q) {
                worker_is_busy(WORKER_JOB_PREPARE_QUERY);
                rq->q = replication_response_prepare(
                        rq->st,
                        rq->start_streaming,
                        rq->after,
                        rq->before,
                        rq->sender->capabilities);
            }

            rq->executed = false;
        }

    } while(rq->found && rtp.rqs_last_prepared != rtp.rqs_last_executed);

    // pick the first usable
    do {
        if(++rtp.rqs_last_executed >= rtp.max_requests_ahead)
            rtp.rqs_last_executed = 0;

        rq = &rtp.rqs[rtp.rqs_last_executed];

        if(rq->found) {
            internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");

            if(rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(rq->sender)) {
                // the sender has reconnected since this request was queued,
                // we can safely throw it away, since the parent will resend it
                replication_response_cancel_and_finalize(rq->q);
                rq->executed = true;
                rq->found = false;
                rq->q = NULL;
            }
            else if(rrdpush_sender_replication_buffer_full_get(rq->sender)) {
                // the sender buffer is full, so we can ignore this request,
                // it has already been marked as 'preprocessed' in the dictionary,
                // and the sender will put it back in when there is
                // enough room in the buffer for processing replication requests
                replication_response_cancel_and_finalize(rq->q);
                rq->executed = true;
                rq->found = false;
                rq->q = NULL;
            }
            else {
                // we can execute this,
                // delete it from the dictionary
                worker_is_busy(WORKER_JOB_DELETE_ENTRY);
                dictionary_del(rq->sender->replication.requests, string2str(rq->chart_id));
            }
        }
        else
            internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!");

    } while(!rq->found && rtp.rqs_last_executed != rtp.rqs_last_prepared);

    if(unlikely(!rq->found)) {
        worker_is_idle();
        return REQUEST_QUEUE_EMPTY;
    }

    replication_set_latest_first_time(rq->after);

    bool chart_found = replication_execute_request(rq, true);
    rq->executed = true;
    rq->found = false;
    rq->q = NULL;

    if(unlikely(!chart_found)) {
        worker_is_idle();
        return REQUEST_CHART_NOT_FOUND;
    }

    worker_is_idle();
    return REQUEST_OK;
}
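
// Worker threads (tagged REPLAY[2..N]) run this same pipeline loop; the main
// replication thread further below additionally publishes statistics and
// decides how aggressively to poll.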
static void replication_worker_cleanup(void *ptr __maybe_unused) {
    replication_pipeline_cancel_and_cleanup();
    worker_unregister();
}
static void *replication_worker_thread(void *ptr) {
    replication_initialize_workers(false);

    netdata_thread_cleanup_push(replication_worker_cleanup, ptr);

    while(service_running(SERVICE_REPLICATION)) {
        if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
            sender_thread_buffer_free();
            worker_is_busy(WORKER_JOB_WAIT);
            worker_is_idle();
            sleep_usec(1 * USEC_PER_SEC);
        }
    }

    netdata_thread_cleanup_pop(1);
    return NULL;
}
static void replication_main_cleanup(void *ptr) {
    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

    replication_pipeline_cancel_and_cleanup();

    int threads = (int)replication_globals.main_thread.threads;
    for(int i = 0; i < threads ;i++) {
        netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL);
        freez(replication_globals.main_thread.threads_ptrs[i]);
        __atomic_sub_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
    }
    freez(replication_globals.main_thread.threads_ptrs);
    replication_globals.main_thread.threads_ptrs = NULL;
    __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);

    aral_destroy(replication_globals.aral_rse);
    replication_globals.aral_rse = NULL;

    // custom code
    worker_unregister();

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}
void replication_initialize(void) {
    replication_globals.aral_rse = aral_create("rse", sizeof(struct replication_sort_entry),
                                               0, 65536, aral_by_size_statistics(),
                                               NULL, NULL, false, false);
}
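
// Main replication thread. A rough sketch of its cadence, assuming a
// default_rrd_update_every of 1 second: about once per second it refreshes
// the worker metrics and the completion estimate; when the queue is empty it
// sleeps 10ms, 100ms or 1s depending on whether requests are pending,
// senders were recently reset, or the system has gone fully idle; and after
// enough idle iterations it runs the chart consistency verification.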
void *replication_thread_main(void *ptr __maybe_unused) {
    replication_initialize_workers(true);

    int threads = config_get_number(CONFIG_SECTION_DB, "replication threads", 1);
    if(threads < 1 || threads > MAX_REPLICATION_THREADS) {
        error("replication threads given %d is invalid, resetting to 1", threads);
        threads = 1;
    }

    if(--threads) {
        replication_globals.main_thread.threads = threads;
        replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
        __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);

        for(int i = 0; i < threads ;i++) {
            char tag[NETDATA_THREAD_TAG_MAX + 1];
            snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2);
            replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t));
            __atomic_add_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
            netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag,
                                  NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL);
        }
    }

    netdata_thread_cleanup_push(replication_main_cleanup, ptr);

    // start from 100% completed
    worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);

    long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place
    bool slow = true; // control the time we sleep - it has to start with true!
    usec_t last_now_mono_ut = now_monotonic_usec();
    time_t replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; // restart from the beginning every 10 seconds
    size_t last_executed = 0;
    size_t last_sender_resets = 0;

    while(service_running(SERVICE_REPLICATION)) {

        // statistics
        usec_t now_mono_ut = now_monotonic_usec();
        if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
            last_now_mono_ut = now_mono_ut;

            worker_is_busy(WORKER_JOB_STATISTICS);
            replication_recursive_lock();

            size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
            if(last_executed != current_executed) {
                run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
                last_executed = current_executed;
                slow = false;
            }

            if(replication_reset_next_point_in_time_countdown-- == 0) {
                // make it scan all the pending requests next time
                replication_set_next_point_in_time(0, 0);
                // replication_globals.protected.skipped_no_room_since_last_reset = 0;
                replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
            }

            if(--run_verification_countdown == 0) {
                if(!replication_globals.unsafe.pending && !replication_globals.unsafe.pending_no_room) {
                    // reset the statistics about completion percentage
                    replication_globals.unsafe.first_time_t = 0;
                    replication_set_latest_first_time(0);

                    verify_all_hosts_charts_are_streaming_now();

                    run_verification_countdown = LONG_MAX;
                    slow = true;
                }
                else
                    run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
            }

            time_t latest_first_time_t = replication_get_latest_first_time();
            if(latest_first_time_t && replication_globals.unsafe.pending) {
                // completion percentage statistics
                time_t now = now_realtime_sec();
                time_t total = now - replication_globals.unsafe.first_time_t;
                time_t done = latest_first_time_t - replication_globals.unsafe.first_time_t;
                worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION,
                                  (NETDATA_DOUBLE)done * 100.0 / (NETDATA_DOUBLE)total);
            }
            else
                worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);

            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.unsafe.added);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED));
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.pending_no_room);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, (NETDATA_DOUBLE)replication_globals.unsafe.senders_full);

            replication_recursive_unlock();
            worker_is_idle();
        }

        if(unlikely(replication_pipeline_execute_next() == REQUEST_QUEUE_EMPTY)) {
            worker_is_busy(WORKER_JOB_WAIT);
            replication_recursive_lock();

            // the timeout also defines how frequently we will traverse all the pending requests
            // when the outbound buffers of all senders are full
            usec_t timeout;
            if(slow) {
                // no work to be done, wait for a request to come in
                timeout = 1000 * USEC_PER_MS;
                sender_thread_buffer_free();
            }
            else if(replication_globals.unsafe.pending > 0) {
                if(replication_globals.unsafe.sender_resets == last_sender_resets)
                    timeout = 1000 * USEC_PER_MS;
                else {
                    // there are pending requests waiting to be executed,
                    // but none could be executed at this time.
                    // try again after this time.
                    timeout = 100 * USEC_PER_MS;
                }

                last_sender_resets = replication_globals.unsafe.sender_resets;
            }
            else {
                // no requests pending, but there were requests recently (run_verification_countdown)
                // so, try again in a short time.
                // if this is big, one chart replicating will be slow to finish (ping-pong just one chart)
                timeout = 10 * USEC_PER_MS;
                last_sender_resets = replication_globals.unsafe.sender_resets;
            }

            replication_recursive_unlock();
            worker_is_idle();
            sleep_usec(timeout);

            // make it scan all the pending requests next time
            replication_set_next_point_in_time(0, 0);
            replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;

            continue;
        }
    }

    netdata_thread_cleanup_pop(1);
    return NULL;
}