// SPDX-License-Identifier: GPL-3.0-or-later

#include "replication.h"
#include "Judy.h"

#define STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
#define MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER 25ULL
#define MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED 50ULL
#define MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED 10ULL

#define WORKER_JOB_FIND_NEXT 1
#define WORKER_JOB_QUERYING 2
#define WORKER_JOB_DELETE_ENTRY 3
#define WORKER_JOB_FIND_CHART 4
#define WORKER_JOB_PREPARE_QUERY 5
#define WORKER_JOB_CHECK_CONSISTENCY 6
#define WORKER_JOB_BUFFER_COMMIT 7
#define WORKER_JOB_CLEANUP 8
#define WORKER_JOB_WAIT 9

// master thread worker jobs
#define WORKER_JOB_STATISTICS 10
#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 11
#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12
#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 13
#define WORKER_JOB_CUSTOM_METRIC_ADDED 14
#define WORKER_JOB_CUSTOM_METRIC_DONE 15
#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16
#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 17

#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30
#define SECONDS_TO_RESET_POINT_IN_TIME 10
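
/*
 * Notes on the tunables above:
 *
 * - STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED is checked in
 *   replication_add_request(): a request that only enables streaming is
 *   executed inline, but only while the sender buffer is at most 50% used.
 *
 * - MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER is applied in
 *   replication_execute_request(): a single replication reply is capped to
 *   25% of the sender's maximum buffer size, so one chart cannot monopolize
 *   the connection.
 *
 * - MAX/MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED are presumably the bounds of an
 *   adaptive sender-buffer threshold used elsewhere in this file; here they
 *   only appear in a commented-out block in replication_sort_entry_add().
 */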

static struct replication_query_statistics replication_queries = {
    .spinlock = NETDATA_SPINLOCK_INITIALIZER,
    .queries_started = 0,
    .queries_finished = 0,
    .points_read = 0,
    .points_generated = 0,
};

struct replication_query_statistics replication_get_query_statistics(void) {
    netdata_spinlock_lock(&replication_queries.spinlock);
    struct replication_query_statistics ret = replication_queries;
    netdata_spinlock_unlock(&replication_queries.spinlock);
    return ret;
}

size_t replication_buffers_allocated = 0;

size_t replication_allocated_buffers(void) {
    return __atomic_load_n(&replication_buffers_allocated, __ATOMIC_RELAXED);
}

// ----------------------------------------------------------------------------
// sending replication replies

struct replication_dimension {
    STORAGE_POINT sp;
    struct storage_engine_query_handle handle;
    bool enabled;
    bool skip;

    DICTIONARY *dict;
    const DICTIONARY_ITEM *rda;
    RRDDIM *rd;
};

struct replication_query {
    RRDSET *st;

    struct {
        time_t first_entry_t;
        time_t last_entry_t;
    } db;

    struct {                            // what the parent requested
        time_t after;
        time_t before;
        bool enable_streaming;
    } request;

    struct {                            // what the child will do
        time_t after;
        time_t before;
        bool enable_streaming;

        bool locked_data_collection;
        bool execute;
        bool interrupted;
    } query;

    time_t wall_clock_time;

    size_t points_read;
    size_t points_generated;

    struct storage_engine_query_ops *ops;
    struct replication_request *rq;

    size_t dimensions;
    struct replication_dimension data[];
};
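
/*
 * Lifecycle of a struct replication_query, as implemented below:
 *
 *   replication_response_prepare()  - sanitizes the requested window against
 *                                     the local db retention and allocates the
 *                                     query with one slot per dimension
 *   replication_query_execute()     - walks all dimensions in lock-step,
 *                                     emitting one REPLAY_BEGIN/REPLAY_SET
 *                                     group per time slot
 *   replication_query_finalize()    - releases the per-dimension handles and
 *                                     dictionary items, updates the global
 *                                     statistics and frees the query
 *
 * The structure ends with a flexible array member, so it is allocated with a
 * single callocz() of sizeof(struct replication_query) plus one
 * struct replication_dimension per dimension.
 */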

static struct replication_query *replication_query_prepare(
        RRDSET *st,
        time_t db_first_entry,
        time_t db_last_entry,
        time_t requested_after,
        time_t requested_before,
        bool requested_enable_streaming,
        time_t query_after,
        time_t query_before,
        bool query_enable_streaming,
        time_t wall_clock_time
) {
    size_t dimensions = rrdset_number_of_dimensions(st);
    struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
    __atomic_add_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);

    q->dimensions = dimensions;
    q->st = st;

    q->db.first_entry_t = db_first_entry;
    q->db.last_entry_t = db_last_entry;

    q->request.after = requested_after;
    q->request.before = requested_before;
    q->request.enable_streaming = requested_enable_streaming;

    q->query.after = query_after;
    q->query.before = query_before;
    q->query.enable_streaming = query_enable_streaming;

    q->wall_clock_time = wall_clock_time;

    if (!q->dimensions || !q->query.after || !q->query.before) {
        q->query.execute = false;
        q->dimensions = 0;
        return q;
    }

    if (q->query.enable_streaming) {
        netdata_spinlock_lock(&st->data_collection_lock);
        q->query.locked_data_collection = true;

        if (st->last_updated.tv_sec > q->query.before) {
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
            internal_error(true,
                           "STREAM_SENDER REPLAY: 'host:%s/chart:%s' "
                           "has start_streaming = true, "
                           "adjusting replication before timestamp from %llu to %llu",
                           rrdhost_hostname(st->rrdhost), rrdset_id(st),
                           (unsigned long long) q->query.before,
                           (unsigned long long) st->last_updated.tv_sec
            );
#endif
            q->query.before = MIN(st->last_updated.tv_sec, wall_clock_time);
        }
    }

    q->ops = &st->rrdhost->db[0].eng->api.query_ops;

    // prepare our array of dimensions
    size_t count = 0;
    RRDDIM *rd;
    rrddim_foreach_read(rd, st) {
        if (unlikely(!rd || !rd_dfe.item || !rd->exposed))
            continue;

        if (unlikely(rd_dfe.counter >= q->dimensions)) {
            internal_error(true,
                           "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
                           rrdhost_hostname(st->rrdhost), rrdset_id(st));
            break;
        }

        struct replication_dimension *d = &q->data[rd_dfe.counter];

        d->dict = rd_dfe.dict;
        d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
        d->rd = rd;

        q->ops->init(rd->tiers[0].db_metric_handle, &d->handle, q->query.after, q->query.before,
                     q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
        d->enabled = true;
        d->skip = false;
        count++;
    }
    rrddim_foreach_done(rd);

    if (!count) {
        // no data for this chart
        q->query.execute = false;

        if (q->query.locked_data_collection) {
            netdata_spinlock_unlock(&st->data_collection_lock);
            q->query.locked_data_collection = false;
        }
    }
    else {
        // we have data for this chart
        q->query.execute = true;
    }

    return q;
}
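
/*
 * Note: when enable_streaming is set, the chart's data_collection_lock is
 * held for the whole duration of the query, so that data collection cannot
 * append new points while we bring the parent up to date. This is presumably
 * also why the per-dimension queries are initialized with
 * STORAGE_PRIORITY_HIGH in that case: the lock should be released as quickly
 * as possible.
 */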

static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) {
    RRDDIM *rd;
    rrddim_foreach_read(rd, st) {
        if (!rd->exposed) continue;

        buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE " \"%s\" %llu %lld " NETDATA_DOUBLE_FORMAT " " NETDATA_DOUBLE_FORMAT "\n",
                       rrddim_id(rd),
                       (usec_t) rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) rd->last_collected_time.tv_usec,
                       rd->last_collected_value,
                       rd->last_calculated_value,
                       rd->last_stored_value
        );
    }
    rrddim_foreach_done(rd);

    buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE " %llu %llu\n",
                   (usec_t) st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t) st->last_collected_time.tv_usec,
                   (usec_t) st->last_updated.tv_sec * USEC_PER_SEC + (usec_t) st->last_updated.tv_usec
    );
}

static void replication_query_finalize(BUFFER *wb, struct replication_query *q, bool executed) {
    size_t dimensions = q->dimensions;

    if (wb && q->query.enable_streaming)
        replication_send_chart_collection_state(wb, q->st);

    if (q->query.locked_data_collection) {
        netdata_spinlock_unlock(&q->st->data_collection_lock);
        q->query.locked_data_collection = false;
    }

    // release all the dictionary items acquired
    // finalize the queries
    size_t queries = 0;
    for (size_t i = 0; i < dimensions; i++) {
        struct replication_dimension *d = &q->data[i];
        if (unlikely(!d->enabled)) continue;

        q->ops->finalize(&d->handle);
        dictionary_acquired_item_release(d->dict, d->rda);

        // update global statistics
        queries++;
    }

    if (executed) {
        netdata_spinlock_lock(&replication_queries.spinlock);
        replication_queries.queries_started += queries;
        replication_queries.queries_finished += queries;
        replication_queries.points_read += q->points_read;
        replication_queries.points_generated += q->points_generated;
        netdata_spinlock_unlock(&replication_queries.spinlock);
    }

    __atomic_sub_fetch(&replication_buffers_allocated, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension), __ATOMIC_RELAXED);
    freez(q);
}

static void replication_query_align_to_optimal_before(struct replication_query *q) {
    if (!q->query.execute || q->query.enable_streaming)
        return;

    size_t dimensions = q->dimensions;
    time_t expanded_before = 0;

    for (size_t i = 0; i < dimensions; i++) {
        struct replication_dimension *d = &q->data[i];
        if (unlikely(!d->enabled)) continue;

        time_t new_before = q->ops->align_to_optimal_before(&d->handle);
        if (!expanded_before || new_before < expanded_before)
            expanded_before = new_before;
    }

    if (expanded_before > q->query.before &&                                    // it is later than the original
        (expanded_before - q->query.before) / q->st->update_every < 1024 &&     // it is reasonable (up to a page)
        expanded_before < q->st->last_updated.tv_sec &&                         // it is not the chart's last updated time
        expanded_before < q->wall_clock_time)                                   // it is not later than the wall clock time
        q->query.before = expanded_before;
}

static bool replication_query_execute(BUFFER *wb, struct replication_query *q, size_t max_msg_size) {
    replication_query_align_to_optimal_before(q);

    time_t after = q->query.after;
    time_t before = q->query.before;
    size_t dimensions = q->dimensions;
    struct storage_engine_query_ops *ops = q->ops;
    time_t wall_clock_time = q->wall_clock_time;

    size_t points_read = q->points_read, points_generated = q->points_generated;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    time_t actual_after = 0, actual_before = 0;
#endif

    time_t now = after + 1;
    time_t last_end_time_in_buffer = 0;
    while (now <= before) {
        time_t min_start_time = 0, max_start_time = 0, min_end_time = 0, max_end_time = 0, min_update_every = 0, max_update_every = 0;
        for (size_t i = 0; i < dimensions; i++) {
            struct replication_dimension *d = &q->data[i];
            if (unlikely(!d->enabled || d->skip)) continue;

            // fetch the first valid point for the dimension
            int max_skip = 1000;
            while (d->sp.end_time_s < now && !ops->is_finished(&d->handle) && max_skip-- >= 0) {
                d->sp = ops->next_metric(&d->handle);
                points_read++;
            }

            if (max_skip <= 0) {
                d->skip = true;

                error_limit_static_global_var(erl, 1, 0);
                error_limit(&erl,
                            "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
                            rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
                            (unsigned long long) now);
                continue;
            }

            if (unlikely(d->sp.end_time_s < now || d->sp.end_time_s < d->sp.start_time_s))
                // this dimension does not provide any data
                continue;

            time_t update_every = d->sp.end_time_s - d->sp.start_time_s;
            if (unlikely(!update_every))
                update_every = q->st->update_every;

            if (unlikely(!min_update_every))
                min_update_every = update_every;

            if (unlikely(!min_start_time))
                min_start_time = d->sp.start_time_s;

            if (unlikely(!min_end_time))
                min_end_time = d->sp.end_time_s;

            min_update_every = MIN(min_update_every, update_every);
            max_update_every = MAX(max_update_every, update_every);

            min_start_time = MIN(min_start_time, d->sp.start_time_s);
            max_start_time = MAX(max_start_time, d->sp.start_time_s);

            min_end_time = MIN(min_end_time, d->sp.end_time_s);
            max_end_time = MAX(max_end_time, d->sp.end_time_s);
        }

        if (unlikely(min_update_every != max_update_every ||
                     min_start_time != max_start_time)) {

            time_t fix_min_start_time;
            if (last_end_time_in_buffer &&
                last_end_time_in_buffer >= min_start_time &&
                last_end_time_in_buffer <= max_start_time) {
                fix_min_start_time = last_end_time_in_buffer;
            }
            else
                fix_min_start_time = min_end_time - min_update_every;

            error_limit_static_global_var(erl, 1, 0);
            error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' "
                              "misaligned dimensions "
                              "update every (min: %ld, max: %ld), "
                              "start time (min: %ld, max: %ld), "
                              "end time (min %ld, max %ld), "
                              "now %ld, last end time sent %ld, "
                              "min start time is fixed to %ld",
                        rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
                        min_update_every, max_update_every,
                        min_start_time, max_start_time,
                        min_end_time, max_end_time,
                        now, last_end_time_in_buffer,
                        fix_min_start_time
            );

            min_start_time = fix_min_start_time;
        }

        if (likely(min_start_time <= now && min_end_time >= now)) {
            // we have a valid point

            if (unlikely(min_end_time == min_start_time))
                min_start_time = min_end_time - q->st->update_every;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
            if (unlikely(!actual_after))
                actual_after = min_end_time;

            actual_before = min_end_time;
#endif

            if (buffer_strlen(wb) > max_msg_size && last_end_time_in_buffer) {
                q->query.before = last_end_time_in_buffer;
                q->query.enable_streaming = false;

                internal_error(true, "REPLICATION: buffer size %zu is more than the max message size %zu for chart '%s' of host '%s'. "
                                     "Interrupting replication request (%ld to %ld, %s) at %ld to %ld, %s.",
                               buffer_strlen(wb), max_msg_size, rrdset_id(q->st), rrdhost_hostname(q->st->rrdhost),
                               q->request.after, q->request.before, q->request.enable_streaming ? "true" : "false",
                               q->query.after, q->query.before, q->query.enable_streaming ? "true" : "false");

                q->query.interrupted = true;
                break;
            }
            last_end_time_in_buffer = min_end_time;

            buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n",
                           (unsigned long long) min_start_time,
                           (unsigned long long) min_end_time,
                           (unsigned long long) wall_clock_time
            );

            // output the replay values for this time
            for (size_t i = 0; i < dimensions; i++) {
                struct replication_dimension *d = &q->data[i];
                if (unlikely(!d->enabled)) continue;

                if (likely(d->sp.start_time_s <= min_end_time &&
                           d->sp.end_time_s >= min_end_time &&
                           !storage_point_is_unset(d->sp) &&
                           !storage_point_is_gap(d->sp))) {

                    buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
                                   rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");

                    points_generated++;
                }
            }

            now = min_end_time + 1;
        }
        else if (unlikely(min_end_time < now))
            // the query does not progress
            break;
        else
            // we have a gap - all points are in the future
            now = min_start_time;
    }

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    if (actual_after) {
        char actual_after_buf[LOG_DATE_LENGTH + 1], actual_before_buf[LOG_DATE_LENGTH + 1];
        log_date(actual_after_buf, LOG_DATE_LENGTH, actual_after);
        log_date(actual_before_buf, LOG_DATE_LENGTH, actual_before);

        internal_error(true,
                       "STREAM_SENDER REPLAY: 'host:%s/chart:%s': sending data %llu [%s] to %llu [%s] (requested %llu [delta %lld] to %llu [delta %lld])",
                       rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
                       (unsigned long long) actual_after, actual_after_buf, (unsigned long long) actual_before, actual_before_buf,
                       (unsigned long long) after, (long long) (actual_after - after), (unsigned long long) before, (long long) (actual_before - before));
    }
    else
        internal_error(true,
                       "STREAM_SENDER REPLAY: 'host:%s/chart:%s': nothing to send (requested %llu to %llu)",
                       rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
                       (unsigned long long) after, (unsigned long long) before);
#endif // NETDATA_LOG_REPLICATION_REQUESTS

    q->points_read = points_read;
    q->points_generated = points_generated;

    bool finished_with_gap = false;
    if (last_end_time_in_buffer < before - q->st->update_every)
        finished_with_gap = true;

    return finished_with_gap;
}

static struct replication_query *replication_response_prepare(RRDSET *st, bool requested_enable_streaming, time_t requested_after, time_t requested_before) {
    time_t wall_clock_time = now_realtime_sec();

    if (requested_after > requested_before) {
        // flip them
        time_t t = requested_before;
        requested_before = requested_after;
        requested_after = t;
    }

    if (requested_after > wall_clock_time) {
        requested_after = 0;
        requested_before = 0;
        requested_enable_streaming = true;
    }

    if (requested_before > wall_clock_time) {
        requested_before = wall_clock_time;
        requested_enable_streaming = true;
    }

    time_t query_after = requested_after;
    time_t query_before = requested_before;
    bool query_enable_streaming = requested_enable_streaming;

    time_t db_first_entry = 0, db_last_entry = 0;
    rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);

    if (requested_after == 0 && requested_before == 0 && requested_enable_streaming) {
        // no data requested - just enable streaming
        ;
    }
    else {
        if (query_after < db_first_entry)
            query_after = db_first_entry;

        if (query_before > db_last_entry)
            query_before = db_last_entry;

        // if the parent asked us to start streaming, then fill the rest with the data that we have
        if (requested_enable_streaming)
            query_before = db_last_entry;

        if (query_after > query_before) {
            time_t tmp = query_before;
            query_before = query_after;
            query_after = tmp;
        }

        query_enable_streaming = (requested_enable_streaming ||
                                  query_before == db_last_entry ||
                                  !requested_after ||
                                  !requested_before);
    }

    return replication_query_prepare(
            st,
            db_first_entry, db_last_entry,
            requested_after, requested_before, requested_enable_streaming,
            query_after, query_before, query_enable_streaming,
            wall_clock_time);
}

void replication_response_cancel_and_finalize(struct replication_query *q) {
    replication_query_finalize(NULL, q, false);
}

static bool sender_is_still_connected_for_this_request(struct replication_request *rq);

bool replication_response_execute_and_finalize(struct replication_query *q, size_t max_msg_size) {
    struct replication_request *rq = q->rq;
    RRDSET *st = q->st;
    RRDHOST *host = st->rrdhost;

    // we might want to optimize this by filling a temporary buffer
    // and copying the result to the host's buffer in order to avoid
    // holding the host's buffer lock for too long
    BUFFER *wb = sender_start(host->sender);

    buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));

    bool locked_data_collection = q->query.locked_data_collection;
    q->query.locked_data_collection = false;

    bool finished_with_gap = false;
    if (q->query.execute)
        finished_with_gap = replication_query_execute(wb, q, max_msg_size);

    time_t after = q->request.after;
    time_t before = q->query.before;
    bool enable_streaming = q->query.enable_streaming;

    replication_query_finalize(wb, q, q->query.execute);
    q = NULL; // IMPORTANT: q is invalid now

    // get a fresh retention to send to the parent
    time_t wall_clock_time = now_realtime_sec();
    time_t db_first_entry, db_last_entry;
    rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);

    // end with first/last entries we have, and the first start time and
    // last end time of the data we sent
    buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_END " %d %llu %llu %s %llu %llu %llu\n",

                   // current chart update every
                   (int) st->update_every

                   // child first db time, child end db time
                   , (unsigned long long) db_first_entry, (unsigned long long) db_last_entry

                   // start streaming boolean
                   , enable_streaming ? "true" : "false"

                   // after requested, before requested ('before' can be altered by the child when the request had enable_streaming true)
                   , (unsigned long long) after, (unsigned long long) before

                   // child wall clock time
                   , (unsigned long long) wall_clock_time
    );

    worker_is_busy(WORKER_JOB_BUFFER_COMMIT);
    sender_commit(host->sender, wb);
    worker_is_busy(WORKER_JOB_CLEANUP);

    if (enable_streaming) {
        if (sender_is_still_connected_for_this_request(rq)) {
            // enable normal streaming if we have to
            // but only if the sender buffer has not been flushed since we started

            if (rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
                rrdset_flag_clear(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
                rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
                rrdhost_sender_replicating_charts_minus_one(st->rrdhost);

                if (!finished_with_gap)
                    st->upstream_resync_time_s = 0;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
                internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' streaming starts",
                               rrdhost_hostname(st->rrdhost), rrdset_id(st));
#endif
            }
            else
                internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' received start streaming command, but the chart is not in progress replicating",
                               rrdhost_hostname(st->rrdhost), rrdset_id(st));
        }
    }

    if (locked_data_collection)
        netdata_spinlock_unlock(&st->data_collection_lock);

    return enable_streaming;
}
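
/*
 * A complete reply, as assembled above, has this shape (illustrative, using
 * the PLUGINSD keyword macros in place of their literal text):
 *
 *   PLUGINSD_KEYWORD_REPLAY_BEGIN "chart_id"
 *   PLUGINSD_KEYWORD_REPLAY_BEGIN '' <start> <end> <wall_clock>    ; one per time slot
 *   PLUGINSD_KEYWORD_REPLAY_SET "dim_id" <value> "<flags>"         ; one per dimension with data
 *   ...
 *   PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE / _RRDSET_STATE           ; only when streaming is enabled
 *   PLUGINSD_KEYWORD_REPLAY_END <update_every> <db_first> <db_last> <streaming> <after> <before> <wall_clock>
 */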

// ----------------------------------------------------------------------------
// sending replication requests

struct replication_request_details {
    struct {
        send_command callback;
        void *data;
    } caller;

    RRDHOST *host;
    RRDSET *st;

    struct {
        time_t first_entry_t;           // the first entry time the child has
        time_t last_entry_t;            // the last entry time the child has
        time_t wall_clock_time;         // the current time of the child
        bool fixed_last_entry;          // set when we capped the last entry to the child's wall clock time
    } child_db;

    struct {
        time_t first_entry_t;           // the first entry time we have
        time_t last_entry_t;            // the last entry time we have
        time_t wall_clock_time;         // the current local wall clock time
    } local_db;

    struct {
        time_t from;                    // the starting time of the entire gap we have
        time_t to;                      // the ending time of the entire gap we have
    } gap;

    struct {
        time_t after;                   // the start time we requested previously from this child
        time_t before;                  // the end time we requested previously from this child
    } last_request;

    struct {
        time_t after;                   // the start time of this replication request - the child will add 1 second
        time_t before;                  // the end time of this replication request
        bool start_streaming;           // true when we want the child to send anything remaining and start streaming - the child will overwrite 'before'
    } wanted;
};

static void replicate_log_request(struct replication_request_details *r, const char *msg) {
#ifdef NETDATA_INTERNAL_CHECKS
    internal_error(true,
#else
    error_limit_static_global_var(erl, 1, 0);
    error_limit(&erl,
#endif
                "REPLAY ERROR: 'host:%s/chart:%s' child sent: "
                "db from %ld to %ld%s, wall clock time %ld, "
                "last request from %ld to %ld, "
                "issue: %s - "
                "sending replication request from %ld to %ld, start streaming %s",
                rrdhost_hostname(r->st->rrdhost), rrdset_id(r->st),
                r->child_db.first_entry_t,
                r->child_db.last_entry_t, r->child_db.fixed_last_entry ? " (fixed)" : "",
                r->child_db.wall_clock_time,
                r->last_request.after,
                r->last_request.before,
                msg,
                r->wanted.after,
                r->wanted.before,
                r->wanted.start_streaming ? "true" : "false");
}

static bool send_replay_chart_cmd(struct replication_request_details *r, const char *msg, bool log) {
    RRDSET *st = r->st;

    if (log)
        replicate_log_request(r, msg);

    if (st->rrdhost->receiver && (!st->rrdhost->receiver->replication_first_time_t || r->wanted.after < st->rrdhost->receiver->replication_first_time_t))
        st->rrdhost->receiver->replication_first_time_t = r->wanted.after;

#ifdef NETDATA_LOG_REPLICATION_REQUESTS
    st->replay.log_next_data_collection = true;

    char wanted_after_buf[LOG_DATE_LENGTH + 1] = "", wanted_before_buf[LOG_DATE_LENGTH + 1] = "";

    if (r->wanted.after)
        log_date(wanted_after_buf, LOG_DATE_LENGTH, r->wanted.after);

    if (r->wanted.before)
        log_date(wanted_before_buf, LOG_DATE_LENGTH, r->wanted.before);

    internal_error(true,
                   "REPLAY: 'host:%s/chart:%s' sending replication request %ld [%s] to %ld [%s], start streaming '%s': %s: "
                   "last[%ld - %ld] child[%ld - %ld, now %ld %s] local[%ld - %ld, now %ld] gap[%ld - %ld %s] %s"
                   , rrdhost_hostname(r->host), rrdset_id(r->st)
                   , r->wanted.after, wanted_after_buf
                   , r->wanted.before, wanted_before_buf
                   , r->wanted.start_streaming ? "YES" : "NO"
                   , msg
                   , r->last_request.after, r->last_request.before
                   , r->child_db.first_entry_t, r->child_db.last_entry_t
                   , r->child_db.wall_clock_time, (r->child_db.wall_clock_time == r->local_db.wall_clock_time) ? "SAME" : (r->child_db.wall_clock_time < r->local_db.wall_clock_time) ? "BEHIND" : "AHEAD"
                   , r->local_db.first_entry_t, r->local_db.last_entry_t
                   , r->local_db.wall_clock_time
                   , r->gap.from, r->gap.to
                   , (r->gap.from == r->wanted.after) ? "FULL" : "PARTIAL"
                   , (st->replay.after != 0 || st->replay.before != 0) ? "OVERLAPPING" : ""
    );

    st->replay.start_streaming = r->wanted.start_streaming;
    st->replay.after = r->wanted.after;
    st->replay.before = r->wanted.before;
#endif // NETDATA_LOG_REPLICATION_REQUESTS

    char buffer[2048 + 1];
    snprintfz(buffer, 2048, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
              rrdset_id(st), r->wanted.start_streaming ? "true" : "false",
              (unsigned long long) r->wanted.after, (unsigned long long) r->wanted.before);

    int ret = r->caller.callback(buffer, r->caller.data);
    if (ret < 0) {
        error("REPLAY ERROR: 'host:%s/chart:%s' failed to send replication request to child (error %d)",
              rrdhost_hostname(r->host), rrdset_id(r->st), ret);
        return false;
    }

    return true;
}
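
/*
 * The request sent to the child is a single line (again, the keyword macro
 * stands for its literal text):
 *
 *   PLUGINSD_KEYWORD_REPLAY_CHART "chart_id" "true|false" <after> <before>
 *
 * where the boolean is r->wanted.start_streaming. An 'empty' request
 * (after == before == 0, start_streaming == true) asks the child to skip
 * replication for this chart and enable streaming immediately - see
 * replication_response_prepare() above.
 */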

bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST *host, RRDSET *st,
                             time_t child_first_entry, time_t child_last_entry, time_t child_wall_clock_time,
                             time_t prev_first_entry_wanted, time_t prev_last_entry_wanted)
{
    struct replication_request_details r = {
            .caller = {
                    .callback = callback,
                    .data = callback_data,
            },

            .host = host,
            .st = st,

            .child_db = {
                    .first_entry_t = child_first_entry,
                    .last_entry_t = child_last_entry,
                    .wall_clock_time = child_wall_clock_time,
                    .fixed_last_entry = false,
            },

            .local_db = {
                    .first_entry_t = 0,
                    .last_entry_t = 0,
                    .wall_clock_time = now_realtime_sec(),
            },

            .last_request = {
                    .after = prev_first_entry_wanted,
                    .before = prev_last_entry_wanted,
            },

            .wanted = {
                    .after = 0,
                    .before = 0,
                    .start_streaming = true,
            },
    };

    if (r.child_db.last_entry_t > r.child_db.wall_clock_time) {
        replicate_log_request(&r, "child's db last entry > child's wall clock time");
        r.child_db.last_entry_t = r.child_db.wall_clock_time;
        r.child_db.fixed_last_entry = true;
    }

    rrdset_get_retention_of_tier_for_collected_chart(r.st, &r.local_db.first_entry_t, &r.local_db.last_entry_t, r.local_db.wall_clock_time, 0);

    // let's find the GAP we have
    if (!r.last_request.after || !r.last_request.before) {
        // there is no previous request

        if (r.local_db.last_entry_t)
            // we have some data, let's continue from the last point we have
            r.gap.from = r.local_db.last_entry_t;
        else
            // we don't have any data, the gap is the max timeframe we are allowed to replicate
            r.gap.from = r.local_db.wall_clock_time - r.host->rrdpush_seconds_to_replicate;
    }
    else {
        // we had sent a request - let's continue at the point we left it
        // for this we don't take into account the actual data in our db
        // because the child may also have gaps, and we need to get over them
        r.gap.from = r.last_request.before;
    }

    // we want all the data up to now
    r.gap.to = r.local_db.wall_clock_time;

    // the gap is now r.gap.from -> r.gap.to

    if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION)))
        return send_replay_chart_cmd(&r, "empty replication request, replication is disabled", false);

    if (unlikely(!rrdset_number_of_dimensions(st)))
        return send_replay_chart_cmd(&r, "empty replication request, chart has no dimensions", false);

    if (unlikely(!r.child_db.first_entry_t || !r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, child has no stored data", false);

    if (unlikely(r.child_db.first_entry_t < 0 || r.child_db.last_entry_t < 0))
        return send_replay_chart_cmd(&r, "empty replication request, child db timestamps are invalid", true);

    if (unlikely(r.child_db.first_entry_t > r.child_db.wall_clock_time))
        return send_replay_chart_cmd(&r, "empty replication request, child db first entry is after its wall clock time", true);

    if (unlikely(r.child_db.first_entry_t > r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, child timings are invalid (first entry > last entry)", true);

    if (unlikely(r.local_db.last_entry_t > r.child_db.last_entry_t))
        return send_replay_chart_cmd(&r, "empty replication request, local last entry is later than the child's one", false);

    // let's find what the child can provide to fill that gap

    if (r.child_db.first_entry_t > r.gap.from)
        // the child does not have all the data - let's get what it has
        r.wanted.after = r.child_db.first_entry_t;
    else
        // ok, the child can fill the entire gap we have
        r.wanted.after = r.gap.from;

    if (r.gap.to - r.wanted.after > host->rrdpush_replication_step)
        // the duration is too big for one request - let's take the first step
        r.wanted.before = r.wanted.after + host->rrdpush_replication_step;
    else
        // wow, we can do it in one request
        r.wanted.before = r.gap.to;

    // don't ask from the child more than it has
    if (r.wanted.before > r.child_db.last_entry_t)
        r.wanted.before = r.child_db.last_entry_t;

    if (r.wanted.after > r.wanted.before) {
        r.wanted.after = 0;
        r.wanted.before = 0;
        r.wanted.start_streaming = true;
        return send_replay_chart_cmd(&r, "empty replication request, wanted 'after' computed bigger than wanted 'before'", true);
    }

    // the child should start streaming immediately if the wanted duration is small, or we reached the last entry of the child
    r.wanted.start_streaming = (r.local_db.wall_clock_time - r.wanted.after <= host->rrdpush_replication_step ||
                                r.wanted.before >= r.child_db.last_entry_t ||
                                r.wanted.before >= r.child_db.wall_clock_time ||
                                r.wanted.before >= r.local_db.wall_clock_time);

    // the wanted timeframe is now r.wanted.after -> r.wanted.before
    // send it
    return send_replay_chart_cmd(&r, "OK", false);
}
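
/*
 * A worked example of the gap computation above, with made-up numbers:
 *
 *   local wall clock             2000
 *   local db last entry          1000   -> gap.from = 1000, gap.to = 2000
 *   child db                     [500, 1990]
 *   rrdpush_replication_step      600
 *
 * The child has data at gap.from, so wanted.after = 1000. The gap (1000s) is
 * longer than one step, so wanted.before = 1000 + 600 = 1600, which is within
 * the child's retention. start_streaming stays false, because 2000 - 1000 is
 * more than one step and 1600 is below the child's last entry and below both
 * wall clocks; the next request will continue from last_request.before = 1600.
 */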

// ----------------------------------------------------------------------------
// replication thread

// replication request in sender DICTIONARY
// used for de-duplicating the requests

struct replication_request {
    struct sender_state *sender;        // the sender we should put the reply at
    STRING *chart_id;                   // the chart of the request
    time_t after;                       // the start time of the query (may be zero) - key for sorting (JudyL)
    time_t before;                      // the end time of the query (may be zero)

    usec_t sender_last_flush_ut;        // the timestamp of the sender, at the time we indexed this request
    Word_t unique_id;                   // auto-increment, later requests have a bigger id
    bool start_streaming;               // true, when the parent wants to send the rest of the data (before is overwritten) and enable normal streaming
    bool indexed_in_judy;               // true when the request is indexed in judy
    bool not_indexed_buffer_full;       // true when the request is not indexed because the sender buffer is full
    bool not_indexed_preprocessing;     // true when the request is not indexed, but it is pending in preprocessing

    // prepare ahead members - preprocessing
    bool found;                         // used as a result boolean for the find call
    bool executed;                      // used to detect if we have skipped requests while preprocessing
    RRDSET *st;                         // caching of the chart during preprocessing
    struct replication_query *q;        // the query prepared during preprocessing
};

// replication sort entry in JudyL array
// used for sorting all requests, across all nodes

struct replication_sort_entry {
    struct replication_request *rq;

    size_t unique_id;                   // used as a key to identify the sort entry - we never access its contents
};
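
/*
 * The pending queue is a two-level JudyL index:
 *
 *   outer JudyL: key = rq->after           (so iteration is oldest-first)
 *     inner JudyL: key = rse->unique_id    (so requests with the same 'after'
 *                                           are served in arrival order)
 *       value = struct replication_sort_entry *
 *
 * Together they form a priority queue ordered by the start time of each
 * request, shared across all senders.
 */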

#define MAX_REPLICATION_THREADS 20 // + 1 for the main thread

// the global variables for the replication thread
static struct replication_thread {
    SPINLOCK spinlock;

    struct {
        size_t pending;                 // number of requests pending in the queue

        Word_t unique_id;               // the last unique id we gave to a request (auto-increment, starting from 1)

        // statistics
        size_t added;                   // number of requests added to the queue
        size_t removed;                 // number of requests removed from the queue
        size_t pending_no_room;         // number of requests skipped, because the sender has no room for responses
        size_t senders_full;            // number of times we found a sender with a full buffer
        size_t sender_resets;           // number of times a sender reset our last position in the queue
        time_t first_time_t;            // the minimum 'after' we encountered

        struct {
            Word_t after;
            Word_t unique_id;
            Pvoid_t JudyL_array;
        } queue;

    } unsafe;                           // protected by replication_recursive_lock()

    struct {
        size_t executed;                // the number of replication requests executed
        size_t latest_first_time;       // the 'after' timestamp of the last request we executed
        size_t memory;                  // the total memory allocated by replication
    } atomic;                           // access should be with atomic operations

    struct {
        size_t last_executed;           // caching of atomic.executed, to report the number of requests executed since last time

        netdata_thread_t **threads_ptrs;
        size_t threads;
    } main_thread;                      // access is allowed only by the main thread

} replication_globals = {
        .spinlock = NETDATA_SPINLOCK_INITIALIZER,
        .unsafe = {
                .pending = 0,
                .unique_id = 0,

                .added = 0,
                .removed = 0,
                .pending_no_room = 0,
                .sender_resets = 0,
                .senders_full = 0,

                .first_time_t = 0,

                .queue = {
                        .after = 0,
                        .unique_id = 0,
                        .JudyL_array = NULL,
                },
        },
        .atomic = {
                .executed = 0,
                .latest_first_time = 0,
                .memory = 0,
        },
        .main_thread = {
                .last_executed = 0,
                .threads = 0,
                .threads_ptrs = NULL,
        },
};

size_t replication_allocated_memory(void) {
    return __atomic_load_n(&replication_globals.atomic.memory, __ATOMIC_RELAXED);
}

#define replication_set_latest_first_time(t) __atomic_store_n(&replication_globals.atomic.latest_first_time, t, __ATOMIC_RELAXED)
#define replication_get_latest_first_time() __atomic_load_n(&replication_globals.atomic.latest_first_time, __ATOMIC_RELAXED)

static inline bool replication_recursive_lock_mode(char mode) {
    static __thread int recursions = 0;

    if (mode == 'L') { // (L)ock
        if (++recursions == 1)
            netdata_spinlock_lock(&replication_globals.spinlock);
    }
    else if (mode == 'U') { // (U)nlock
        if (--recursions == 0)
            netdata_spinlock_unlock(&replication_globals.spinlock);
    }
    else if (mode == 'C') { // (C)heck
        if (recursions > 0)
            return true;
        else
            return false;
    }
    else
        fatal("REPLICATION: unknown lock mode '%c'", mode);

#ifdef NETDATA_INTERNAL_CHECKS
    if (recursions < 0)
        fatal("REPLICATION: recursions is %d", recursions);
#endif

    return true;
}

#define replication_recursive_lock() replication_recursive_lock_mode('L')
#define replication_recursive_unlock() replication_recursive_lock_mode('U')
#define fatal_when_replication_is_not_locked_for_me() do { \
    if(!replication_recursive_lock_mode('C')) \
        fatal("REPLICATION: reached %s, but replication is not locked by this thread.", __FUNCTION__); \
} while(0)
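
/*
 * Usage sketch for the recursive lock (illustrative):
 *
 *   replication_recursive_lock();       // recursions 0 -> 1, spinlock acquired
 *   replication_recursive_lock();       // recursions 1 -> 2, spinlock untouched
 *   fatal_when_replication_is_not_locked_for_me();  // passes, recursions > 0
 *   replication_recursive_unlock();     // recursions 2 -> 1
 *   replication_recursive_unlock();     // recursions 1 -> 0, spinlock released
 *
 * The recursion counter is thread-local, so the 'C' mode can only report on
 * the calling thread's own nesting, not on other threads.
 */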

void replication_set_next_point_in_time(time_t after, size_t unique_id) {
    replication_recursive_lock();
    replication_globals.unsafe.queue.after = after;
    replication_globals.unsafe.queue.unique_id = unique_id;
    replication_recursive_unlock();
}

// ----------------------------------------------------------------------------
// replication sort entry management

static struct replication_sort_entry *replication_sort_entry_create_unsafe(struct replication_request *rq) {
    fatal_when_replication_is_not_locked_for_me();

    struct replication_sort_entry *rse = mallocz(sizeof(struct replication_sort_entry));
    __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);

    rrdpush_sender_pending_replication_requests_plus_one(rq->sender);

    // copy the request
    rse->rq = rq;
    rse->unique_id = ++replication_globals.unsafe.unique_id;

    // save the unique id into the request, to be able to delete it later
    rq->unique_id = rse->unique_id;
    rq->indexed_in_judy = false;
    rq->not_indexed_buffer_full = false;
    rq->not_indexed_preprocessing = false;
    return rse;
}

static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
    freez(rse);
    __atomic_sub_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
}

static void replication_sort_entry_add(struct replication_request *rq) {
    replication_recursive_lock();

    if (rrdpush_sender_replication_buffer_full_get(rq->sender)) {
        rq->indexed_in_judy = false;
        rq->not_indexed_buffer_full = true;
        rq->not_indexed_preprocessing = false;
        replication_globals.unsafe.pending_no_room++;
        replication_recursive_unlock();
        return;
    }

    if (rq->not_indexed_buffer_full)
        replication_globals.unsafe.pending_no_room--;

    struct replication_sort_entry *rse = replication_sort_entry_create_unsafe(rq);

//    if(rq->after < (time_t)replication_globals.protected.queue.after &&
//       rq->sender->buffer_used_percentage <= MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED &&
//       !replication_globals.protected.skipped_no_room_since_last_reset) {
//
//        // make it find this request first
//        replication_set_next_point_in_time(rq->after, rq->unique_id);
//    }

    replication_globals.unsafe.added++;
    replication_globals.unsafe.pending++;

    Pvoid_t *inner_judy_ptr;

    // find the outer judy entry, using after as key
    size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
    inner_judy_ptr = JudyLIns(&replication_globals.unsafe.queue.JudyL_array, (Word_t) rq->after, PJE0);
    size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
    if (unlikely(!inner_judy_ptr || inner_judy_ptr == PJERR))
        fatal("REPLICATION: corrupted outer judyL");

    // add it to the inner judy, using unique_id as key
    size_t mem_before_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
    Pvoid_t *item = JudyLIns(inner_judy_ptr, rq->unique_id, PJE0);
    size_t mem_after_inner_judyl = JudyLMemUsed(*inner_judy_ptr);
    if (unlikely(!item || item == PJERR))
        fatal("REPLICATION: corrupted inner judyL");

    *item = rse;
    rq->indexed_in_judy = true;
    rq->not_indexed_buffer_full = false;
    rq->not_indexed_preprocessing = false;

    if (!replication_globals.unsafe.first_time_t || rq->after < replication_globals.unsafe.first_time_t)
        replication_globals.unsafe.first_time_t = rq->after;

    replication_recursive_unlock();

    __atomic_add_fetch(&replication_globals.atomic.memory, (mem_after_inner_judyl - mem_before_inner_judyl) + (mem_after_outer_judyl - mem_before_outer_judyl), __ATOMIC_RELAXED);
}
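
/*
 * Memory accounting: JudyLMemUsed() is sampled before and after each
 * insertion and deletion (for both the outer and the inner array) and the
 * delta is applied to replication_globals.atomic.memory, so
 * replication_allocated_memory() includes the Judy index overhead, not just
 * the sort entries themselves. The atomic update above happens after the
 * spinlock is released, which keeps the critical section shorter.
 */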

static bool replication_sort_entry_unlink_and_free_unsafe(struct replication_sort_entry *rse, Pvoid_t **inner_judy_ppptr, bool preprocessing) {
    fatal_when_replication_is_not_locked_for_me();

    bool inner_judy_deleted = false;

    replication_globals.unsafe.removed++;
    replication_globals.unsafe.pending--;

    rrdpush_sender_pending_replication_requests_minus_one(rse->rq->sender);

    rse->rq->indexed_in_judy = false;
    rse->rq->not_indexed_preprocessing = preprocessing;

    size_t memory_saved = 0;

    // delete it from the inner judy
    size_t mem_before_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
    JudyLDel(*inner_judy_ppptr, rse->rq->unique_id, PJE0);
    size_t mem_after_inner_judyl = JudyLMemUsed(**inner_judy_ppptr);
    memory_saved = mem_before_inner_judyl - mem_after_inner_judyl;

    // if no items are left, delete it from the outer judy
    if (**inner_judy_ppptr == NULL) {
        size_t mem_before_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
        JudyLDel(&replication_globals.unsafe.queue.JudyL_array, rse->rq->after, PJE0);
        size_t mem_after_outer_judyl = JudyLMemUsed(replication_globals.unsafe.queue.JudyL_array);
        memory_saved += mem_before_outer_judyl - mem_after_outer_judyl;
        inner_judy_deleted = true;
    }

    // free memory
    replication_sort_entry_destroy(rse);

    __atomic_sub_fetch(&replication_globals.atomic.memory, memory_saved, __ATOMIC_RELAXED);

    return inner_judy_deleted;
}

static void replication_sort_entry_del(struct replication_request *rq, bool buffer_full) {
    Pvoid_t *inner_judy_pptr;
    struct replication_sort_entry *rse_to_delete = NULL;

    replication_recursive_lock();
    if (rq->indexed_in_judy) {
        inner_judy_pptr = JudyLGet(replication_globals.unsafe.queue.JudyL_array, rq->after, PJE0);
        if (inner_judy_pptr) {
            Pvoid_t *our_item_pptr = JudyLGet(*inner_judy_pptr, rq->unique_id, PJE0);
            if (our_item_pptr) {
                rse_to_delete = *our_item_pptr;
                replication_sort_entry_unlink_and_free_unsafe(rse_to_delete, &inner_judy_pptr, false);

                if (buffer_full) {
                    replication_globals.unsafe.pending_no_room++;
                    rq->not_indexed_buffer_full = true;
                }
            }
        }

        if (!rse_to_delete)
            fatal("REPLAY: 'host:%s/chart:%s' Cannot find sort entry to delete for time %ld.",
                  rrdhost_hostname(rq->sender->host), string2str(rq->chart_id), rq->after);
    }

    replication_recursive_unlock();
}

static struct replication_request replication_request_get_first_available(void) {
    Pvoid_t *inner_judy_pptr;

    replication_recursive_lock();

    struct replication_request rq_to_return = (struct replication_request){ .found = false };

    if (unlikely(!replication_globals.unsafe.queue.after || !replication_globals.unsafe.queue.unique_id)) {
        replication_globals.unsafe.queue.after = 0;
        replication_globals.unsafe.queue.unique_id = 0;
    }

    Word_t started_after = replication_globals.unsafe.queue.after;

    size_t round = 0;
    while (!rq_to_return.found) {
        round++;

        if (round > 2)
            break;

        if (round == 2) {
            if (started_after == 0)
                break;

            replication_globals.unsafe.queue.after = 0;
            replication_globals.unsafe.queue.unique_id = 0;
        }

        bool find_same_after = true;
        while (!rq_to_return.found && (inner_judy_pptr = JudyLFirstThenNext(replication_globals.unsafe.queue.JudyL_array, &replication_globals.unsafe.queue.after, &find_same_after))) {
            Pvoid_t *our_item_pptr;

            if (unlikely(round == 2 && replication_globals.unsafe.queue.after > started_after))
                break;

            while (!rq_to_return.found && (our_item_pptr = JudyLNext(*inner_judy_pptr, &replication_globals.unsafe.queue.unique_id, PJE0))) {
                struct replication_sort_entry *rse = *our_item_pptr;
                struct replication_request *rq = rse->rq;

                // copy the request to return it
                rq_to_return = *rq;
                rq_to_return.chart_id = string_dup(rq_to_return.chart_id);

                // set the return result to found
                rq_to_return.found = true;

                if (replication_sort_entry_unlink_and_free_unsafe(rse, &inner_judy_pptr, true))
                    // we removed the item from the outer JudyL
                    break;
            }

            // prepare for the next iteration on the outer loop
            replication_globals.unsafe.queue.unique_id = 0;
        }
    }

    replication_recursive_unlock();
    return rq_to_return;
}
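
/*
 * The scan above is resumable: replication_globals.unsafe.queue.after and
 * .unique_id remember where the previous call stopped, so successive calls
 * continue through the queue instead of always re-examining the oldest
 * entries. Round 1 resumes from the saved position; if nothing is found,
 * round 2 restarts from the beginning of the index but stops once it reaches
 * the position round 1 started from (started_after), so each entry is
 * visited at most once per call.
 */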

// ----------------------------------------------------------------------------
// replication request management

static void replication_request_react_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value __maybe_unused, void *sender_state __maybe_unused) {
    struct sender_state *s = sender_state; (void) s;
    struct replication_request *rq = value;

    // IMPORTANT:
    // We use the react instead of the insert callback
    // because we want the item to be atomically visible
    // to our replication thread, immediately after.
    // If we put this at the insert callback, the item is not guaranteed
    // to be atomically visible to others, so the replication thread
    // may see the replication sort entry, but fail to find the dictionary item
    // related to it.

    replication_sort_entry_add(rq);

    // this request is about a unique chart for this sender
    rrdpush_sender_replicating_charts_plus_one(s);
}
static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *old_value, void *new_value, void *sender_state) {
    struct sender_state *s = sender_state;
    struct replication_request *rq = old_value;
    struct replication_request *rq_new = new_value;

    replication_recursive_lock();

    if(!rq->indexed_in_judy && rq->not_indexed_buffer_full && !rq->not_indexed_preprocessing) {
        // we can replace this command
        internal_error(
                true,
                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' replacing duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
                (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
                (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");

        rq->after = rq_new->after;
        rq->before = rq_new->before;
        rq->start_streaming = rq_new->start_streaming;
    }
    else if(!rq->indexed_in_judy && !rq->not_indexed_preprocessing) {
        replication_sort_entry_add(rq);
        internal_error(
                true,
                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' adding duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host), dictionary_acquired_item_name(item),
                (unsigned long long)rq->after, (unsigned long long)rq->before, rq->start_streaming ? "true" : "false",
                (unsigned long long)rq_new->after, (unsigned long long)rq_new->before, rq_new->start_streaming ? "true" : "false");
    }
    else {
        internal_error(
                true,
                "STREAM %s [send to %s]: REPLAY: 'host:%s/chart:%s' ignoring duplicate replication command received (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
                rrdhost_hostname(s->host), s->connected_to, rrdhost_hostname(s->host),
                dictionary_acquired_item_name(item),
                (unsigned long long) rq->after, (unsigned long long) rq->before, rq->start_streaming ? "true" : "false",
                (unsigned long long) rq_new->after, (unsigned long long) rq_new->before, rq_new->start_streaming ? "true" : "false");
    }

    replication_recursive_unlock();

    string_freez(rq_new->chart_id);
    return false;
}
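
// Illustrative walk through the three branches above (values hypothetical):
// an existing request for 'system.cpu' covering [1000, 2000] receives a
// duplicate replication command for [1500, 2500] on the same dictionary key:
//   - parked because the buffer was full, and not being preprocessed
//     -> its time window is replaced in place with the new one;
//   - not indexed, not being preprocessed, buffer not full
//     -> it is re-added to the sort index;
//   - already indexed, or being preprocessed
//     -> the new command is ignored.
// In every case the callback frees the new value's chart_id and returns
// false, so the dictionary keeps the original item.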
static void replication_request_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *value, void *sender_state __maybe_unused) {
    struct replication_request *rq = value;

    // this request is about a unique chart for this sender
    rrdpush_sender_replicating_charts_minus_one(rq->sender);

    if(rq->indexed_in_judy)
        replication_sort_entry_del(rq, false);

    else if(rq->not_indexed_buffer_full) {
        replication_recursive_lock();
        replication_globals.unsafe.pending_no_room--;
        replication_recursive_unlock();
    }

    string_freez(rq->chart_id);
}
static bool sender_is_still_connected_for_this_request(struct replication_request *rq) {
    return rq->sender_last_flush_ut == rrdpush_sender_get_flush_time(rq->sender);
}
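
// The flush time works as a connection generation counter: whenever the
// sender reconnects, its flush time changes, so requests queued before the
// reconnect stop matching and can be dropped - the parent will request the
// missing data again over the new connection (see the matching check in
// replication_execute_next_pending_request() below).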
static bool replication_execute_request(struct replication_request *rq, bool workers) {
    bool ret = false;

    if(!rq->st) {
        if(likely(workers))
            worker_is_busy(WORKER_JOB_FIND_CHART);

        rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
    }

    if(!rq->st) {
        internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' not found",
                       rrdhost_hostname(rq->sender->host), string2str(rq->chart_id));

        goto cleanup;
    }

    netdata_thread_disable_cancelability();

    if(!rq->q) {
        if(likely(workers))
            worker_is_busy(WORKER_JOB_PREPARE_QUERY);

        rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before);
    }

    if(likely(workers))
        worker_is_busy(WORKER_JOB_QUERYING);

    // send the replication data
    rq->q->rq = rq;
    replication_response_execute_and_finalize(
            rq->q, (size_t)((unsigned long long)rq->sender->host->sender->buffer->max_size * MAX_REPLICATION_MESSAGE_PERCENT_SENDER_BUFFER / 100ULL));

    rq->q = NULL;
    netdata_thread_enable_cancelability();

    __atomic_add_fetch(&replication_globals.atomic.executed, 1, __ATOMIC_RELAXED);

    ret = true;

cleanup:
    if(rq->q) {
        replication_response_cancel_and_finalize(rq->q);
        rq->q = NULL;
    }

    string_freez(rq->chart_id);
    worker_is_idle();
    return ret;
}
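
// replication_execute_request() is reached from two places in this file:
// directly from replication_add_request() (workers == false, nothing has
// been prepared yet), and from the pipeline in
// replication_execute_next_pending_request() (workers == true, where rq->q
// may already hold a query prepared ahead of time). On failure, the cleanup
// path cancels any prepared query and frees the chart_id copy.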

// ----------------------------------------------------------------------------
// public API
void replication_add_request(struct sender_state *sender, const char *chart_id, time_t after, time_t before, bool start_streaming) {
    struct replication_request rq = {
            .sender = sender,
            .chart_id = string_strdupz(chart_id),
            .after = after,
            .before = before,
            .start_streaming = start_streaming,
            .sender_last_flush_ut = rrdpush_sender_get_flush_time(sender),
            .indexed_in_judy = false,
            .not_indexed_buffer_full = false,
            .not_indexed_preprocessing = false,
    };

    if(start_streaming && rrdpush_sender_get_buffer_used_percent(sender) <= STREAMING_START_MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED)
        replication_execute_request(&rq, false);

    else
        dictionary_set(sender->replication.requests, chart_id, &rq, sizeof(struct replication_request));
}
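
// Illustrative call (hypothetical values - the actual caller is the
// streaming request parser, outside this section): to replicate the last
// hour of 'system.cpu' without switching to streaming yet:
//
//     replication_add_request(s, "system.cpu", now - 3600, now, false);
//
// When the request asks to start streaming and the sender buffer is mostly
// empty, it is served synchronously right here; otherwise it is queued in
// the dictionary and served later by the replication thread(s).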
void replication_sender_delete_pending_requests(struct sender_state *sender) {
    // allow the dictionary destructor to go faster on locks
    dictionary_flush(sender->replication.requests);
}
void replication_init_sender(struct sender_state *sender) {
    sender->replication.requests = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE | DICT_OPTION_FIXED_SIZE,
                                                              NULL, sizeof(struct replication_request));

    dictionary_register_react_callback(sender->replication.requests, replication_request_react_callback, sender);
    dictionary_register_conflict_callback(sender->replication.requests, replication_request_conflict_callback, sender);
    dictionary_register_delete_callback(sender->replication.requests, replication_request_delete_callback, sender);
}
void replication_cleanup_sender(struct sender_state *sender) {
    // allow the dictionary destructor to go faster on locks
    replication_recursive_lock();
    dictionary_destroy(sender->replication.requests);
    replication_recursive_unlock();
}
void replication_recalculate_buffer_used_ratio_unsafe(struct sender_state *s) {
    size_t available = cbuffer_available_size_unsafe(s->host->sender->buffer);
    size_t percentage = (s->buffer->max_size - available) * 100 / s->buffer->max_size;

    if(unlikely(percentage > MAX_SENDER_BUFFER_PERCENTAGE_ALLOWED && !rrdpush_sender_replication_buffer_full_get(s))) {
        rrdpush_sender_replication_buffer_full_set(s, true);

        struct replication_request *rq;
        dfe_start_read(s->replication.requests, rq) {
            if(rq->indexed_in_judy)
                replication_sort_entry_del(rq, true);
        }
        dfe_done(rq);

        replication_recursive_lock();
        replication_globals.unsafe.senders_full++;
        replication_recursive_unlock();
    }
    else if(unlikely(percentage < MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED && rrdpush_sender_replication_buffer_full_get(s))) {
        rrdpush_sender_replication_buffer_full_set(s, false);

        struct replication_request *rq;
        dfe_start_read(s->replication.requests, rq) {
            if(!rq->indexed_in_judy && (rq->not_indexed_buffer_full || rq->not_indexed_preprocessing))
                replication_sort_entry_add(rq);
        }
        dfe_done(rq);

        replication_recursive_lock();
        replication_globals.unsafe.senders_full--;
        replication_globals.unsafe.sender_resets++;
        // replication_set_next_point_in_time(0, 0);
        replication_recursive_unlock();
    }

    rrdpush_sender_set_buffer_used_percent(s, percentage);
}
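
// Worked example of the hysteresis above (threshold values illustrative;
// the real ones are the MAX/MIN_SENDER_BUFFER_PERCENTAGE_ALLOWED defines):
// assume max_size = 1024 KiB, MAX = 50, MIN = 10:
//   - available drops to 400 KiB -> used = 60% > MAX: mark the sender full,
//     unlink all its requests from the sort index (they stay in the
//     dictionary), and count it in senders_full;
//   - available recovers to 950 KiB -> used = 7% < MIN: clear the flag and
//     re-add the parked requests to the sort index.
// The gap between MIN and MAX prevents flapping when the buffer usage
// hovers around a single threshold.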

// ----------------------------------------------------------------------------
// replication thread
static size_t verify_host_charts_are_streaming_now(RRDHOST *host) {
    internal_error(
            host->sender &&
            !rrdpush_sender_pending_replication_requests(host->sender) &&
            dictionary_entries(host->sender->replication.requests) != 0,
            "REPLICATION SUMMARY: 'host:%s' reports %zu pending replication requests, but its chart replication index says there are %zu charts pending replication",
            rrdhost_hostname(host),
            rrdpush_sender_pending_replication_requests(host->sender),
            dictionary_entries(host->sender->replication.requests)
    );

    size_t ok = 0;
    size_t errors = 0;

    RRDSET *st;
    rrdset_foreach_read(st, host) {
        RRDSET_FLAGS flags = rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_SENDER_REPLICATION_FINISHED);

        bool is_error = false;

        if(!flags) {
            internal_error(
                    true,
                    "REPLICATION SUMMARY: 'host:%s/chart:%s' is neither IN PROGRESS nor FINISHED",
                    rrdhost_hostname(host), rrdset_id(st)
            );
            is_error = true;
        }

        if(!(flags & RRDSET_FLAG_SENDER_REPLICATION_FINISHED) || (flags & RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS)) {
            internal_error(
                    true,
                    "REPLICATION SUMMARY: 'host:%s/chart:%s' is not FINISHED or still IN PROGRESS, although replication should have finished",
                    rrdhost_hostname(host), rrdset_id(st)
            );
            is_error = true;
        }

        if(is_error)
            errors++;
        else
            ok++;
    }
    rrdset_foreach_done(st);

    internal_error(errors,
                   "REPLICATION SUMMARY: 'host:%s' finished replicating %zu charts, but %zu charts are still in progress although replication finished",
                   rrdhost_hostname(host), ok, errors);

    return errors;
}
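
// A non-zero return here means some charts of this host ended replication in
// an inconsistent state (started but never marked FINISHED); the summary in
// verify_all_hosts_charts_are_streaming_now() below reports their count as
// "charts pending replication".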
static void verify_all_hosts_charts_are_streaming_now(void) {
    worker_is_busy(WORKER_JOB_CHECK_CONSISTENCY);

    size_t errors = 0;
    RRDHOST *host;
    dfe_start_read(rrdhost_root_index, host)
        errors += verify_host_charts_are_streaming_now(host);
    dfe_done(host);

    size_t executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
    info("REPLICATION SUMMARY: finished, executed %zu replication requests, %zu charts pending replication",
         executed - replication_globals.main_thread.last_executed, errors);
    replication_globals.main_thread.last_executed = executed;
}
static void replication_initialize_workers(bool master) {
    worker_register("REPLICATION");
    worker_register_job_name(WORKER_JOB_FIND_NEXT, "find next");
    worker_register_job_name(WORKER_JOB_QUERYING, "querying");
    worker_register_job_name(WORKER_JOB_DELETE_ENTRY, "dict delete");
    worker_register_job_name(WORKER_JOB_FIND_CHART, "find chart");
    worker_register_job_name(WORKER_JOB_PREPARE_QUERY, "prepare query");
    worker_register_job_name(WORKER_JOB_CHECK_CONSISTENCY, "check consistency");
    worker_register_job_name(WORKER_JOB_BUFFER_COMMIT, "commit");
    worker_register_job_name(WORKER_JOB_CLEANUP, "cleanup");
    worker_register_job_name(WORKER_JOB_WAIT, "wait");

    if(master) {
        worker_register_job_name(WORKER_JOB_STATISTICS, "statistics");
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, "pending requests", "requests", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, "no room requests", "requests", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, "completion", "%", WORKER_METRIC_ABSOLUTE);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, "added requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_DONE, "finished requests", "requests/s", WORKER_METRIC_INCREMENTAL_TOTAL);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, "sender resets", "resets/s", WORKER_METRIC_INCREMENTAL_TOTAL);
        worker_register_job_custom_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, "senders full", "senders", WORKER_METRIC_ABSOLUTE);
    }
}

#define REQUEST_OK               (0)
#define REQUEST_QUEUE_EMPTY     (-1)
#define REQUEST_CHART_NOT_FOUND (-2)

static int replication_execute_next_pending_request(bool cancel) {
    static __thread int max_requests_ahead = 0;
    static __thread struct replication_request *rqs = NULL;
    static __thread int rqs_last_executed = 0, rqs_last_prepared = 0;
    static __thread size_t queue_rounds = 0; (void)queue_rounds;
    struct replication_request *rq;

    if(unlikely(cancel)) {
        if(rqs) {
            size_t cancelled = 0;
            do {
                if (++rqs_last_executed >= max_requests_ahead)
                    rqs_last_executed = 0;

                rq = &rqs[rqs_last_executed];

                if (rq->q) {
                    internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");
                    internal_fatal(!rq->found, "REPLAY FATAL: orphan q in rq");

                    replication_response_cancel_and_finalize(rq->q);
                    rq->q = NULL;
                    cancelled++;
                }

                rq->executed = true;
                rq->found = false;

            } while (rqs_last_executed != rqs_last_prepared);

            internal_error(true, "REPLICATION: cancelled %zu inflight queries", cancelled);
        }
        return REQUEST_QUEUE_EMPTY;
    }

    if(unlikely(!rqs)) {
        max_requests_ahead = get_netdata_cpus() / 2;

        if(max_requests_ahead > libuv_worker_threads * 2)
            max_requests_ahead = libuv_worker_threads * 2;

        if(max_requests_ahead < 2)
            max_requests_ahead = 2;

        rqs = callocz(max_requests_ahead, sizeof(struct replication_request));
        __atomic_add_fetch(&replication_buffers_allocated, max_requests_ahead * sizeof(struct replication_request), __ATOMIC_RELAXED);
    }
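
    // Sizing example for the look-ahead ring (numbers illustrative): on a
    // 16-core machine with 4 libuv worker threads, get_netdata_cpus() / 2 = 8
    // is clamped to libuv_worker_threads * 2 = 8, so max_requests_ahead = 8;
    // a 2-core machine gets the floor of 2. The floor keeps the prepare-ahead
    // pipeline below meaningful even on tiny systems.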

    // fill the queue
    do {
        if(++rqs_last_prepared >= max_requests_ahead) {
            rqs_last_prepared = 0;
            queue_rounds++;
        }

        internal_fatal(rqs[rqs_last_prepared].q,
                       "REPLAY FATAL: slot is used by query that has not been executed!");

        worker_is_busy(WORKER_JOB_FIND_NEXT);
        rqs[rqs_last_prepared] = replication_request_get_first_available();
        rq = &rqs[rqs_last_prepared];

        if(rq->found) {
            if (!rq->st) {
                worker_is_busy(WORKER_JOB_FIND_CHART);
                rq->st = rrdset_find(rq->sender->host, string2str(rq->chart_id));
            }

            if (rq->st && !rq->q) {
                worker_is_busy(WORKER_JOB_PREPARE_QUERY);
                rq->q = replication_response_prepare(rq->st, rq->start_streaming, rq->after, rq->before);
            }

            rq->executed = false;
        }

    } while(rq->found && rqs_last_prepared != rqs_last_executed);
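
    // Pipelining sketch (indices illustrative): with max_requests_ahead = 4
    // and both cursors at 0, the loop above prepares slots 1, 2, 3 and 0,
    // stopping when rqs_last_prepared wraps back onto rqs_last_executed.
    // Queries are prepared ahead of execution - presumably so the database
    // can fetch the pages each query needs while earlier responses are still
    // being transmitted - and the loop below executes the oldest prepared slot.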

    // pick the first usable
    do {
        if (++rqs_last_executed >= max_requests_ahead)
            rqs_last_executed = 0;

        rq = &rqs[rqs_last_executed];

        if(rq->found) {
            internal_fatal(rq->executed, "REPLAY FATAL: query has already been executed!");

            if (rq->sender_last_flush_ut != rrdpush_sender_get_flush_time(rq->sender)) {
                // the sender has reconnected since this request was queued,
                // we can safely throw it away, since the parent will resend it
                replication_response_cancel_and_finalize(rq->q);
                rq->executed = true;
                rq->found = false;
                rq->q = NULL;
            }
            else if (rrdpush_sender_replication_buffer_full_get(rq->sender)) {
                // the sender buffer is full, so we can ignore this request,
                // it has already been marked as 'preprocessed' in the dictionary,
                // and the sender will put it back in when there is
                // enough room in the buffer for processing replication requests
                replication_response_cancel_and_finalize(rq->q);
                rq->executed = true;
                rq->found = false;
                rq->q = NULL;
            }
            else {
                // we can execute this,
                // delete it from the dictionary
                worker_is_busy(WORKER_JOB_DELETE_ENTRY);
                dictionary_del(rq->sender->replication.requests, string2str(rq->chart_id));
            }
        }
        else
            internal_fatal(rq->q, "REPLAY FATAL: slot status says slot is empty, but it has a pending query!");

    } while(!rq->found && rqs_last_executed != rqs_last_prepared);

    if(unlikely(!rq->found)) {
        worker_is_idle();
        return REQUEST_QUEUE_EMPTY;
    }

    replication_set_latest_first_time(rq->after);

    bool chart_found = replication_execute_request(rq, true);
    rq->executed = true;
    rq->found = false;
    rq->q = NULL;

    if(unlikely(!chart_found)) {
        worker_is_idle();
        return REQUEST_CHART_NOT_FOUND;
    }

    worker_is_idle();
    return REQUEST_OK;
}

static void replication_worker_cleanup(void *ptr __maybe_unused) {
    replication_execute_next_pending_request(true);
    worker_unregister();
}

static void *replication_worker_thread(void *ptr) {
    replication_initialize_workers(false);

    netdata_thread_cleanup_push(replication_worker_cleanup, ptr);

    while(service_running(SERVICE_REPLICATION)) {
        if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
            sender_thread_buffer_free();
            worker_is_busy(WORKER_JOB_WAIT);
            worker_is_idle();
            sleep_usec(1 * USEC_PER_SEC);
        }
    }

    netdata_thread_cleanup_pop(1);
    return NULL;
}

static void replication_main_cleanup(void *ptr) {
    struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
    static_thread->enabled = NETDATA_MAIN_THREAD_EXITING;

    replication_execute_next_pending_request(true);

    int threads = (int)replication_globals.main_thread.threads;
    for(int i = 0; i < threads ;i++) {
        netdata_thread_join(*replication_globals.main_thread.threads_ptrs[i], NULL);
        freez(replication_globals.main_thread.threads_ptrs[i]);
        __atomic_sub_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
    }
    freez(replication_globals.main_thread.threads_ptrs);
    replication_globals.main_thread.threads_ptrs = NULL;
    __atomic_sub_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);

    // custom code
    worker_unregister();

    static_thread->enabled = NETDATA_MAIN_THREAD_EXITED;
}

void *replication_thread_main(void *ptr __maybe_unused) {
    replication_initialize_workers(true);

    int threads = config_get_number(CONFIG_SECTION_DB, "replication threads", 1);
    if(threads < 1 || threads > MAX_REPLICATION_THREADS) {
        error("replication threads given %d is invalid, resetting to 1", threads);
        threads = 1;
    }

    if(--threads) {
        // this thread is also a replication thread,
        // so spawn only the additional workers
        replication_globals.main_thread.threads = threads;
        replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
        __atomic_add_fetch(&replication_buffers_allocated, threads * sizeof(netdata_thread_t *), __ATOMIC_RELAXED);

        for(int i = 0; i < threads ;i++) {
            char tag[NETDATA_THREAD_TAG_MAX + 1];
            snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 2);
            replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t));
            __atomic_add_fetch(&replication_buffers_allocated, sizeof(netdata_thread_t), __ATOMIC_RELAXED);
            netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag,
                                  NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL);
        }
    }

    netdata_thread_cleanup_push(replication_main_cleanup, ptr);

    // start from 100% completed
    worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);

    long run_verification_countdown = LONG_MAX; // LONG_MAX to prevent an initial verification when no replication ever took place
    bool slow = true; // control the time we sleep - it has to start with true!
    usec_t last_now_mono_ut = now_monotonic_usec();
    time_t replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME; // restart the queue scan from the beginning, every SECONDS_TO_RESET_POINT_IN_TIME seconds
    size_t last_executed = 0;
    size_t last_sender_resets = 0;

    while(service_running(SERVICE_REPLICATION)) {

        // statistics
        usec_t now_mono_ut = now_monotonic_usec();
        if(unlikely(now_mono_ut - last_now_mono_ut > default_rrd_update_every * USEC_PER_SEC)) {
            last_now_mono_ut = now_mono_ut;

            worker_is_busy(WORKER_JOB_STATISTICS);
            replication_recursive_lock();

            size_t current_executed = __atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED);
            if(last_executed != current_executed) {
                run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
                last_executed = current_executed;
                slow = false;
            }

            if(replication_reset_next_point_in_time_countdown-- == 0) {
                // periodically (every SECONDS_TO_RESET_POINT_IN_TIME passes of this
                // statistics block), make the queue scan all the pending requests next time
                replication_set_next_point_in_time(0, 0);
                // replication_globals.protected.skipped_no_room_since_last_reset = 0;
                replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;
            }

            if(--run_verification_countdown == 0) {
                if (!replication_globals.unsafe.pending && !replication_globals.unsafe.pending_no_room) {
                    // reset the statistics about completion percentage
                    replication_globals.unsafe.first_time_t = 0;
                    replication_set_latest_first_time(0);

                    verify_all_hosts_charts_are_streaming_now();

                    run_verification_countdown = LONG_MAX;
                    slow = true;
                }
                else
                    run_verification_countdown = ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION;
            }

            time_t latest_first_time_t = replication_get_latest_first_time();
            if(latest_first_time_t && replication_globals.unsafe.pending) {
                // completion percentage statistics
                time_t now = now_realtime_sec();
                time_t total = now - replication_globals.unsafe.first_time_t;
                time_t done = latest_first_time_t - replication_globals.unsafe.first_time_t;
                worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION,
                                  (NETDATA_DOUBLE) done * 100.0 / (NETDATA_DOUBLE) total);
            }
            else
                worker_set_metric(WORKER_JOB_CUSTOM_METRIC_COMPLETION, 100.0);
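
            // Completion example (numbers illustrative): if replication started
            // with the oldest requested point at first_time_t = 1000, the oldest
            // point still pending is latest_first_time_t = 1450, and now = 1500,
            // then total = 500, done = 450, and the metric reads 90%. It measures
            // progress through the requested time range, not a count of requests.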

            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS, (NETDATA_DOUBLE)replication_globals.unsafe.pending);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_ADDED, (NETDATA_DOUBLE)replication_globals.unsafe.added);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_DONE, (NETDATA_DOUBLE)__atomic_load_n(&replication_globals.atomic.executed, __ATOMIC_RELAXED));
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM, (NETDATA_DOUBLE)replication_globals.unsafe.pending_no_room);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS, (NETDATA_DOUBLE)replication_globals.unsafe.sender_resets);
            worker_set_metric(WORKER_JOB_CUSTOM_METRIC_SENDER_FULL, (NETDATA_DOUBLE)replication_globals.unsafe.senders_full);

            replication_recursive_unlock();
            worker_is_idle();
        }

        if(unlikely(replication_execute_next_pending_request(false) == REQUEST_QUEUE_EMPTY)) {
            worker_is_busy(WORKER_JOB_WAIT);
            replication_recursive_lock();

            // the timeout also defines how frequently we will traverse all the pending requests
            // when the outbound buffers of all senders are full
            usec_t timeout;
            if(slow) {
                // no work to be done, wait for a request to come in
                timeout = 1000 * USEC_PER_MS;
                sender_thread_buffer_free();
            }

            else if(replication_globals.unsafe.pending > 0) {
                if(replication_globals.unsafe.sender_resets == last_sender_resets)
                    timeout = 1000 * USEC_PER_MS;

                else {
                    // there are pending requests waiting to be executed,
                    // but none could be executed at this time.
                    // try again after this time.
                    timeout = 100 * USEC_PER_MS;
                }

                last_sender_resets = replication_globals.unsafe.sender_resets;
            }
            else {
                // no requests are pending, but there were requests recently
                // (run_verification_countdown), so retry shortly; if this timeout
                // is long, replicating a single chart becomes slow to finish
                // (a ping-pong of one request at a time)
                timeout = 10 * USEC_PER_MS;
                last_sender_resets = replication_globals.unsafe.sender_resets;
            }

            replication_recursive_unlock();

            worker_is_idle();
            sleep_usec(timeout);

            // make it scan all the pending requests next time
            replication_set_next_point_in_time(0, 0);
            replication_reset_next_point_in_time_countdown = SECONDS_TO_RESET_POINT_IN_TIME;

            continue;
        }
    }

    netdata_thread_cleanup_pop(1);
    return NULL;
}