pluginsd_parser.c 93 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "pluginsd_parser.h"
  3. #define LOG_FUNCTIONS false
  4. static ssize_t send_to_plugin(const char *txt, void *data) {
  5. PARSER *parser = data;
  6. if(!txt || !*txt)
  7. return 0;
  8. errno = 0;
  9. spinlock_lock(&parser->writer.spinlock);
  10. ssize_t bytes = -1;
  11. #ifdef ENABLE_HTTPS
  12. NETDATA_SSL *ssl = parser->ssl_output;
  13. if(ssl) {
  14. if(SSL_connection(ssl))
  15. bytes = netdata_ssl_write(ssl, (void *) txt, strlen(txt));
  16. else
  17. netdata_log_error("PLUGINSD: cannot send command (SSL)");
  18. spinlock_unlock(&parser->writer.spinlock);
  19. return bytes;
  20. }
  21. #endif
  22. if(parser->fp_output) {
  23. bytes = fprintf(parser->fp_output, "%s", txt);
  24. if(bytes <= 0) {
  25. netdata_log_error("PLUGINSD: cannot send command (FILE)");
  26. bytes = -2;
  27. }
  28. else
  29. fflush(parser->fp_output);
  30. spinlock_unlock(&parser->writer.spinlock);
  31. return bytes;
  32. }
  33. if(parser->fd != -1) {
  34. bytes = 0;
  35. ssize_t total = (ssize_t)strlen(txt);
  36. ssize_t sent;
  37. do {
  38. sent = write(parser->fd, &txt[bytes], total - bytes);
  39. if(sent <= 0) {
  40. netdata_log_error("PLUGINSD: cannot send command (fd)");
  41. spinlock_unlock(&parser->writer.spinlock);
  42. return -3;
  43. }
  44. bytes += sent;
  45. }
  46. while(bytes < total);
  47. spinlock_unlock(&parser->writer.spinlock);
  48. return (int)bytes;
  49. }
  50. spinlock_unlock(&parser->writer.spinlock);
  51. netdata_log_error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)");
  52. return -4;
  53. }
  54. static inline RRDHOST *pluginsd_require_scope_host(PARSER *parser, const char *cmd) {
  55. RRDHOST *host = parser->user.host;
  56. if(unlikely(!host))
  57. netdata_log_error("PLUGINSD: command %s requires a host, but is not set.", cmd);
  58. return host;
  59. }
  60. static inline RRDSET *pluginsd_require_scope_chart(PARSER *parser, const char *cmd, const char *parent_cmd) {
  61. RRDSET *st = parser->user.st;
  62. if(unlikely(!st))
  63. netdata_log_error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd);
  64. return st;
  65. }
  66. static inline RRDSET *pluginsd_get_scope_chart(PARSER *parser) {
  67. return parser->user.st;
  68. }
  69. static inline void pluginsd_lock_rrdset_data_collection(PARSER *parser) {
  70. if(parser->user.st && !parser->user.v2.locked_data_collection) {
  71. spinlock_lock(&parser->user.st->data_collection_lock);
  72. parser->user.v2.locked_data_collection = true;
  73. }
  74. }
  75. static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) {
  76. if(parser->user.st && parser->user.v2.locked_data_collection) {
  77. spinlock_unlock(&parser->user.st->data_collection_lock);
  78. parser->user.v2.locked_data_collection = false;
  79. return true;
  80. }
  81. return false;
  82. }
  83. void pluginsd_rrdset_cleanup(RRDSET *st) {
  84. spinlock_lock(&st->pluginsd.spinlock);
  85. for(size_t i = 0; i < st->pluginsd.size ; i++) {
  86. rrddim_acquired_release(st->pluginsd.rda[i]); // can be NULL
  87. st->pluginsd.rda[i] = NULL;
  88. }
  89. freez(st->pluginsd.rda);
  90. st->pluginsd.collector_tid = 0;
  91. st->pluginsd.rda = NULL;
  92. st->pluginsd.size = 0;
  93. st->pluginsd.pos = 0;
  94. spinlock_unlock(&st->pluginsd.spinlock);
  95. }
  96. static inline void pluginsd_unlock_previous_scope_chart(PARSER *parser, const char *keyword, bool stale) {
  97. if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) {
  98. if(stale)
  99. netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
  100. rrdhost_hostname(parser->user.st->rrdhost),
  101. rrdset_id(parser->user.st),
  102. keyword);
  103. }
  104. if(unlikely(parser->user.v2.ml_locked)) {
  105. ml_chart_update_end(parser->user.st);
  106. parser->user.v2.ml_locked = false;
  107. if(stale)
  108. netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
  109. rrdhost_hostname(parser->user.st->rrdhost),
  110. rrdset_id(parser->user.st),
  111. keyword);
  112. }
  113. }
  114. static inline void pluginsd_clear_scope_chart(PARSER *parser, const char *keyword) {
  115. pluginsd_unlock_previous_scope_chart(parser, keyword, true);
  116. parser->user.st = NULL;
  117. }
  118. static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const char *keyword) {
  119. RRDSET *old_st = parser->user.st;
  120. pid_t old_collector_tid = (old_st) ? old_st->pluginsd.collector_tid : 0;
  121. pid_t my_collector_tid = gettid();
  122. if(unlikely(old_collector_tid)) {
  123. if(old_collector_tid != my_collector_tid) {
  124. error_limit_static_global_var(erl, 1, 0);
  125. error_limit(&erl, "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)",
  126. keyword ? keyword : "UNKNOWN",
  127. rrdhost_hostname(st->rrdhost), rrdset_id(st),
  128. my_collector_tid, old_collector_tid);
  129. return false;
  130. }
  131. old_st->pluginsd.collector_tid = 0;
  132. }
  133. st->pluginsd.collector_tid = my_collector_tid;
  134. pluginsd_clear_scope_chart(parser, keyword);
  135. size_t dims = dictionary_entries(st->rrddim_root_index);
  136. if(unlikely(st->pluginsd.size < dims)) {
  137. st->pluginsd.rda = reallocz(st->pluginsd.rda, dims * sizeof(RRDDIM_ACQUIRED *));
  138. // initialize the empty slots
  139. for(ssize_t i = (ssize_t)dims - 1; i >= (ssize_t)st->pluginsd.size ;i--)
  140. st->pluginsd.rda[i] = NULL;
  141. st->pluginsd.size = dims;
  142. }
  143. st->pluginsd.pos = 0;
  144. parser->user.st = st;
  145. return true;
  146. }
  147. static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) {
  148. if (unlikely(!dimension || !*dimension)) {
  149. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
  150. rrdhost_hostname(host), rrdset_id(st), cmd);
  151. return NULL;
  152. }
  153. if(unlikely(st->pluginsd.pos >= st->pluginsd.size))
  154. st->pluginsd.pos = 0;
  155. RRDDIM_ACQUIRED *rda = st->pluginsd.rda[st->pluginsd.pos];
  156. if(likely(rda)) {
  157. RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
  158. if (likely(rd && string_strcmp(rd->id, dimension) == 0)) {
  159. // we found a cached RDA
  160. st->pluginsd.pos++;
  161. return rd;
  162. }
  163. else {
  164. // the collector is sending dimensions in a different order
  165. // release the previous one, to reuse this slot
  166. rrddim_acquired_release(rda);
  167. st->pluginsd.rda[st->pluginsd.pos] = NULL;
  168. }
  169. }
  170. rda = rrddim_find_and_acquire(st, dimension);
  171. if (unlikely(!rda)) {
  172. netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
  173. rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
  174. return NULL;
  175. }
  176. st->pluginsd.rda[st->pluginsd.pos++] = rda;
  177. return rrddim_acquired_to_rrddim(rda);
  178. }
  179. static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
  180. if (unlikely(!chart || !*chart)) {
  181. netdata_log_error("PLUGINSD: 'host:%s' got a %s without a chart id.",
  182. rrdhost_hostname(host), cmd);
  183. return NULL;
  184. }
  185. RRDSET *st = rrdset_find(host, chart);
  186. if (unlikely(!st))
  187. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.",
  188. rrdhost_hostname(host), chart, cmd);
  189. return st;
  190. }
  191. static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg) {
  192. parser->user.enabled = 0;
  193. if(keyword && msg) {
  194. error_limit_static_global_var(erl, 1, 0);
  195. error_limit(&erl, "PLUGINSD: keyword %s: %s", keyword, msg);
  196. }
  197. return PARSER_RC_ERROR;
  198. }
  199. static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) {
  200. char *dimension = get_word(words, num_words, 1);
  201. char *value = get_word(words, num_words, 2);
  202. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET);
  203. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  204. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
  205. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  206. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET);
  207. if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  208. st->pluginsd.set = true;
  209. if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  210. netdata_log_debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
  211. rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
  212. if (value && *value)
  213. rrddim_set_by_pointer(st, rd, str2ll_encoded(value));
  214. return PARSER_RC_OK;
  215. }
  216. static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) {
  217. char *id = get_word(words, num_words, 1);
  218. char *microseconds_txt = get_word(words, num_words, 2);
  219. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN);
  220. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  221. RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN);
  222. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  223. if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN))
  224. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  225. usec_t microseconds = 0;
  226. if (microseconds_txt && *microseconds_txt) {
  227. long long t = str2ll(microseconds_txt, NULL);
  228. if(t >= 0)
  229. microseconds = t;
  230. }
  231. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  232. if(st->replay.log_next_data_collection) {
  233. st->replay.log_next_data_collection = false;
  234. internal_error(true,
  235. "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu",
  236. rrdhost_hostname(host), rrdset_id(st),
  237. st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec,
  238. st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec,
  239. microseconds
  240. );
  241. }
  242. #endif
  243. if (likely(st->counter_done)) {
  244. if (likely(microseconds)) {
  245. if (parser->user.trust_durations)
  246. rrdset_next_usec_unfiltered(st, microseconds);
  247. else
  248. rrdset_next_usec(st, microseconds);
  249. }
  250. else
  251. rrdset_next(st);
  252. }
  253. return PARSER_RC_OK;
  254. }
  255. static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) {
  256. UNUSED(words);
  257. UNUSED(num_words);
  258. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END);
  259. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  260. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
  261. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  262. if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  263. netdata_log_debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
  264. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_END);
  265. parser->user.data_collections_count++;
  266. struct timeval now;
  267. now_realtime_timeval(&now);
  268. rrdset_timed_done(st, now, /* pending_rrdset_next = */ false);
  269. return PARSER_RC_OK;
  270. }
  271. static void pluginsd_host_define_cleanup(PARSER *parser) {
  272. string_freez(parser->user.host_define.hostname);
  273. dictionary_destroy(parser->user.host_define.rrdlabels);
  274. parser->user.host_define.hostname = NULL;
  275. parser->user.host_define.rrdlabels = NULL;
  276. parser->user.host_define.parsing_host = false;
  277. }
  278. static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) {
  279. if(uuid_parse(guid, *uuid))
  280. return false;
  281. uuid_unparse_lower(*uuid, output);
  282. return true;
  283. }
  284. static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PARSER *parser) {
  285. char *guid = get_word(words, num_words, 1);
  286. char *hostname = get_word(words, num_words, 2);
  287. if(unlikely(!guid || !*guid || !hostname || !*hostname))
  288. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters");
  289. if(unlikely(parser->user.host_define.parsing_host))
  290. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE,
  291. "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?");
  292. if(!pluginsd_validate_machine_guid(guid, &parser->user.host_define.machine_guid, parser->user.host_define.machine_guid_str))
  293. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?");
  294. parser->user.host_define.hostname = string_strdupz(hostname);
  295. parser->user.host_define.rrdlabels = rrdlabels_create();
  296. parser->user.host_define.parsing_host = true;
  297. return PARSER_RC_OK;
  298. }
  299. static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, DICTIONARY *dict, const char *keyword) {
  300. char *name = get_word(words, num_words, 1);
  301. char *value = get_word(words, num_words, 2);
  302. if(!name || !*name || !value)
  303. return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters");
  304. if(!parser->user.host_define.parsing_host || !dict)
  305. return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
  306. rrdlabels_add(dict, name, value, RRDLABEL_SRC_CONFIG);
  307. return PARSER_RC_OK;
  308. }
  309. static inline PARSER_RC pluginsd_host_labels(char **words, size_t num_words, PARSER *parser) {
  310. return pluginsd_host_dictionary(words, num_words, parser,
  311. parser->user.host_define.rrdlabels,
  312. PLUGINSD_KEYWORD_HOST_LABEL);
  313. }
  314. static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  315. if(!parser->user.host_define.parsing_host)
  316. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
  317. RRDHOST *host = rrdhost_find_or_create(
  318. string2str(parser->user.host_define.hostname),
  319. string2str(parser->user.host_define.hostname),
  320. parser->user.host_define.machine_guid_str,
  321. "Netdata Virtual Host 1.0",
  322. netdata_configured_timezone,
  323. netdata_configured_abbrev_timezone,
  324. netdata_configured_utc_offset,
  325. NULL,
  326. program_name,
  327. program_version,
  328. default_rrd_update_every,
  329. default_rrd_history_entries,
  330. default_rrd_memory_mode,
  331. default_health_enabled,
  332. default_rrdpush_enabled,
  333. default_rrdpush_destination,
  334. default_rrdpush_api_key,
  335. default_rrdpush_send_charts_matching,
  336. default_rrdpush_enable_replication,
  337. default_rrdpush_seconds_to_replicate,
  338. default_rrdpush_replication_step,
  339. rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels),
  340. false
  341. );
  342. rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST);
  343. if(host->rrdlabels) {
  344. rrdlabels_migrate_to_these(host->rrdlabels, parser->user.host_define.rrdlabels);
  345. }
  346. else {
  347. host->rrdlabels = parser->user.host_define.rrdlabels;
  348. parser->user.host_define.rrdlabels = NULL;
  349. }
  350. pluginsd_host_define_cleanup(parser);
  351. parser->user.host = host;
  352. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END);
  353. rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
  354. rrdcontext_host_child_connected(host);
  355. schedule_node_info_update(host);
  356. return PARSER_RC_OK;
  357. }
  358. static inline PARSER_RC pluginsd_host(char **words, size_t num_words, PARSER *parser) {
  359. char *guid = get_word(words, num_words, 1);
  360. if(!guid || !*guid || strcmp(guid, "localhost") == 0) {
  361. parser->user.host = localhost;
  362. return PARSER_RC_OK;
  363. }
  364. uuid_t uuid;
  365. char uuid_str[UUID_STR_LEN];
  366. if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str))
  367. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?");
  368. RRDHOST *host = rrdhost_find_by_guid(uuid_str);
  369. if(unlikely(!host))
  370. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?");
  371. parser->user.host = host;
  372. return PARSER_RC_OK;
  373. }
  374. static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *parser) {
  375. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART);
  376. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  377. char *type = get_word(words, num_words, 1);
  378. char *name = get_word(words, num_words, 2);
  379. char *title = get_word(words, num_words, 3);
  380. char *units = get_word(words, num_words, 4);
  381. char *family = get_word(words, num_words, 5);
  382. char *context = get_word(words, num_words, 6);
  383. char *chart = get_word(words, num_words, 7);
  384. char *priority_s = get_word(words, num_words, 8);
  385. char *update_every_s = get_word(words, num_words, 9);
  386. char *options = get_word(words, num_words, 10);
  387. char *plugin = get_word(words, num_words, 11);
  388. char *module = get_word(words, num_words, 12);
  389. // parse the id from type
  390. char *id = NULL;
  391. if (likely(type && (id = strchr(type, '.')))) {
  392. *id = '\0';
  393. id++;
  394. }
  395. // make sure we have the required variables
  396. if (unlikely((!type || !*type || !id || !*id)))
  397. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_CHART, "missing parameters");
  398. // parse the name, and make sure it does not include 'type.'
  399. if (unlikely(name && *name)) {
  400. // when data are streamed from child nodes
  401. // name will be type.name
  402. // so, we have to remove 'type.' from name too
  403. size_t len = strlen(type);
  404. if (strncmp(type, name, len) == 0 && name[len] == '.')
  405. name = &name[len + 1];
  406. // if the name is the same with the id,
  407. // or is just 'NULL', clear it.
  408. if (unlikely(strcmp(name, id) == 0 || strcasecmp(name, "NULL") == 0 || strcasecmp(name, "(NULL)") == 0))
  409. name = NULL;
  410. }
  411. int priority = 1000;
  412. if (likely(priority_s && *priority_s))
  413. priority = str2i(priority_s);
  414. int update_every = parser->user.cd->update_every;
  415. if (likely(update_every_s && *update_every_s))
  416. update_every = str2i(update_every_s);
  417. if (unlikely(!update_every))
  418. update_every = parser->user.cd->update_every;
  419. RRDSET_TYPE chart_type = RRDSET_TYPE_LINE;
  420. if (unlikely(chart))
  421. chart_type = rrdset_type_id(chart);
  422. if (unlikely(name && !*name))
  423. name = NULL;
  424. if (unlikely(family && !*family))
  425. family = NULL;
  426. if (unlikely(context && !*context))
  427. context = NULL;
  428. if (unlikely(!title))
  429. title = "";
  430. if (unlikely(!units))
  431. units = "unknown";
  432. netdata_log_debug(
  433. D_PLUGINSD,
  434. "creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d",
  435. type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
  436. priority, update_every);
  437. RRDSET *st = NULL;
  438. st = rrdset_create(
  439. host, type, id, name, family, context, title, units,
  440. (plugin && *plugin) ? plugin : parser->user.cd->filename,
  441. module, priority, update_every,
  442. chart_type);
  443. if (likely(st)) {
  444. if (options && *options) {
  445. if (strstr(options, "obsolete")) {
  446. pluginsd_rrdset_cleanup(st);
  447. rrdset_is_obsolete(st);
  448. }
  449. else
  450. rrdset_isnot_obsolete(st);
  451. if (strstr(options, "detail"))
  452. rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
  453. else
  454. rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
  455. if (strstr(options, "hidden"))
  456. rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
  457. else
  458. rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
  459. if (strstr(options, "store_first"))
  460. rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
  461. else
  462. rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
  463. }
  464. else {
  465. rrdset_isnot_obsolete(st);
  466. rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
  467. rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
  468. }
  469. if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_CHART))
  470. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  471. }
  472. else
  473. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_CHART);
  474. return PARSER_RC_OK;
  475. }
  476. static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) {
  477. const char *first_entry_txt = get_word(words, num_words, 1);
  478. const char *last_entry_txt = get_word(words, num_words, 2);
  479. const char *wall_clock_time_txt = get_word(words, num_words, 3);
  480. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
  481. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  482. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
  483. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  484. time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
  485. time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
  486. time_t child_wall_clock_time = (wall_clock_time_txt && *wall_clock_time_txt) ? (time_t)str2ul(wall_clock_time_txt) : now_realtime_sec();
  487. bool ok = true;
  488. if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
  489. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  490. st->replay.start_streaming = false;
  491. st->replay.after = 0;
  492. st->replay.before = 0;
  493. #endif
  494. rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
  495. rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
  496. rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
  497. ok = replicate_chart_request(send_to_plugin, parser, host, st,
  498. first_entry_child, last_entry_child, child_wall_clock_time,
  499. 0, 0);
  500. }
  501. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  502. else {
  503. internal_error(true, "REPLAY: 'host:%s/chart:%s' not sending duplicate replication request",
  504. rrdhost_hostname(st->rrdhost), rrdset_id(st));
  505. }
  506. #endif
  507. return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
  508. }
  509. static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) {
  510. char *id = get_word(words, num_words, 1);
  511. char *name = get_word(words, num_words, 2);
  512. char *algorithm = get_word(words, num_words, 3);
  513. char *multiplier_s = get_word(words, num_words, 4);
  514. char *divisor_s = get_word(words, num_words, 5);
  515. char *options = get_word(words, num_words, 6);
  516. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_DIMENSION);
  517. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  518. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
  519. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  520. if (unlikely(!id))
  521. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id");
  522. long multiplier = 1;
  523. if (multiplier_s && *multiplier_s) {
  524. multiplier = str2ll_encoded(multiplier_s);
  525. if (unlikely(!multiplier))
  526. multiplier = 1;
  527. }
  528. long divisor = 1;
  529. if (likely(divisor_s && *divisor_s)) {
  530. divisor = str2ll_encoded(divisor_s);
  531. if (unlikely(!divisor))
  532. divisor = 1;
  533. }
  534. if (unlikely(!algorithm || !*algorithm))
  535. algorithm = "absolute";
  536. if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  537. netdata_log_debug(
  538. D_PLUGINSD,
  539. "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
  540. rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
  541. options ? options : "");
  542. RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm));
  543. int unhide_dimension = 1;
  544. rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  545. if (options && *options) {
  546. if (strstr(options, "obsolete") != NULL)
  547. rrddim_is_obsolete(st, rd);
  548. else
  549. rrddim_isnot_obsolete(st, rd);
  550. unhide_dimension = !strstr(options, "hidden");
  551. if (strstr(options, "noreset") != NULL)
  552. rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  553. if (strstr(options, "nooverflow") != NULL)
  554. rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  555. } else
  556. rrddim_isnot_obsolete(st, rd);
  557. bool should_update_dimension = false;
  558. if (likely(unhide_dimension)) {
  559. rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN);
  560. should_update_dimension = rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
  561. }
  562. else {
  563. rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN);
  564. should_update_dimension = !rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
  565. }
  566. if (should_update_dimension) {
  567. rrddim_flag_set(rd, RRDDIM_FLAG_METADATA_UPDATE);
  568. rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
  569. }
  570. return PARSER_RC_OK;
  571. }
  572. // ----------------------------------------------------------------------------
  573. // execution of functions
  574. struct inflight_function {
  575. int code;
  576. int timeout;
  577. BUFFER *destination_wb;
  578. STRING *function;
  579. void (*callback)(BUFFER *wb, int code, void *callback_data);
  580. void *callback_data;
  581. usec_t timeout_ut;
  582. usec_t started_ut;
  583. usec_t sent_ut;
  584. const char *payload;
  585. };
  586. static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
  587. struct inflight_function *pf = func;
  588. PARSER *parser = parser_ptr;
  589. // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller
  590. pf->code = HTTP_RESP_GATEWAY_TIMEOUT;
  591. char buffer[2048 + 1];
  592. snprintfz(buffer, 2048, "%s %s %d \"%s\"\n",
  593. pf->payload ? "FUNCTION_PAYLOAD" : "FUNCTION",
  594. dictionary_acquired_item_name(item),
  595. pf->timeout,
  596. string2str(pf->function));
  597. // send the command to the plugin
  598. int ret = send_to_plugin(buffer, parser);
  599. pf->sent_ut = now_realtime_usec();
  600. if(ret < 0) {
  601. netdata_log_error("FUNCTION: failed to send function to plugin, error %d", ret);
  602. rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
  603. }
  604. else {
  605. internal_error(LOG_FUNCTIONS,
  606. "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)",
  607. string2str(pf->function), dictionary_acquired_item_name(item), ret,
  608. pf->sent_ut - pf->started_ut);
  609. }
  610. if (!pf->payload)
  611. return;
  612. // send the payload to the plugin
  613. ret = send_to_plugin(pf->payload, parser);
  614. if(ret < 0) {
  615. netdata_log_error("FUNCTION_PAYLOAD: failed to send function to plugin, error %d", ret);
  616. rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
  617. }
  618. else {
  619. internal_error(LOG_FUNCTIONS,
  620. "FUNCTION_PAYLOAD '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)",
  621. string2str(pf->function), dictionary_acquired_item_name(item), ret,
  622. pf->sent_ut - pf->started_ut);
  623. }
  624. send_to_plugin("\nFUNCTION_PAYLOAD_END\n", parser);
  625. }
  626. static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) {
  627. struct inflight_function *pf = new_func;
  628. netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
  629. pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
  630. pf->callback(pf->destination_wb, pf->code, pf->callback_data);
  631. string_freez(pf->function);
  632. return false;
  633. }
  634. static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) {
  635. struct inflight_function *pf = func;
  636. internal_error(LOG_FUNCTIONS,
  637. "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %llu usec, response %llu usec)",
  638. string2str(pf->function), dictionary_acquired_item_name(item),
  639. buffer_strlen(pf->destination_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut);
  640. pf->callback(pf->destination_wb, pf->code, pf->callback_data);
  641. string_freez(pf->function);
  642. }
  643. void inflight_functions_init(PARSER *parser) {
  644. parser->inflight.functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE, &dictionary_stats_category_functions, 0);
  645. dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser);
  646. dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser);
  647. dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser);
  648. }
  649. static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) {
  650. parser->inflight.smaller_timeout = 0;
  651. struct inflight_function *pf;
  652. dfe_start_write(parser->inflight.functions, pf) {
  653. if (pf->timeout_ut < now) {
  654. internal_error(true,
  655. "FUNCTION '%s' removing expired transaction '%s', after %llu usec.",
  656. string2str(pf->function), pf_dfe.name, now - pf->started_ut);
  657. if(!buffer_strlen(pf->destination_wb) || pf->code == HTTP_RESP_OK)
  658. pf->code = rrd_call_function_error(pf->destination_wb,
  659. "Timeout waiting for collector response.",
  660. HTTP_RESP_GATEWAY_TIMEOUT);
  661. dictionary_del(parser->inflight.functions, pf_dfe.name);
  662. }
  663. else if(!parser->inflight.smaller_timeout || pf->timeout_ut < parser->inflight.smaller_timeout)
  664. parser->inflight.smaller_timeout = pf->timeout_ut;
  665. }
  666. dfe_done(pf);
  667. }
  668. // this is the function that is called from
  669. // rrd_call_function_and_wait() and rrd_call_function_async()
  670. static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeout, const char *function, void *collector_data, void (*callback)(BUFFER *wb, int code, void *callback_data), void *callback_data) {
  671. PARSER *parser = collector_data;
  672. usec_t now = now_realtime_usec();
  673. struct inflight_function tmp = {
  674. .started_ut = now,
  675. .timeout_ut = now + timeout * USEC_PER_SEC,
  676. .destination_wb = destination_wb,
  677. .timeout = timeout,
  678. .function = string_strdupz(function),
  679. .callback = callback,
  680. .callback_data = callback_data,
  681. .payload = NULL
  682. };
  683. uuid_t uuid;
  684. uuid_generate_time(uuid);
  685. char key[UUID_STR_LEN];
  686. uuid_unparse_lower(uuid, key);
  687. dictionary_write_lock(parser->inflight.functions);
  688. // if there is any error, our dictionary callbacks will call the caller callback to notify
  689. // the caller about the error - no need for error handling here.
  690. dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
  691. if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
  692. parser->inflight.smaller_timeout = tmp.timeout_ut;
  693. // garbage collect stale inflight functions
  694. if(parser->inflight.smaller_timeout < now)
  695. inflight_functions_garbage_collect(parser, now);
  696. dictionary_write_unlock(parser->inflight.functions);
  697. return HTTP_RESP_OK;
  698. }
  699. static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) {
  700. bool global = false;
  701. size_t i = 1;
  702. if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
  703. i++;
  704. global = true;
  705. }
  706. char *name = get_word(words, num_words, i++);
  707. char *timeout_s = get_word(words, num_words, i++);
  708. char *help = get_word(words, num_words, i++);
  709. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_FUNCTION);
  710. if(!host) return PARSER_RC_ERROR;
  711. RRDSET *st = (global)? NULL: pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART);
  712. if(!st) global = true;
  713. if (unlikely(!timeout_s || !name || !help || (!global && !st))) {
  714. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.",
  715. rrdhost_hostname(host),
  716. st?rrdset_id(st):"(unset)",
  717. global?"yes":"no",
  718. name?name:"(unset)",
  719. timeout_s?timeout_s:"(unset)",
  720. help?help:"(unset)"
  721. );
  722. return PARSER_RC_ERROR;
  723. }
  724. int timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  725. if (timeout_s && *timeout_s) {
  726. timeout = str2i(timeout_s);
  727. if (unlikely(timeout <= 0))
  728. timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  729. }
  730. rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser);
  731. parser->user.data_collections_count++;
  732. return PARSER_RC_OK;
  733. }
  734. static void pluginsd_function_result_end(struct parser *parser, void *action_data) {
  735. STRING *key = action_data;
  736. if(key)
  737. dictionary_del(parser->inflight.functions, string2str(key));
  738. string_freez(key);
  739. parser->user.data_collections_count++;
  740. }
  741. static inline PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, PARSER *parser) {
  742. char *key = get_word(words, num_words, 1);
  743. char *status = get_word(words, num_words, 2);
  744. char *format = get_word(words, num_words, 3);
  745. char *expires = get_word(words, num_words, 4);
  746. if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) {
  747. netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
  748. , key ? key : "(unset)"
  749. , status ? status : "(unset)"
  750. , format ? format : "(unset)"
  751. , expires ? expires : "(unset)"
  752. );
  753. }
  754. int code = (status && *status) ? str2i(status) : 0;
  755. if (code <= 0)
  756. code = HTTP_RESP_BACKEND_RESPONSE_INVALID;
  757. time_t expiration = (expires && *expires) ? str2l(expires) : 0;
  758. struct inflight_function *pf = NULL;
  759. if(key && *key)
  760. pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, key);
  761. if(!pf) {
  762. netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)");
  763. }
  764. else {
  765. if(format && *format)
  766. pf->destination_wb->content_type = functions_format_to_content_type(format);
  767. pf->code = code;
  768. pf->destination_wb->expires = expiration;
  769. if(expiration <= now_realtime_sec())
  770. buffer_no_cacheable(pf->destination_wb);
  771. else
  772. buffer_cacheable(pf->destination_wb);
  773. }
  774. parser->defer.response = (pf) ? pf->destination_wb : NULL;
  775. parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END;
  776. parser->defer.action = pluginsd_function_result_end;
  777. parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL
  778. parser->flags |= PARSER_DEFER_UNTIL_KEYWORD;
  779. return PARSER_RC_OK;
  780. }
  781. // ----------------------------------------------------------------------------
  782. static inline PARSER_RC pluginsd_variable(char **words, size_t num_words, PARSER *parser) {
  783. char *name = get_word(words, num_words, 1);
  784. char *value = get_word(words, num_words, 2);
  785. NETDATA_DOUBLE v;
  786. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_VARIABLE);
  787. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  788. RRDSET *st = pluginsd_get_scope_chart(parser);
  789. int global = (st) ? 0 : 1;
  790. if (name && *name) {
  791. if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
  792. global = 1;
  793. name = get_word(words, num_words, 2);
  794. value = get_word(words, num_words, 3);
  795. } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
  796. global = 0;
  797. name = get_word(words, num_words, 2);
  798. value = get_word(words, num_words, 3);
  799. }
  800. }
  801. if (unlikely(!name || !*name))
  802. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "missing variable name");
  803. if (unlikely(!value || !*value))
  804. value = NULL;
  805. if (unlikely(!value)) {
  806. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
  807. rrdhost_hostname(host),
  808. st ? rrdset_id(st):"UNSET",
  809. (global) ? "HOST" : "CHART",
  810. name);
  811. return PARSER_RC_OK;
  812. }
  813. if (!global && !st)
  814. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
  815. char *endptr = NULL;
  816. v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr);
  817. if (unlikely(endptr && *endptr)) {
  818. if (endptr == value)
  819. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
  820. rrdhost_hostname(host),
  821. st ? rrdset_id(st):"UNSET",
  822. value,
  823. name);
  824. else
  825. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
  826. rrdhost_hostname(host),
  827. st ? rrdset_id(st):"UNSET",
  828. value,
  829. name,
  830. endptr);
  831. }
  832. if (global) {
  833. const RRDVAR_ACQUIRED *rva = rrdvar_custom_host_variable_add_and_acquire(host, name);
  834. if (rva) {
  835. rrdvar_custom_host_variable_set(host, rva, v);
  836. rrdvar_custom_host_variable_release(host, rva);
  837. }
  838. else
  839. netdata_log_error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
  840. rrdhost_hostname(host),
  841. name);
  842. } else {
  843. const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name);
  844. if (rsa) {
  845. rrdsetvar_custom_chart_variable_set(st, rsa, v);
  846. rrdsetvar_custom_chart_variable_release(st, rsa);
  847. }
  848. else
  849. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
  850. rrdhost_hostname(host), rrdset_id(st), name);
  851. }
  852. return PARSER_RC_OK;
  853. }
  854. static inline PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  855. netdata_log_debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH);
  856. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_FLUSH);
  857. parser->user.replay.start_time = 0;
  858. parser->user.replay.end_time = 0;
  859. parser->user.replay.start_time_ut = 0;
  860. parser->user.replay.end_time_ut = 0;
  861. return PARSER_RC_OK;
  862. }
  863. static inline PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  864. netdata_log_info("PLUGINSD: plugin called DISABLE. Disabling it.");
  865. parser->user.enabled = 0;
  866. return PARSER_RC_STOP;
  867. }
  868. static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *parser) {
  869. const char *name = get_word(words, num_words, 1);
  870. const char *label_source = get_word(words, num_words, 2);
  871. const char *value = get_word(words, num_words, 3);
  872. if (!name || !label_source || !value)
  873. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_LABEL, "missing parameters");
  874. char *store = (char *)value;
  875. bool allocated_store = false;
  876. if(unlikely(num_words > 4)) {
  877. allocated_store = true;
  878. store = mallocz(PLUGINSD_LINE_MAX + 1);
  879. size_t remaining = PLUGINSD_LINE_MAX;
  880. char *move = store;
  881. char *word;
  882. for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) {
  883. if(i > 3) {
  884. *move++ = ' ';
  885. *move = '\0';
  886. remaining--;
  887. }
  888. size_t length = strlen(word);
  889. if (length > remaining)
  890. length = remaining;
  891. remaining -= length;
  892. memcpy(move, word, length);
  893. move += length;
  894. *move = '\0';
  895. }
  896. }
  897. if(unlikely(!(parser->user.new_host_labels)))
  898. parser->user.new_host_labels = rrdlabels_create();
  899. rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source));
  900. if (allocated_store)
  901. freez(store);
  902. return PARSER_RC_OK;
  903. }
  904. static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  905. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_OVERWRITE);
  906. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  907. netdata_log_debug(D_PLUGINSD, "requested to OVERWRITE host labels");
  908. if(unlikely(!host->rrdlabels))
  909. host->rrdlabels = rrdlabels_create();
  910. rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels);
  911. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
  912. rrdlabels_destroy(parser->user.new_host_labels);
  913. parser->user.new_host_labels = NULL;
  914. return PARSER_RC_OK;
  915. }
  916. static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER *parser) {
  917. const char *name = get_word(words, num_words, 1);
  918. const char *value = get_word(words, num_words, 2);
  919. const char *label_source = get_word(words, num_words, 3);
  920. if (!name || !value || !*label_source) {
  921. netdata_log_error("Ignoring malformed or empty CHART LABEL command.");
  922. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  923. }
  924. if(unlikely(!parser->user.chart_rrdlabels_linked_temporarily)) {
  925. RRDSET *st = pluginsd_get_scope_chart(parser);
  926. parser->user.chart_rrdlabels_linked_temporarily = st->rrdlabels;
  927. rrdlabels_unmark_all(parser->user.chart_rrdlabels_linked_temporarily);
  928. }
  929. rrdlabels_add(parser->user.chart_rrdlabels_linked_temporarily, name, value, str2l(label_source));
  930. return PARSER_RC_OK;
  931. }
  932. static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  933. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT);
  934. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  935. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
  936. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  937. netdata_log_debug(D_PLUGINSD, "requested to commit chart labels");
  938. if(!parser->user.chart_rrdlabels_linked_temporarily) {
  939. netdata_log_error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", rrdhost_hostname(host));
  940. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  941. }
  942. rrdlabels_remove_all_unmarked(parser->user.chart_rrdlabels_linked_temporarily);
  943. rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
  944. rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
  945. parser->user.chart_rrdlabels_linked_temporarily = NULL;
  946. return PARSER_RC_OK;
  947. }
  948. static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser) {
  949. char *id = get_word(words, num_words, 1);
  950. char *start_time_str = get_word(words, num_words, 2);
  951. char *end_time_str = get_word(words, num_words, 3);
  952. char *child_now_str = get_word(words, num_words, 4);
  953. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  954. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  955. RRDSET *st;
  956. if (likely(!id || !*id))
  957. st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  958. else
  959. st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  960. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  961. if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_REPLAY_BEGIN))
  962. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  963. if(start_time_str && end_time_str) {
  964. time_t start_time = (time_t) str2ull_encoded(start_time_str);
  965. time_t end_time = (time_t) str2ull_encoded(end_time_str);
  966. time_t wall_clock_time = 0, tolerance;
  967. bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child;
  968. if(child_now_str) {
  969. wall_clock_time = (time_t) str2ull_encoded(child_now_str);
  970. tolerance = st->update_every + 1;
  971. wall_clock_comes_from_child = true;
  972. }
  973. if(wall_clock_time <= 0) {
  974. wall_clock_time = now_realtime_sec();
  975. tolerance = st->update_every + 5;
  976. wall_clock_comes_from_child = false;
  977. }
  978. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  979. internal_error(
  980. (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
  981. "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).",
  982. rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, st->replay.after, st->replay.before);
  983. internal_error(
  984. true,
  985. "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, child wall clock is %ld (%s), had requested %ld to %ld",
  986. rrdhost_hostname(st->rrdhost), rrdset_id(st),
  987. start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "from child" : "parent time",
  988. st->replay.after, st->replay.before);
  989. #endif
  990. if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) {
  991. if (unlikely(end_time - start_time != st->update_every))
  992. rrdset_set_update_every_s(st, end_time - start_time);
  993. st->last_collected_time.tv_sec = end_time;
  994. st->last_collected_time.tv_usec = 0;
  995. st->last_updated.tv_sec = end_time;
  996. st->last_updated.tv_usec = 0;
  997. st->counter++;
  998. st->counter_done++;
  999. // these are only needed for db mode RAM, SAVE, MAP, ALLOC
  1000. st->db.current_entry++;
  1001. if(st->db.current_entry >= st->db.entries)
  1002. st->db.current_entry -= st->db.entries;
  1003. parser->user.replay.start_time = start_time;
  1004. parser->user.replay.end_time = end_time;
  1005. parser->user.replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
  1006. parser->user.replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
  1007. parser->user.replay.wall_clock_time = wall_clock_time;
  1008. parser->user.replay.rset_enabled = true;
  1009. return PARSER_RC_OK;
  1010. }
  1011. netdata_log_error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN
  1012. " from %ld to %ld, but timestamps are invalid "
  1013. "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
  1014. rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
  1015. wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock",
  1016. tolerance);
  1017. }
  1018. // the child sends an RBEGIN without any parameters initially
  1019. // setting rset_enabled to false, means the RSET should not store any metrics
  1020. // to store metrics, the RBEGIN needs to have timestamps
  1021. parser->user.replay.start_time = 0;
  1022. parser->user.replay.end_time = 0;
  1023. parser->user.replay.start_time_ut = 0;
  1024. parser->user.replay.end_time_ut = 0;
  1025. parser->user.replay.wall_clock_time = 0;
  1026. parser->user.replay.rset_enabled = false;
  1027. return PARSER_RC_OK;
  1028. }
  1029. static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str) {
  1030. SN_FLAGS flags = SN_FLAG_NONE;
  1031. char c;
  1032. while ((c = *flags_str++)) {
  1033. switch (c) {
  1034. case 'A':
  1035. flags |= SN_FLAG_NOT_ANOMALOUS;
  1036. break;
  1037. case 'R':
  1038. flags |= SN_FLAG_RESET;
  1039. break;
  1040. case 'E':
  1041. flags = SN_EMPTY_SLOT;
  1042. return flags;
  1043. default:
  1044. internal_error(true, "Unknown SN_FLAGS flag '%c'", c);
  1045. break;
  1046. }
  1047. }
  1048. return flags;
  1049. }
  1050. static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARSER *parser) {
  1051. char *dimension = get_word(words, num_words, 1);
  1052. char *value_str = get_word(words, num_words, 2);
  1053. char *flags_str = get_word(words, num_words, 3);
  1054. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_SET);
  1055. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1056. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1057. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1058. if(!parser->user.replay.rset_enabled) {
  1059. error_limit_static_thread_var(erl, 1, 0);
  1060. error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
  1061. rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1062. // we have to return OK here
  1063. return PARSER_RC_OK;
  1064. }
  1065. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET);
  1066. if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1067. st->pluginsd.set = true;
  1068. if (unlikely(!parser->user.replay.start_time || !parser->user.replay.end_time)) {
  1069. netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.",
  1070. rrdhost_hostname(host),
  1071. rrdset_id(st),
  1072. dimension,
  1073. PLUGINSD_KEYWORD_REPLAY_SET,
  1074. parser->user.replay.start_time,
  1075. parser->user.replay.end_time,
  1076. PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1077. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1078. }
  1079. if (unlikely(!value_str || !*value_str))
  1080. value_str = "NAN";
  1081. if(unlikely(!flags_str))
  1082. flags_str = "";
  1083. if (likely(value_str)) {
  1084. RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
  1085. if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) {
  1086. NETDATA_DOUBLE value = str2ndd_encoded(value_str, NULL);
  1087. SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
  1088. if (!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT)) {
  1089. value = NAN;
  1090. flags = SN_EMPTY_SLOT;
  1091. }
  1092. rrddim_store_metric(rd, parser->user.replay.end_time_ut, value, flags);
  1093. rd->collector.last_collected_time.tv_sec = parser->user.replay.end_time;
  1094. rd->collector.last_collected_time.tv_usec = 0;
  1095. rd->collector.counter++;
  1096. }
  1097. else {
  1098. error_limit_static_global_var(erl, 1, 0);
  1099. error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.",
  1100. rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
  1101. }
  1102. }
  1103. return PARSER_RC_OK;
  1104. }
  1105. static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, PARSER *parser) {
  1106. if(parser->user.replay.rset_enabled == false)
  1107. return PARSER_RC_OK;
  1108. char *dimension = get_word(words, num_words, 1);
  1109. char *last_collected_ut_str = get_word(words, num_words, 2);
  1110. char *last_collected_value_str = get_word(words, num_words, 3);
  1111. char *last_calculated_value_str = get_word(words, num_words, 4);
  1112. char *last_stored_value_str = get_word(words, num_words, 5);
  1113. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
  1114. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1115. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1116. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1117. if(st->pluginsd.set) {
  1118. // reset pos to reuse the same RDAs
  1119. st->pluginsd.pos = 0;
  1120. st->pluginsd.set = false;
  1121. }
  1122. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
  1123. if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1124. usec_t dim_last_collected_ut = (usec_t)rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->collector.last_collected_time.tv_usec;
  1125. usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
  1126. if(last_collected_ut > dim_last_collected_ut) {
  1127. rd->collector.last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
  1128. rd->collector.last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
  1129. }
  1130. rd->collector.last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0;
  1131. rd->collector.last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0;
  1132. rd->collector.last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0;
  1133. return PARSER_RC_OK;
  1134. }
  1135. static inline PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, PARSER *parser) {
  1136. if(parser->user.replay.rset_enabled == false)
  1137. return PARSER_RC_OK;
  1138. char *last_collected_ut_str = get_word(words, num_words, 1);
  1139. char *last_updated_ut_str = get_word(words, num_words, 2);
  1140. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE);
  1141. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1142. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE,
  1143. PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1144. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1145. usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
  1146. usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
  1147. if(last_collected_ut > chart_last_collected_ut) {
  1148. st->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
  1149. st->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
  1150. }
  1151. usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec;
  1152. usec_t last_updated_ut = last_updated_ut_str ? str2ull_encoded(last_updated_ut_str) : 0;
  1153. if(last_updated_ut > chart_last_updated_ut) {
  1154. st->last_updated.tv_sec = (time_t)(last_updated_ut / USEC_PER_SEC);
  1155. st->last_updated.tv_usec = (last_updated_ut % USEC_PER_SEC);
  1156. }
  1157. st->counter++;
  1158. st->counter_done++;
  1159. return PARSER_RC_OK;
  1160. }
  1161. static inline PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARSER *parser) {
  1162. if (num_words < 7) { // accepts 7, but the 7th is optional
  1163. netdata_log_error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
  1164. return PARSER_RC_ERROR;
  1165. }
  1166. const char *update_every_child_txt = get_word(words, num_words, 1);
  1167. const char *first_entry_child_txt = get_word(words, num_words, 2);
  1168. const char *last_entry_child_txt = get_word(words, num_words, 3);
  1169. const char *start_streaming_txt = get_word(words, num_words, 4);
  1170. const char *first_entry_requested_txt = get_word(words, num_words, 5);
  1171. const char *last_entry_requested_txt = get_word(words, num_words, 6);
  1172. const char *child_world_time_txt = get_word(words, num_words, 7); // optional
  1173. time_t update_every_child = (time_t) str2ull_encoded(update_every_child_txt);
  1174. time_t first_entry_child = (time_t) str2ull_encoded(first_entry_child_txt);
  1175. time_t last_entry_child = (time_t) str2ull_encoded(last_entry_child_txt);
  1176. bool start_streaming = (strcmp(start_streaming_txt, "true") == 0);
  1177. time_t first_entry_requested = (time_t) str2ull_encoded(first_entry_requested_txt);
  1178. time_t last_entry_requested = (time_t) str2ull_encoded(last_entry_requested_txt);
  1179. // the optional child world time
  1180. time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t) str2ull_encoded(
  1181. child_world_time_txt) : now_realtime_sec();
  1182. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_END);
  1183. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1184. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1185. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1186. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  1187. internal_error(true,
  1188. "PLUGINSD REPLAY: 'host:%s/chart:%s': got a " PLUGINSD_KEYWORD_REPLAY_END " child db from %llu to %llu, start_streaming %s, had requested from %llu to %llu, wall clock %llu",
  1189. rrdhost_hostname(host), rrdset_id(st),
  1190. (unsigned long long)first_entry_child, (unsigned long long)last_entry_child,
  1191. start_streaming?"true":"false",
  1192. (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested,
  1193. (unsigned long long)child_world_time
  1194. );
  1195. #endif
  1196. parser->user.data_collections_count++;
  1197. if(parser->user.replay.rset_enabled && st->rrdhost->receiver) {
  1198. time_t now = now_realtime_sec();
  1199. time_t started = st->rrdhost->receiver->replication_first_time_t;
  1200. time_t current = parser->user.replay.end_time;
  1201. if(started && current > started) {
  1202. host->rrdpush_receiver_replication_percent = (NETDATA_DOUBLE) (current - started) * 100.0 / (NETDATA_DOUBLE) (now - started);
  1203. worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
  1204. host->rrdpush_receiver_replication_percent);
  1205. }
  1206. }
  1207. parser->user.replay.start_time = 0;
  1208. parser->user.replay.end_time = 0;
  1209. parser->user.replay.start_time_ut = 0;
  1210. parser->user.replay.end_time_ut = 0;
  1211. parser->user.replay.wall_clock_time = 0;
  1212. parser->user.replay.rset_enabled = false;
  1213. st->counter++;
  1214. st->counter_done++;
  1215. store_metric_collection_completed();
  1216. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  1217. st->replay.start_streaming = false;
  1218. st->replay.after = 0;
  1219. st->replay.before = 0;
  1220. if(start_streaming)
  1221. st->replay.log_next_data_collection = true;
  1222. #endif
  1223. if (start_streaming) {
  1224. if (st->update_every != update_every_child)
  1225. rrdset_set_update_every_s(st, update_every_child);
  1226. if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
  1227. rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
  1228. rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
  1229. rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
  1230. rrdhost_receiver_replicating_charts_minus_one(st->rrdhost);
  1231. }
  1232. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  1233. else
  1234. internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.",
  1235. rrdhost_hostname(host), rrdset_id(st));
  1236. #endif
  1237. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END);
  1238. host->rrdpush_receiver_replication_percent = 100.0;
  1239. worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, host->rrdpush_receiver_replication_percent);
  1240. return PARSER_RC_OK;
  1241. }
  1242. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END);
  1243. rrdcontext_updated_retention_rrdset(st);
  1244. bool ok = replicate_chart_request(send_to_plugin, parser, host, st,
  1245. first_entry_child, last_entry_child, child_world_time,
  1246. first_entry_requested, last_entry_requested);
  1247. return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
  1248. }
  1249. static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) {
  1250. timing_init();
  1251. char *id = get_word(words, num_words, 1);
  1252. char *update_every_str = get_word(words, num_words, 2);
  1253. char *end_time_str = get_word(words, num_words, 3);
  1254. char *wall_clock_time_str = get_word(words, num_words, 4);
  1255. if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str))
  1256. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
  1257. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN_V2);
  1258. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1259. timing_step(TIMING_STEP_BEGIN2_PREPARE);
  1260. RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN_V2);
  1261. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1262. if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN_V2))
  1263. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1264. if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE | RRDSET_FLAG_ARCHIVED)))
  1265. rrdset_isnot_obsolete(st);
  1266. timing_step(TIMING_STEP_BEGIN2_FIND_CHART);
  1267. // ------------------------------------------------------------------------
  1268. // parse the parameters
  1269. time_t update_every = (time_t) str2ull_encoded(update_every_str);
  1270. time_t end_time = (time_t) str2ull_encoded(end_time_str);
  1271. time_t wall_clock_time;
  1272. if(likely(*wall_clock_time_str == '#'))
  1273. wall_clock_time = end_time;
  1274. else
  1275. wall_clock_time = (time_t) str2ull_encoded(wall_clock_time_str);
  1276. if (unlikely(update_every != st->update_every))
  1277. rrdset_set_update_every_s(st, update_every);
  1278. timing_step(TIMING_STEP_BEGIN2_PARSE);
  1279. // ------------------------------------------------------------------------
  1280. // prepare our state
  1281. pluginsd_lock_rrdset_data_collection(parser);
  1282. parser->user.v2.update_every = update_every;
  1283. parser->user.v2.end_time = end_time;
  1284. parser->user.v2.wall_clock_time = wall_clock_time;
  1285. parser->user.v2.ml_locked = ml_chart_update_begin(st);
  1286. timing_step(TIMING_STEP_BEGIN2_ML);
  1287. // ------------------------------------------------------------------------
  1288. // propagate it forward in v2
  1289. if(!parser->user.v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost))
  1290. parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time);
  1291. if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) {
  1292. // check if receiver and sender have the same number parsing capabilities
  1293. bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
  1294. NUMBER_ENCODING encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
  1295. BUFFER *wb = parser->user.v2.stream_buffer.wb;
  1296. buffer_need_bytes(wb, 1024);
  1297. if(unlikely(parser->user.v2.stream_buffer.begin_v2_added))
  1298. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
  1299. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
  1300. buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
  1301. buffer_fast_strcat(wb, "' ", 2);
  1302. if(can_copy)
  1303. buffer_strcat(wb, update_every_str);
  1304. else
  1305. buffer_print_uint64_encoded(wb, encoding, update_every);
  1306. buffer_fast_strcat(wb, " ", 1);
  1307. if(can_copy)
  1308. buffer_strcat(wb, end_time_str);
  1309. else
  1310. buffer_print_uint64_encoded(wb, encoding, end_time);
  1311. buffer_fast_strcat(wb, " ", 1);
  1312. if(can_copy)
  1313. buffer_strcat(wb, wall_clock_time_str);
  1314. else
  1315. buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
  1316. buffer_fast_strcat(wb, "\n", 1);
  1317. parser->user.v2.stream_buffer.last_point_end_time_s = end_time;
  1318. parser->user.v2.stream_buffer.begin_v2_added = true;
  1319. }
  1320. timing_step(TIMING_STEP_BEGIN2_PROPAGATE);
  1321. // ------------------------------------------------------------------------
  1322. // store it
  1323. st->last_collected_time.tv_sec = end_time;
  1324. st->last_collected_time.tv_usec = 0;
  1325. st->last_updated.tv_sec = end_time;
  1326. st->last_updated.tv_usec = 0;
  1327. st->counter++;
  1328. st->counter_done++;
  1329. // these are only needed for db mode RAM, SAVE, MAP, ALLOC
  1330. st->db.current_entry++;
  1331. if(st->db.current_entry >= st->db.entries)
  1332. st->db.current_entry -= st->db.entries;
  1333. timing_step(TIMING_STEP_BEGIN2_STORE);
  1334. return PARSER_RC_OK;
  1335. }
  1336. static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) {
  1337. timing_init();
  1338. char *dimension = get_word(words, num_words, 1);
  1339. char *collected_str = get_word(words, num_words, 2);
  1340. char *value_str = get_word(words, num_words, 3);
  1341. char *flags_str = get_word(words, num_words, 4);
  1342. if(unlikely(!dimension || !collected_str || !value_str || !flags_str))
  1343. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
  1344. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET_V2);
  1345. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1346. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2);
  1347. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1348. timing_step(TIMING_STEP_SET2_PREPARE);
  1349. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET_V2);
  1350. if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1351. st->pluginsd.set = true;
  1352. if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED)))
  1353. rrddim_isnot_obsolete(st, rd);
  1354. timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION);
  1355. // ------------------------------------------------------------------------
  1356. // parse the parameters
  1357. collected_number collected_value = (collected_number) str2ll_encoded(collected_str);
  1358. NETDATA_DOUBLE value;
  1359. if(*value_str == '#')
  1360. value = (NETDATA_DOUBLE)collected_value;
  1361. else
  1362. value = str2ndd_encoded(value_str, NULL);
  1363. SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
  1364. timing_step(TIMING_STEP_SET2_PARSE);
  1365. // ------------------------------------------------------------------------
  1366. // check value and ML
  1367. if (unlikely(!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT))) {
  1368. value = NAN;
  1369. flags = SN_EMPTY_SLOT;
  1370. if(parser->user.v2.ml_locked)
  1371. ml_dimension_is_anomalous(rd, parser->user.v2.end_time, 0, false);
  1372. }
  1373. else if(parser->user.v2.ml_locked) {
  1374. if (ml_dimension_is_anomalous(rd, parser->user.v2.end_time, value, true)) {
  1375. // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
  1376. flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS);
  1377. }
  1378. else
  1379. flags |= SN_FLAG_NOT_ANOMALOUS;
  1380. }
  1381. timing_step(TIMING_STEP_SET2_ML);
  1382. // ------------------------------------------------------------------------
  1383. // propagate it forward in v2
  1384. if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) {
  1385. // check if receiver and sender have the same number parsing capabilities
  1386. bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
  1387. NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
  1388. NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
  1389. BUFFER *wb = parser->user.v2.stream_buffer.wb;
  1390. buffer_need_bytes(wb, 1024);
  1391. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
  1392. buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
  1393. buffer_fast_strcat(wb, "' ", 2);
  1394. if(can_copy)
  1395. buffer_strcat(wb, collected_str);
  1396. else
  1397. buffer_print_int64_encoded(wb, integer_encoding, collected_value); // original v2 had hex
  1398. buffer_fast_strcat(wb, " ", 1);
  1399. if(can_copy)
  1400. buffer_strcat(wb, value_str);
  1401. else
  1402. buffer_print_netdata_double_encoded(wb, doubles_encoding, value); // original v2 had decimal
  1403. buffer_fast_strcat(wb, " ", 1);
  1404. buffer_print_sn_flags(wb, flags, true);
  1405. buffer_fast_strcat(wb, "\n", 1);
  1406. }
  1407. timing_step(TIMING_STEP_SET2_PROPAGATE);
  1408. // ------------------------------------------------------------------------
  1409. // store it
  1410. rrddim_store_metric(rd, parser->user.v2.end_time * USEC_PER_SEC, value, flags);
  1411. rd->collector.last_collected_time.tv_sec = parser->user.v2.end_time;
  1412. rd->collector.last_collected_time.tv_usec = 0;
  1413. rd->collector.last_collected_value = collected_value;
  1414. rd->collector.last_stored_value = value;
  1415. rd->collector.last_calculated_value = value;
  1416. rd->collector.counter++;
  1417. rrddim_set_updated(rd);
  1418. timing_step(TIMING_STEP_SET2_STORE);
  1419. return PARSER_RC_OK;
  1420. }
  1421. void pluginsd_cleanup_v2(PARSER *parser) {
  1422. // this is called when the thread is stopped while processing
  1423. pluginsd_clear_scope_chart(parser, "THREAD CLEANUP");
  1424. }
  1425. static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  1426. timing_init();
  1427. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END_V2);
  1428. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1429. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2);
  1430. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1431. parser->user.data_collections_count++;
  1432. timing_step(TIMING_STEP_END2_PREPARE);
  1433. // ------------------------------------------------------------------------
  1434. // propagate the whole chart update in v1
  1435. if(unlikely(!parser->user.v2.stream_buffer.v2 && !parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb))
  1436. rrdset_push_metrics_v1(&parser->user.v2.stream_buffer, st);
  1437. timing_step(TIMING_STEP_END2_PUSH_V1);
  1438. // ------------------------------------------------------------------------
  1439. // unblock data collection
  1440. pluginsd_unlock_previous_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, false);
  1441. rrdcontext_collected_rrdset(st);
  1442. store_metric_collection_completed();
  1443. timing_step(TIMING_STEP_END2_RRDSET);
  1444. // ------------------------------------------------------------------------
  1445. // propagate it forward
  1446. rrdset_push_metrics_finished(&parser->user.v2.stream_buffer, st);
  1447. timing_step(TIMING_STEP_END2_PROPAGATE);
  1448. // ------------------------------------------------------------------------
  1449. // cleanup RRDSET / RRDDIM
  1450. RRDDIM *rd;
  1451. rrddim_foreach_read(rd, st) {
  1452. rd->collector.calculated_value = 0;
  1453. rd->collector.collected_value = 0;
  1454. rrddim_clear_updated(rd);
  1455. }
  1456. rrddim_foreach_done(rd);
  1457. // ------------------------------------------------------------------------
  1458. // reset state
  1459. parser->user.v2 = (struct parser_user_object_v2){ 0 };
  1460. timing_step(TIMING_STEP_END2_STORE);
  1461. timing_report();
  1462. return PARSER_RC_OK;
  1463. }
  1464. static inline PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
  1465. netdata_log_info("PLUGINSD: plugin called EXIT.");
  1466. return PARSER_RC_STOP;
  1467. }
  1468. struct mutex_cond {
  1469. pthread_mutex_t lock;
  1470. pthread_cond_t cond;
  1471. int rc;
  1472. };
  1473. static void virt_fnc_got_data_cb(BUFFER *wb, int code, void *callback_data)
  1474. {
  1475. struct mutex_cond *ctx = callback_data;
  1476. pthread_mutex_lock(&ctx->lock);
  1477. ctx->rc = code;
  1478. pthread_cond_broadcast(&ctx->cond);
  1479. pthread_mutex_unlock(&ctx->lock);
  1480. }
  1481. #define VIRT_FNC_TIMEOUT 1
  1482. dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, int *rc, const char *payload) {
  1483. usec_t now = now_realtime_usec();
  1484. BUFFER *wb = buffer_create(4096, NULL);
  1485. struct mutex_cond cond = {
  1486. .lock = PTHREAD_MUTEX_INITIALIZER,
  1487. .cond = PTHREAD_COND_INITIALIZER
  1488. };
  1489. struct inflight_function tmp = {
  1490. .started_ut = now,
  1491. .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC,
  1492. .destination_wb = wb,
  1493. .timeout = VIRT_FNC_TIMEOUT,
  1494. .function = string_strdupz(name),
  1495. .callback = virt_fnc_got_data_cb,
  1496. .callback_data = &cond,
  1497. .payload = payload,
  1498. };
  1499. uuid_t uuid;
  1500. uuid_generate_time(uuid);
  1501. char key[UUID_STR_LEN];
  1502. uuid_unparse_lower(uuid, key);
  1503. dictionary_write_lock(parser->inflight.functions);
  1504. // if there is any error, our dictionary callbacks will call the caller callback to notify
  1505. // the caller about the error - no need for error handling here.
  1506. dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
  1507. if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
  1508. parser->inflight.smaller_timeout = tmp.timeout_ut;
  1509. // garbage collect stale inflight functions
  1510. if(parser->inflight.smaller_timeout < now)
  1511. inflight_functions_garbage_collect(parser, now);
  1512. dictionary_write_unlock(parser->inflight.functions);
  1513. struct timespec tp;
  1514. clock_gettime(CLOCK_REALTIME, &tp);
  1515. tp.tv_sec += (time_t)VIRT_FNC_TIMEOUT;
  1516. pthread_mutex_lock(&cond.lock);
  1517. int ret = pthread_cond_timedwait(&cond.cond, &cond.lock, &tp);
  1518. if (ret == ETIMEDOUT)
  1519. netdata_log_error("PLUGINSD: DYNCFG virtual function %s timed out", name);
  1520. pthread_mutex_unlock(&cond.lock);
  1521. dyncfg_config_t cfg;
  1522. cfg.data = strdupz(buffer_tostring(wb));
  1523. cfg.data_size = buffer_strlen(wb);
  1524. if (rc != NULL)
  1525. *rc = cond.rc;
  1526. buffer_free(wb);
  1527. return cfg;
  1528. }
  1529. static dyncfg_config_t get_plugin_config_cb(void *usr_ctx)
  1530. {
  1531. PARSER *parser = usr_ctx;
  1532. return call_virtual_function_blocking(parser, "get_plugin_config", NULL, NULL);
  1533. }
  1534. static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx)
  1535. {
  1536. PARSER *parser = usr_ctx;
  1537. return call_virtual_function_blocking(parser, "get_plugin_config_schema", NULL, NULL);
  1538. }
  1539. static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *module_name)
  1540. {
  1541. PARSER *parser = usr_ctx;
  1542. char buf[1024];
  1543. snprintfz(buf, sizeof(buf), "get_module_config %s", module_name);
  1544. return call_virtual_function_blocking(parser, buf, NULL, NULL);
  1545. }
  1546. static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *module_name)
  1547. {
  1548. PARSER *parser = usr_ctx;
  1549. char buf[1024];
  1550. snprintfz(buf, sizeof(buf), "get_module_config_schema %s", module_name);
  1551. return call_virtual_function_blocking(parser, buf, NULL, NULL);
  1552. }
  1553. static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *module_name)
  1554. {
  1555. PARSER *parser = usr_ctx;
  1556. char buf[1024];
  1557. snprintfz(buf, sizeof(buf), "get_job_config_schema %s", module_name);
  1558. return call_virtual_function_blocking(parser, buf, NULL, NULL);
  1559. }
  1560. static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *module_name, const char* job_name)
  1561. {
  1562. PARSER *parser = usr_ctx;
  1563. char buf[1024];
  1564. snprintfz(buf, sizeof(buf), "get_job_config %s %s", module_name, job_name);
  1565. return call_virtual_function_blocking(parser, buf, NULL, NULL);
  1566. }
  1567. enum set_config_result set_plugin_config_cb(void *usr_ctx, dyncfg_config_t *cfg)
  1568. {
  1569. PARSER *parser = usr_ctx;
  1570. int rc;
  1571. call_virtual_function_blocking(parser, "set_plugin_config", &rc, cfg->data);
  1572. if(rc != 1)
  1573. return SET_CONFIG_REJECTED;
  1574. return SET_CONFIG_ACCEPTED;
  1575. }
  1576. enum set_config_result set_module_config_cb(void *usr_ctx, const char *module_name, dyncfg_config_t *cfg)
  1577. {
  1578. PARSER *parser = usr_ctx;
  1579. int rc;
  1580. char buf[1024];
  1581. snprintfz(buf, sizeof(buf), "set_module_config %s", module_name);
  1582. call_virtual_function_blocking(parser, buf, &rc, cfg->data);
  1583. if(rc != 1)
  1584. return SET_CONFIG_REJECTED;
  1585. return SET_CONFIG_ACCEPTED;
  1586. }
  1587. enum set_config_result set_job_config_cb(void *usr_ctx, const char *module_name, const char *job_name, dyncfg_config_t *cfg)
  1588. {
  1589. PARSER *parser = usr_ctx;
  1590. int rc;
  1591. char buf[1024];
  1592. snprintfz(buf, sizeof(buf), "set_job_config %s %s", module_name, job_name);
  1593. call_virtual_function_blocking(parser, buf, &rc, cfg->data);
  1594. if(rc != 1)
  1595. return SET_CONFIG_REJECTED;
  1596. return SET_CONFIG_ACCEPTED;
  1597. }
  1598. enum set_config_result delete_job_cb(void *usr_ctx, const char *module_name, const char *job_name)
  1599. {
  1600. PARSER *parser = usr_ctx;
  1601. int rc;
  1602. char buf[1024];
  1603. snprintfz(buf, sizeof(buf), "delete_job %s %s", module_name, job_name);
  1604. call_virtual_function_blocking(parser, buf, &rc, NULL);
  1605. if(rc != 1)
  1606. return SET_CONFIG_REJECTED;
  1607. return SET_CONFIG_ACCEPTED;
  1608. }
  1609. static inline PARSER_RC pluginsd_register_plugin(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
  1610. netdata_log_info("PLUGINSD: DYNCFG_ENABLE");
  1611. if (unlikely (num_words != 2))
  1612. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "missing name parameter");
  1613. struct configurable_plugin *cfg = callocz(1, sizeof(struct configurable_plugin));
  1614. cfg->name = strdupz(words[1]);
  1615. cfg->set_config_cb = set_plugin_config_cb;
  1616. cfg->get_config_cb = get_plugin_config_cb;
  1617. cfg->get_config_schema_cb = get_plugin_config_schema_cb;
  1618. cfg->cb_usr_ctx = parser;
  1619. parser->user.cd->cfg_dict_item = register_plugin(cfg);
  1620. if (unlikely(parser->user.cd->cfg_dict_item == NULL)) {
  1621. freez(cfg->name);
  1622. freez(cfg);
  1623. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "error registering plugin");
  1624. }
  1625. parser->user.cd->configuration = cfg;
  1626. return PARSER_RC_OK;
  1627. }
  1628. static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
  1629. netdata_log_info("PLUGINSD: DYNCFG_REG_MODULE");
  1630. struct configurable_plugin *plug_cfg = parser->user.cd->configuration;
  1631. if (unlikely(plug_cfg == NULL))
  1632. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE);
  1633. if (unlikely(num_words != 3))
  1634. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "expected 2 parameters module_name followed by module_type");
  1635. struct module *mod = callocz(1, sizeof(struct module));
  1636. mod->type = str2_module_type(words[2]);
  1637. if (unlikely(mod->type == MOD_TYPE_UNKNOWN)) {
  1638. freez(mod);
  1639. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "unknown module type (allowed: job_array, single)");
  1640. }
  1641. mod->name = strdupz(words[1]);
  1642. mod->set_config_cb = set_module_config_cb;
  1643. mod->get_config_cb = get_module_config_cb;
  1644. mod->get_config_schema_cb = get_module_config_schema_cb;
  1645. mod->config_cb_usr_ctx = parser;
  1646. mod->get_job_config_cb = get_job_config_cb;
  1647. mod->get_job_config_schema_cb = get_job_config_schema_cb;
  1648. mod->set_job_config_cb = set_job_config_cb;
  1649. mod->delete_job_cb = delete_job_cb;
  1650. mod->job_config_cb_usr_ctx = parser;
  1651. register_module(plug_cfg, mod);
  1652. return PARSER_RC_OK;
  1653. }
  1654. // job_status <module_name> <job_name> <status_code> <state> <message>
  1655. static inline PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser)
  1656. {
  1657. if (unlikely(num_words != 6 && num_words != 5))
  1658. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]");
  1659. int state = atoi(words[4]);
  1660. enum job_status job_status = str2job_state(words[3]);
  1661. if (unlikely(job_status == JOB_STATUS_UNKNOWN))
  1662. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job state");
  1663. char *message = NULL;
  1664. if (num_words == 6)
  1665. message = strdupz(words[5]);
  1666. report_job_status(parser->user.cd->configuration, words[1], words[2], job_status, state, message);
  1667. return PARSER_RC_OK;
  1668. }
  1669. static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PARSER *parser)
  1670. {
  1671. const char *host_uuid_str = get_word(words, num_words, 1);
  1672. const char *claim_id_str = get_word(words, num_words, 2);
  1673. if (!host_uuid_str || !claim_id_str) {
  1674. netdata_log_error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
  1675. host_uuid_str ? host_uuid_str : "[unset]",
  1676. claim_id_str ? claim_id_str : "[unset]");
  1677. return PARSER_RC_ERROR;
  1678. }
  1679. uuid_t uuid;
  1680. RRDHOST *host = parser->user.host;
  1681. // We don't need the parsed UUID
  1682. // just do it to check the format
  1683. if(uuid_parse(host_uuid_str, uuid)) {
  1684. netdata_log_error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
  1685. return PARSER_RC_ERROR;
  1686. }
  1687. if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL") != 0) {
  1688. netdata_log_error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
  1689. return PARSER_RC_ERROR;
  1690. }
  1691. if(strcmp(host_uuid_str, host->machine_guid) != 0) {
  1692. netdata_log_error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
  1693. return PARSER_RC_OK; //the message is OK problem must be somewhere else
  1694. }
  1695. rrdhost_aclk_state_lock(host);
  1696. if (host->aclk_state.claimed_id)
  1697. freez(host->aclk_state.claimed_id);
  1698. host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
  1699. rrdhost_aclk_state_unlock(host);
  1700. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
  1701. rrdpush_send_claimed_id(host);
  1702. return PARSER_RC_OK;
  1703. }
  1704. // ----------------------------------------------------------------------------
  1705. static inline bool buffered_reader_read(struct buffered_reader *reader, int fd) {
  1706. #ifdef NETDATA_INTERNAL_CHECKS
  1707. if(reader->read_buffer[reader->read_len] != '\0')
  1708. fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
  1709. #endif
  1710. ssize_t bytes_read = read(fd, reader->read_buffer + reader->read_len, sizeof(reader->read_buffer) - reader->read_len - 1);
  1711. if(unlikely(bytes_read <= 0))
  1712. return false;
  1713. reader->read_len += bytes_read;
  1714. reader->read_buffer[reader->read_len] = '\0';
  1715. return true;
  1716. }
  1717. static inline bool buffered_reader_read_timeout(struct buffered_reader *reader, int fd, int timeout_ms) {
  1718. errno = 0;
  1719. struct pollfd fds[1];
  1720. fds[0].fd = fd;
  1721. fds[0].events = POLLIN;
  1722. int ret = poll(fds, 1, timeout_ms);
  1723. if (ret > 0) {
  1724. /* There is data to read */
  1725. if (fds[0].revents & POLLIN)
  1726. return buffered_reader_read(reader, fd);
  1727. else if(fds[0].revents & POLLERR) {
  1728. netdata_log_error("PARSER: read failed: POLLERR.");
  1729. return false;
  1730. }
  1731. else if(fds[0].revents & POLLHUP) {
  1732. netdata_log_error("PARSER: read failed: POLLHUP.");
  1733. return false;
  1734. }
  1735. else if(fds[0].revents & POLLNVAL) {
  1736. netdata_log_error("PARSER: read failed: POLLNVAL.");
  1737. return false;
  1738. }
  1739. netdata_log_error("PARSER: poll() returned positive number, but POLLIN|POLLERR|POLLHUP|POLLNVAL are not set.");
  1740. return false;
  1741. }
  1742. else if (ret == 0) {
  1743. netdata_log_error("PARSER: timeout while waiting for data.");
  1744. return false;
  1745. }
  1746. netdata_log_error("PARSER: poll() failed with code %d.", ret);
  1747. return false;
  1748. }
  1749. void pluginsd_process_thread_cleanup(void *ptr) {
  1750. PARSER *parser = (PARSER *)ptr;
  1751. pluginsd_cleanup_v2(parser);
  1752. pluginsd_host_define_cleanup(parser);
  1753. rrd_collector_finished();
  1754. parser_destroy(parser);
  1755. }
  1756. inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
  1757. {
  1758. int enabled = cd->unsafe.enabled;
  1759. if (!fp_plugin_input || !fp_plugin_output || !enabled) {
  1760. cd->unsafe.enabled = 0;
  1761. return 0;
  1762. }
  1763. if (unlikely(fileno(fp_plugin_input) == -1)) {
  1764. netdata_log_error("input file descriptor given is not a valid stream");
  1765. cd->serial_failures++;
  1766. return 0;
  1767. }
  1768. if (unlikely(fileno(fp_plugin_output) == -1)) {
  1769. netdata_log_error("output file descriptor given is not a valid stream");
  1770. cd->serial_failures++;
  1771. return 0;
  1772. }
  1773. clearerr(fp_plugin_input);
  1774. clearerr(fp_plugin_output);
  1775. PARSER *parser;
  1776. {
  1777. PARSER_USER_OBJECT user = {
  1778. .enabled = cd->unsafe.enabled,
  1779. .host = host,
  1780. .cd = cd,
  1781. .trust_durations = trust_durations
  1782. };
  1783. // fp_plugin_output = our input; fp_plugin_input = our output
  1784. parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
  1785. }
  1786. pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD);
  1787. rrd_collector_started();
  1788. size_t count = 0;
  1789. // this keeps the parser with its current value
  1790. // so, parser needs to be allocated before pushing it
  1791. netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
  1792. buffered_reader_init(&parser->reader);
  1793. char buffer[PLUGINSD_LINE_MAX + 2];
  1794. while(likely(service_running(SERVICE_COLLECTORS))) {
  1795. if (unlikely(!buffered_reader_next_line(&parser->reader, buffer, PLUGINSD_LINE_MAX + 2))) {
  1796. if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC)))
  1797. break;
  1798. }
  1799. else if(unlikely(parser_action(parser, buffer)))
  1800. break;
  1801. }
  1802. cd->unsafe.enabled = parser->user.enabled;
  1803. count = parser->user.data_collections_count;
  1804. if (likely(count)) {
  1805. cd->successful_collections += count;
  1806. cd->serial_failures = 0;
  1807. }
  1808. else
  1809. cd->serial_failures++;
  1810. // free parser with the pop function
  1811. netdata_thread_cleanup_pop(1);
  1812. return count;
  1813. }
  1814. void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire) {
  1815. parser_init_repertoire(parser, repertoire);
  1816. if (repertoire & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING))
  1817. inflight_functions_init(parser);
  1818. }
  1819. PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd,
  1820. PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) {
  1821. PARSER *parser;
  1822. parser = callocz(1, sizeof(*parser));
  1823. if(user)
  1824. parser->user = *user;
  1825. parser->fd = fd;
  1826. parser->fp_input = fp_input;
  1827. parser->fp_output = fp_output;
  1828. #ifdef ENABLE_HTTPS
  1829. parser->ssl_output = ssl;
  1830. #endif
  1831. parser->flags = flags;
  1832. spinlock_init(&parser->writer.spinlock);
  1833. return parser;
  1834. }
  1835. PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, size_t num_words) {
  1836. switch(keyword->id) {
  1837. case 1:
  1838. return pluginsd_set_v2(words, num_words, parser);
  1839. case 2:
  1840. return pluginsd_begin_v2(words, num_words, parser);
  1841. case 3:
  1842. return pluginsd_end_v2(words, num_words, parser);
  1843. case 11:
  1844. return pluginsd_set(words, num_words, parser);
  1845. case 12:
  1846. return pluginsd_begin(words, num_words, parser);
  1847. case 13:
  1848. return pluginsd_end(words, num_words, parser);
  1849. case 21:
  1850. return pluginsd_replay_set(words, num_words, parser);
  1851. case 22:
  1852. return pluginsd_replay_begin(words, num_words, parser);
  1853. case 23:
  1854. return pluginsd_replay_rrddim_collection_state(words, num_words, parser);
  1855. case 24:
  1856. return pluginsd_replay_rrdset_collection_state(words, num_words, parser);
  1857. case 25:
  1858. return pluginsd_replay_end(words, num_words, parser);
  1859. case 31:
  1860. return pluginsd_dimension(words, num_words, parser);
  1861. case 32:
  1862. return pluginsd_chart(words, num_words, parser);
  1863. case 33:
  1864. return pluginsd_chart_definition_end(words, num_words, parser);
  1865. case 34:
  1866. return pluginsd_clabel(words, num_words, parser);
  1867. case 35:
  1868. return pluginsd_clabel_commit(words, num_words, parser);
  1869. case 41:
  1870. return pluginsd_function(words, num_words, parser);
  1871. case 42:
  1872. return pluginsd_function_result_begin(words, num_words, parser);
  1873. case 51:
  1874. return pluginsd_label(words, num_words, parser);
  1875. case 52:
  1876. return pluginsd_overwrite(words, num_words, parser);
  1877. case 53:
  1878. return pluginsd_variable(words, num_words, parser);
  1879. case 61:
  1880. return streaming_claimed_id(words, num_words, parser);
  1881. case 71:
  1882. return pluginsd_host(words, num_words, parser);
  1883. case 72:
  1884. return pluginsd_host_define(words, num_words, parser);
  1885. case 73:
  1886. return pluginsd_host_define_end(words, num_words, parser);
  1887. case 74:
  1888. return pluginsd_host_labels(words, num_words, parser);
  1889. case 97:
  1890. return pluginsd_flush(words, num_words, parser);
  1891. case 98:
  1892. return pluginsd_disable(words, num_words, parser);
  1893. case 99:
  1894. return pluginsd_exit(words, num_words, parser);
  1895. case 101:
  1896. return pluginsd_register_plugin(words, num_words, parser);
  1897. case 102:
  1898. return pluginsd_register_module(words, num_words, parser);
  1899. default:
  1900. fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id);
  1901. }
  1902. }
  1903. #include "gperf-hashtable.h"
  1904. void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) {
  1905. parser->repertoire = repertoire;
  1906. for(size_t i = GPERF_PARSER_MIN_HASH_VALUE ; i <= GPERF_PARSER_MAX_HASH_VALUE ;i++) {
  1907. if(gperf_keywords[i].keyword && *gperf_keywords[i].keyword && (parser->repertoire & gperf_keywords[i].repertoire))
  1908. worker_register_job_name(gperf_keywords[i].worker_job_id, gperf_keywords[i].keyword);
  1909. }
  1910. }
  1911. void parser_destroy(PARSER *parser) {
  1912. if (unlikely(!parser))
  1913. return;
  1914. if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) {
  1915. unregister_plugin(parser->user.cd->cfg_dict_item);
  1916. parser->user.cd->configuration = NULL;
  1917. }
  1918. dictionary_destroy(parser->inflight.functions);
  1919. freez(parser);
  1920. }
  1921. int pluginsd_parser_unittest(void) {
  1922. PARSER *p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
  1923. pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING);
  1924. char *lines[] = {
  1925. "BEGIN2 abcdefghijklmnopqr 123",
  1926. "SET2 abcdefg 0x12345678 0 0",
  1927. "SET2 hijklmnoqr 0x12345678 0 0",
  1928. "SET2 stuvwxyz 0x12345678 0 0",
  1929. "END2",
  1930. NULL,
  1931. };
  1932. char *words[PLUGINSD_MAX_WORDS];
  1933. size_t iterations = 1000000;
  1934. size_t count = 0;
  1935. char input[PLUGINSD_LINE_MAX + 1];
  1936. usec_t started = now_realtime_usec();
  1937. while(--iterations) {
  1938. for(size_t line = 0; lines[line] ;line++) {
  1939. strncpyz(input, lines[line], PLUGINSD_LINE_MAX);
  1940. size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
  1941. const char *command = get_word(words, num_words, 0);
  1942. PARSER_KEYWORD *keyword = parser_find_keyword(p, command);
  1943. if(unlikely(!keyword))
  1944. fatal("Cannot parse the line '%s'", lines[line]);
  1945. count++;
  1946. }
  1947. }
  1948. usec_t ended = now_realtime_usec();
  1949. netdata_log_info("Parsed %zu lines in %0.2f secs, %0.2f klines/sec", count,
  1950. (double)(ended - started) / (double)USEC_PER_SEC,
  1951. (double)count / ((double)(ended - started) / (double)USEC_PER_SEC) / 1000.0);
  1952. parser_destroy(p);
  1953. return 0;
  1954. }