pluginsd_parser.c 110 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "pluginsd_parser.h"
  3. #define LOG_FUNCTIONS false
  4. #define SERVING_STREAMING(parser) (parser->repertoire == PARSER_INIT_STREAMING)
  5. #define SERVING_PLUGINSD(parser) (parser->repertoire == PARSER_INIT_PLUGINSD)
  6. static ssize_t send_to_plugin(const char *txt, void *data) {
  7. PARSER *parser = data;
  8. if(!txt || !*txt)
  9. return 0;
  10. errno = 0;
  11. spinlock_lock(&parser->writer.spinlock);
  12. ssize_t bytes = -1;
  13. #ifdef ENABLE_HTTPS
  14. NETDATA_SSL *ssl = parser->ssl_output;
  15. if(ssl) {
  16. if(SSL_connection(ssl))
  17. bytes = netdata_ssl_write(ssl, (void *) txt, strlen(txt));
  18. else
  19. netdata_log_error("PLUGINSD: cannot send command (SSL)");
  20. spinlock_unlock(&parser->writer.spinlock);
  21. return bytes;
  22. }
  23. #endif
  24. if(parser->fp_output) {
  25. bytes = fprintf(parser->fp_output, "%s", txt);
  26. if(bytes <= 0) {
  27. netdata_log_error("PLUGINSD: cannot send command (FILE)");
  28. bytes = -2;
  29. }
  30. else
  31. fflush(parser->fp_output);
  32. spinlock_unlock(&parser->writer.spinlock);
  33. return bytes;
  34. }
  35. if(parser->fd != -1) {
  36. bytes = 0;
  37. ssize_t total = (ssize_t)strlen(txt);
  38. ssize_t sent;
  39. do {
  40. sent = write(parser->fd, &txt[bytes], total - bytes);
  41. if(sent <= 0) {
  42. netdata_log_error("PLUGINSD: cannot send command (fd)");
  43. spinlock_unlock(&parser->writer.spinlock);
  44. return -3;
  45. }
  46. bytes += sent;
  47. }
  48. while(bytes < total);
  49. spinlock_unlock(&parser->writer.spinlock);
  50. return (int)bytes;
  51. }
  52. spinlock_unlock(&parser->writer.spinlock);
  53. netdata_log_error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)");
  54. return -4;
  55. }
  56. static inline RRDHOST *pluginsd_require_scope_host(PARSER *parser, const char *cmd) {
  57. RRDHOST *host = parser->user.host;
  58. if(unlikely(!host))
  59. netdata_log_error("PLUGINSD: command %s requires a host, but is not set.", cmd);
  60. return host;
  61. }
  62. static inline RRDSET *pluginsd_require_scope_chart(PARSER *parser, const char *cmd, const char *parent_cmd) {
  63. RRDSET *st = parser->user.st;
  64. if(unlikely(!st))
  65. netdata_log_error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd);
  66. return st;
  67. }
  68. static inline RRDSET *pluginsd_get_scope_chart(PARSER *parser) {
  69. return parser->user.st;
  70. }
  71. static inline void pluginsd_lock_rrdset_data_collection(PARSER *parser) {
  72. if(parser->user.st && !parser->user.v2.locked_data_collection) {
  73. spinlock_lock(&parser->user.st->data_collection_lock);
  74. parser->user.v2.locked_data_collection = true;
  75. }
  76. }
  77. static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) {
  78. if(parser->user.st && parser->user.v2.locked_data_collection) {
  79. spinlock_unlock(&parser->user.st->data_collection_lock);
  80. parser->user.v2.locked_data_collection = false;
  81. return true;
  82. }
  83. return false;
  84. }
  85. void pluginsd_rrdset_cleanup(RRDSET *st) {
  86. spinlock_lock(&st->pluginsd.spinlock);
  87. for(size_t i = 0; i < st->pluginsd.size ; i++) {
  88. rrddim_acquired_release(st->pluginsd.rda[i]); // can be NULL
  89. st->pluginsd.rda[i] = NULL;
  90. }
  91. freez(st->pluginsd.rda);
  92. st->pluginsd.collector_tid = 0;
  93. st->pluginsd.rda = NULL;
  94. st->pluginsd.size = 0;
  95. st->pluginsd.pos = 0;
  96. spinlock_unlock(&st->pluginsd.spinlock);
  97. }
  98. static inline void pluginsd_unlock_previous_scope_chart(PARSER *parser, const char *keyword, bool stale) {
  99. if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) {
  100. if(stale)
  101. netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
  102. rrdhost_hostname(parser->user.st->rrdhost),
  103. rrdset_id(parser->user.st),
  104. keyword);
  105. }
  106. if(unlikely(parser->user.v2.ml_locked)) {
  107. ml_chart_update_end(parser->user.st);
  108. parser->user.v2.ml_locked = false;
  109. if(stale)
  110. netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
  111. rrdhost_hostname(parser->user.st->rrdhost),
  112. rrdset_id(parser->user.st),
  113. keyword);
  114. }
  115. }
  116. static inline void pluginsd_clear_scope_chart(PARSER *parser, const char *keyword) {
  117. pluginsd_unlock_previous_scope_chart(parser, keyword, true);
  118. parser->user.st = NULL;
  119. }
  120. static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const char *keyword) {
  121. RRDSET *old_st = parser->user.st;
  122. pid_t old_collector_tid = (old_st) ? old_st->pluginsd.collector_tid : 0;
  123. pid_t my_collector_tid = gettid();
  124. if(unlikely(old_collector_tid)) {
  125. if(old_collector_tid != my_collector_tid) {
  126. error_limit_static_global_var(erl, 1, 0);
  127. error_limit(&erl, "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)",
  128. keyword ? keyword : "UNKNOWN",
  129. rrdhost_hostname(st->rrdhost), rrdset_id(st),
  130. my_collector_tid, old_collector_tid);
  131. return false;
  132. }
  133. old_st->pluginsd.collector_tid = 0;
  134. }
  135. st->pluginsd.collector_tid = my_collector_tid;
  136. pluginsd_clear_scope_chart(parser, keyword);
  137. size_t dims = dictionary_entries(st->rrddim_root_index);
  138. if(unlikely(st->pluginsd.size < dims)) {
  139. st->pluginsd.rda = reallocz(st->pluginsd.rda, dims * sizeof(RRDDIM_ACQUIRED *));
  140. // initialize the empty slots
  141. for(ssize_t i = (ssize_t)dims - 1; i >= (ssize_t)st->pluginsd.size ;i--)
  142. st->pluginsd.rda[i] = NULL;
  143. st->pluginsd.size = dims;
  144. }
  145. st->pluginsd.pos = 0;
  146. parser->user.st = st;
  147. return true;
  148. }
  149. static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) {
  150. if (unlikely(!dimension || !*dimension)) {
  151. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
  152. rrdhost_hostname(host), rrdset_id(st), cmd);
  153. return NULL;
  154. }
  155. if(unlikely(st->pluginsd.pos >= st->pluginsd.size))
  156. st->pluginsd.pos = 0;
  157. RRDDIM_ACQUIRED *rda = st->pluginsd.rda[st->pluginsd.pos];
  158. if(likely(rda)) {
  159. RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
  160. if (likely(rd && string_strcmp(rd->id, dimension) == 0)) {
  161. // we found a cached RDA
  162. st->pluginsd.pos++;
  163. return rd;
  164. }
  165. else {
  166. // the collector is sending dimensions in a different order
  167. // release the previous one, to reuse this slot
  168. rrddim_acquired_release(rda);
  169. st->pluginsd.rda[st->pluginsd.pos] = NULL;
  170. }
  171. }
  172. rda = rrddim_find_and_acquire(st, dimension);
  173. if (unlikely(!rda)) {
  174. netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
  175. rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
  176. return NULL;
  177. }
  178. st->pluginsd.rda[st->pluginsd.pos++] = rda;
  179. return rrddim_acquired_to_rrddim(rda);
  180. }
  181. static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
  182. if (unlikely(!chart || !*chart)) {
  183. netdata_log_error("PLUGINSD: 'host:%s' got a %s without a chart id.",
  184. rrdhost_hostname(host), cmd);
  185. return NULL;
  186. }
  187. RRDSET *st = rrdset_find(host, chart);
  188. if (unlikely(!st))
  189. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.",
  190. rrdhost_hostname(host), chart, cmd);
  191. return st;
  192. }
  193. static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg) {
  194. parser->user.enabled = 0;
  195. if(keyword && msg) {
  196. error_limit_static_global_var(erl, 1, 0);
  197. error_limit(&erl, "PLUGINSD: keyword %s: %s", keyword, msg);
  198. }
  199. return PARSER_RC_ERROR;
  200. }
  201. static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) {
  202. char *dimension = get_word(words, num_words, 1);
  203. char *value = get_word(words, num_words, 2);
  204. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET);
  205. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  206. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
  207. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  208. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET);
  209. if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  210. st->pluginsd.set = true;
  211. if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  212. netdata_log_debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
  213. rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
  214. if (value && *value)
  215. rrddim_set_by_pointer(st, rd, str2ll_encoded(value));
  216. return PARSER_RC_OK;
  217. }
  218. static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) {
  219. char *id = get_word(words, num_words, 1);
  220. char *microseconds_txt = get_word(words, num_words, 2);
  221. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN);
  222. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  223. RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN);
  224. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  225. if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN))
  226. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  227. usec_t microseconds = 0;
  228. if (microseconds_txt && *microseconds_txt) {
  229. long long t = str2ll(microseconds_txt, NULL);
  230. if(t >= 0)
  231. microseconds = t;
  232. }
  233. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  234. if(st->replay.log_next_data_collection) {
  235. st->replay.log_next_data_collection = false;
  236. internal_error(true,
  237. "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu",
  238. rrdhost_hostname(host), rrdset_id(st),
  239. st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec,
  240. st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec,
  241. microseconds
  242. );
  243. }
  244. #endif
  245. if (likely(st->counter_done)) {
  246. if (likely(microseconds)) {
  247. if (parser->user.trust_durations)
  248. rrdset_next_usec_unfiltered(st, microseconds);
  249. else
  250. rrdset_next_usec(st, microseconds);
  251. }
  252. else
  253. rrdset_next(st);
  254. }
  255. return PARSER_RC_OK;
  256. }
  257. static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) {
  258. UNUSED(words);
  259. UNUSED(num_words);
  260. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END);
  261. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  262. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
  263. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  264. if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  265. netdata_log_debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
  266. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_END);
  267. parser->user.data_collections_count++;
  268. struct timeval now;
  269. now_realtime_timeval(&now);
  270. rrdset_timed_done(st, now, /* pending_rrdset_next = */ false);
  271. return PARSER_RC_OK;
  272. }
  273. static void pluginsd_host_define_cleanup(PARSER *parser) {
  274. string_freez(parser->user.host_define.hostname);
  275. rrdlabels_destroy(parser->user.host_define.rrdlabels);
  276. parser->user.host_define.hostname = NULL;
  277. parser->user.host_define.rrdlabels = NULL;
  278. parser->user.host_define.parsing_host = false;
  279. }
  280. static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) {
  281. if(uuid_parse(guid, *uuid))
  282. return false;
  283. uuid_unparse_lower(*uuid, output);
  284. return true;
  285. }
  286. static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PARSER *parser) {
  287. char *guid = get_word(words, num_words, 1);
  288. char *hostname = get_word(words, num_words, 2);
  289. if(unlikely(!guid || !*guid || !hostname || !*hostname))
  290. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters");
  291. if(unlikely(parser->user.host_define.parsing_host))
  292. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE,
  293. "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?");
  294. if(!pluginsd_validate_machine_guid(guid, &parser->user.host_define.machine_guid, parser->user.host_define.machine_guid_str))
  295. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?");
  296. parser->user.host_define.hostname = string_strdupz(hostname);
  297. parser->user.host_define.rrdlabels = rrdlabels_create();
  298. parser->user.host_define.parsing_host = true;
  299. return PARSER_RC_OK;
  300. }
  301. static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, RRDLABELS *labels, const char *keyword) {
  302. char *name = get_word(words, num_words, 1);
  303. char *value = get_word(words, num_words, 2);
  304. if(!name || !*name || !value)
  305. return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters");
  306. if(!parser->user.host_define.parsing_host || !labels)
  307. return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
  308. rrdlabels_add(labels, name, value, RRDLABEL_SRC_CONFIG);
  309. return PARSER_RC_OK;
  310. }
  311. static inline PARSER_RC pluginsd_host_labels(char **words, size_t num_words, PARSER *parser) {
  312. return pluginsd_host_dictionary(words, num_words, parser,
  313. parser->user.host_define.rrdlabels,
  314. PLUGINSD_KEYWORD_HOST_LABEL);
  315. }
  316. static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  317. if(!parser->user.host_define.parsing_host)
  318. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
  319. RRDHOST *host = rrdhost_find_or_create(
  320. string2str(parser->user.host_define.hostname),
  321. string2str(parser->user.host_define.hostname),
  322. parser->user.host_define.machine_guid_str,
  323. "Netdata Virtual Host 1.0",
  324. netdata_configured_timezone,
  325. netdata_configured_abbrev_timezone,
  326. netdata_configured_utc_offset,
  327. NULL,
  328. program_name,
  329. program_version,
  330. default_rrd_update_every,
  331. default_rrd_history_entries,
  332. default_rrd_memory_mode,
  333. default_health_enabled,
  334. default_rrdpush_enabled,
  335. default_rrdpush_destination,
  336. default_rrdpush_api_key,
  337. default_rrdpush_send_charts_matching,
  338. default_rrdpush_enable_replication,
  339. default_rrdpush_seconds_to_replicate,
  340. default_rrdpush_replication_step,
  341. rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels),
  342. false
  343. );
  344. rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST);
  345. if(host->rrdlabels) {
  346. rrdlabels_migrate_to_these(host->rrdlabels, parser->user.host_define.rrdlabels);
  347. }
  348. else {
  349. host->rrdlabels = parser->user.host_define.rrdlabels;
  350. parser->user.host_define.rrdlabels = NULL;
  351. }
  352. pluginsd_host_define_cleanup(parser);
  353. parser->user.host = host;
  354. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END);
  355. rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
  356. rrdcontext_host_child_connected(host);
  357. schedule_node_info_update(host);
  358. return PARSER_RC_OK;
  359. }
  360. static inline PARSER_RC pluginsd_host(char **words, size_t num_words, PARSER *parser) {
  361. char *guid = get_word(words, num_words, 1);
  362. if(!guid || !*guid || strcmp(guid, "localhost") == 0) {
  363. parser->user.host = localhost;
  364. return PARSER_RC_OK;
  365. }
  366. uuid_t uuid;
  367. char uuid_str[UUID_STR_LEN];
  368. if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str))
  369. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?");
  370. RRDHOST *host = rrdhost_find_by_guid(uuid_str);
  371. if(unlikely(!host))
  372. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?");
  373. parser->user.host = host;
  374. return PARSER_RC_OK;
  375. }
  376. static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *parser) {
  377. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART);
  378. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  379. char *type = get_word(words, num_words, 1);
  380. char *name = get_word(words, num_words, 2);
  381. char *title = get_word(words, num_words, 3);
  382. char *units = get_word(words, num_words, 4);
  383. char *family = get_word(words, num_words, 5);
  384. char *context = get_word(words, num_words, 6);
  385. char *chart = get_word(words, num_words, 7);
  386. char *priority_s = get_word(words, num_words, 8);
  387. char *update_every_s = get_word(words, num_words, 9);
  388. char *options = get_word(words, num_words, 10);
  389. char *plugin = get_word(words, num_words, 11);
  390. char *module = get_word(words, num_words, 12);
  391. // parse the id from type
  392. char *id = NULL;
  393. if (likely(type && (id = strchr(type, '.')))) {
  394. *id = '\0';
  395. id++;
  396. }
  397. // make sure we have the required variables
  398. if (unlikely((!type || !*type || !id || !*id)))
  399. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_CHART, "missing parameters");
  400. // parse the name, and make sure it does not include 'type.'
  401. if (unlikely(name && *name)) {
  402. // when data are streamed from child nodes
  403. // name will be type.name
  404. // so, we have to remove 'type.' from name too
  405. size_t len = strlen(type);
  406. if (strncmp(type, name, len) == 0 && name[len] == '.')
  407. name = &name[len + 1];
  408. // if the name is the same with the id,
  409. // or is just 'NULL', clear it.
  410. if (unlikely(strcmp(name, id) == 0 || strcasecmp(name, "NULL") == 0 || strcasecmp(name, "(NULL)") == 0))
  411. name = NULL;
  412. }
  413. int priority = 1000;
  414. if (likely(priority_s && *priority_s))
  415. priority = str2i(priority_s);
  416. int update_every = parser->user.cd->update_every;
  417. if (likely(update_every_s && *update_every_s))
  418. update_every = str2i(update_every_s);
  419. if (unlikely(!update_every))
  420. update_every = parser->user.cd->update_every;
  421. RRDSET_TYPE chart_type = RRDSET_TYPE_LINE;
  422. if (unlikely(chart))
  423. chart_type = rrdset_type_id(chart);
  424. if (unlikely(name && !*name))
  425. name = NULL;
  426. if (unlikely(family && !*family))
  427. family = NULL;
  428. if (unlikely(context && !*context))
  429. context = NULL;
  430. if (unlikely(!title))
  431. title = "";
  432. if (unlikely(!units))
  433. units = "unknown";
  434. netdata_log_debug(
  435. D_PLUGINSD,
  436. "creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d",
  437. type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
  438. priority, update_every);
  439. RRDSET *st = NULL;
  440. st = rrdset_create(
  441. host, type, id, name, family, context, title, units,
  442. (plugin && *plugin) ? plugin : parser->user.cd->filename,
  443. module, priority, update_every,
  444. chart_type);
  445. if (likely(st)) {
  446. if (options && *options) {
  447. if (strstr(options, "obsolete")) {
  448. pluginsd_rrdset_cleanup(st);
  449. rrdset_is_obsolete(st);
  450. }
  451. else
  452. rrdset_isnot_obsolete(st);
  453. if (strstr(options, "detail"))
  454. rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
  455. else
  456. rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
  457. if (strstr(options, "hidden"))
  458. rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
  459. else
  460. rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
  461. if (strstr(options, "store_first"))
  462. rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
  463. else
  464. rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
  465. }
  466. else {
  467. rrdset_isnot_obsolete(st);
  468. rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
  469. rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
  470. }
  471. if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_CHART))
  472. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  473. }
  474. else
  475. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_CHART);
  476. return PARSER_RC_OK;
  477. }
  478. static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) {
  479. const char *first_entry_txt = get_word(words, num_words, 1);
  480. const char *last_entry_txt = get_word(words, num_words, 2);
  481. const char *wall_clock_time_txt = get_word(words, num_words, 3);
  482. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
  483. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  484. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
  485. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  486. time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
  487. time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
  488. time_t child_wall_clock_time = (wall_clock_time_txt && *wall_clock_time_txt) ? (time_t)str2ul(wall_clock_time_txt) : now_realtime_sec();
  489. bool ok = true;
  490. if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
  491. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  492. st->replay.start_streaming = false;
  493. st->replay.after = 0;
  494. st->replay.before = 0;
  495. #endif
  496. rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
  497. rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
  498. rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
  499. ok = replicate_chart_request(send_to_plugin, parser, host, st,
  500. first_entry_child, last_entry_child, child_wall_clock_time,
  501. 0, 0);
  502. }
  503. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  504. else {
  505. internal_error(true, "REPLAY: 'host:%s/chart:%s' not sending duplicate replication request",
  506. rrdhost_hostname(st->rrdhost), rrdset_id(st));
  507. }
  508. #endif
  509. return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
  510. }
  511. static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) {
  512. char *id = get_word(words, num_words, 1);
  513. char *name = get_word(words, num_words, 2);
  514. char *algorithm = get_word(words, num_words, 3);
  515. char *multiplier_s = get_word(words, num_words, 4);
  516. char *divisor_s = get_word(words, num_words, 5);
  517. char *options = get_word(words, num_words, 6);
  518. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_DIMENSION);
  519. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  520. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
  521. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  522. if (unlikely(!id))
  523. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id");
  524. long multiplier = 1;
  525. if (multiplier_s && *multiplier_s) {
  526. multiplier = str2ll_encoded(multiplier_s);
  527. if (unlikely(!multiplier))
  528. multiplier = 1;
  529. }
  530. long divisor = 1;
  531. if (likely(divisor_s && *divisor_s)) {
  532. divisor = str2ll_encoded(divisor_s);
  533. if (unlikely(!divisor))
  534. divisor = 1;
  535. }
  536. if (unlikely(!algorithm || !*algorithm))
  537. algorithm = "absolute";
  538. if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  539. netdata_log_debug(
  540. D_PLUGINSD,
  541. "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
  542. rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
  543. options ? options : "");
  544. RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm));
  545. int unhide_dimension = 1;
  546. rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  547. if (options && *options) {
  548. if (strstr(options, "obsolete") != NULL)
  549. rrddim_is_obsolete(st, rd);
  550. else
  551. rrddim_isnot_obsolete(st, rd);
  552. unhide_dimension = !strstr(options, "hidden");
  553. if (strstr(options, "noreset") != NULL)
  554. rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  555. if (strstr(options, "nooverflow") != NULL)
  556. rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  557. } else
  558. rrddim_isnot_obsolete(st, rd);
  559. bool should_update_dimension = false;
  560. if (likely(unhide_dimension)) {
  561. rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN);
  562. should_update_dimension = rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
  563. }
  564. else {
  565. rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN);
  566. should_update_dimension = !rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
  567. }
  568. if (should_update_dimension) {
  569. rrddim_flag_set(rd, RRDDIM_FLAG_METADATA_UPDATE);
  570. rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
  571. }
  572. return PARSER_RC_OK;
  573. }
  574. // ----------------------------------------------------------------------------
  575. // execution of functions
  576. struct inflight_function {
  577. int code;
  578. int timeout;
  579. STRING *function;
  580. BUFFER *result_body_wb;
  581. rrd_function_result_callback_t result_cb;
  582. void *result_cb_data;
  583. usec_t timeout_ut;
  584. usec_t started_ut;
  585. usec_t sent_ut;
  586. const char *payload;
  587. PARSER *parser;
  588. bool virtual;
  589. };
  590. static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
  591. struct inflight_function *pf = func;
  592. PARSER *parser = parser_ptr;
  593. // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller
  594. pf->code = HTTP_RESP_GATEWAY_TIMEOUT;
  595. const char *transaction = dictionary_acquired_item_name(item);
  596. char buffer[2048 + 1];
  597. snprintfz(buffer, 2048, "%s %s %d \"%s\"\n",
  598. pf->payload ? "FUNCTION_PAYLOAD" : "FUNCTION",
  599. transaction,
  600. pf->timeout,
  601. string2str(pf->function));
  602. // send the command to the plugin
  603. ssize_t ret = send_to_plugin(buffer, parser);
  604. pf->sent_ut = now_realtime_usec();
  605. if(ret < 0) {
  606. netdata_log_error("FUNCTION '%s': failed to send it to the plugin, error %zd", string2str(pf->function), ret);
  607. rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE);
  608. }
  609. else {
  610. internal_error(LOG_FUNCTIONS,
  611. "FUNCTION '%s' with transaction '%s' sent to collector (%zd bytes, in %"PRIu64" usec)",
  612. string2str(pf->function), dictionary_acquired_item_name(item), ret,
  613. pf->sent_ut - pf->started_ut);
  614. }
  615. if (!pf->payload)
  616. return;
  617. // send the payload to the plugin
  618. ret = send_to_plugin(pf->payload, parser);
  619. if(ret < 0) {
  620. netdata_log_error("FUNCTION_PAYLOAD '%s': failed to send function to plugin, error %zd", string2str(pf->function), ret);
  621. rrd_call_function_error(pf->result_body_wb, "Failed to communicate with collector", HTTP_RESP_SERVICE_UNAVAILABLE);
  622. }
  623. else {
  624. internal_error(LOG_FUNCTIONS,
  625. "FUNCTION_PAYLOAD '%s' with transaction '%s' sent to collector (%zd bytes, in %"PRIu64" usec)",
  626. string2str(pf->function), dictionary_acquired_item_name(item), ret,
  627. pf->sent_ut - pf->started_ut);
  628. }
  629. send_to_plugin("\nFUNCTION_PAYLOAD_END\n", parser);
  630. }
  631. static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) {
  632. struct inflight_function *pf = new_func;
  633. netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
  634. pf->code = rrd_call_function_error(pf->result_body_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
  635. pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data);
  636. string_freez(pf->function);
  637. return false;
  638. }
  639. void delete_job_finalize(struct parser *parser __maybe_unused, struct configurable_plugin *plug, const char *fnc_sig, int code) {
  640. if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED)
  641. return;
  642. char *params_local = strdupz(fnc_sig);
  643. char *words[DYNCFG_MAX_WORDS];
  644. size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd);
  645. if (words_c != 3) {
  646. netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for delete_job");
  647. freez(params_local);
  648. return;
  649. }
  650. const char *module = words[1];
  651. const char *job = words[2];
  652. delete_job(plug, module, job);
  653. unlink_job(plug->name, module, job);
  654. rrdpush_send_job_deleted(localhost, plug->name, module, job);
  655. freez(params_local);
  656. }
  657. void set_job_finalize(struct parser *parser __maybe_unused, struct configurable_plugin *plug __maybe_unused, const char *fnc_sig, int code) {
  658. if (code != DYNCFG_VFNC_RET_CFG_ACCEPTED)
  659. return;
  660. char *params_local = strdupz(fnc_sig);
  661. char *words[DYNCFG_MAX_WORDS];
  662. size_t words_c = quoted_strings_splitter(params_local, words, DYNCFG_MAX_WORDS, isspace_map_pluginsd);
  663. if (words_c != 3) {
  664. netdata_log_error("PLUGINSD_PARSER: invalid number of parameters for set_job_config");
  665. freez(params_local);
  666. return;
  667. }
  668. const char *module_name = get_word(words, words_c, 1);
  669. const char *job_name = get_word(words, words_c, 2);
  670. if (register_job(parser->user.host->configurable_plugins, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED, 1)) {
  671. freez(params_local);
  672. return;
  673. }
  674. // only send this if it is not existing already (register_job cares for that)
  675. rrdpush_send_dyncfg_reg_job(localhost, parser->user.cd->configuration->name, module_name, job_name, JOB_TYPE_USER, JOB_FLG_USER_CREATED);
  676. freez(params_local);
  677. }
  678. static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr) {
  679. struct inflight_function *pf = func;
  680. struct parser *parser = (struct parser *)parser_ptr;
  681. internal_error(LOG_FUNCTIONS,
  682. "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %"PRIu64" usec, response %"PRIu64" usec)",
  683. string2str(pf->function), dictionary_acquired_item_name(item),
  684. buffer_strlen(pf->result_body_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut);
  685. if (pf->virtual && SERVING_PLUGINSD(parser)) {
  686. if (pf->payload) {
  687. if (strncmp(string2str(pf->function), FUNCTION_NAME_SET_JOB_CONFIG, strlen(FUNCTION_NAME_SET_JOB_CONFIG)) == 0)
  688. set_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code);
  689. dyn_conf_store_config(string2str(pf->function), pf->payload, parser->user.cd->configuration);
  690. } else if (strncmp(string2str(pf->function), FUNCTION_NAME_DELETE_JOB, strlen(FUNCTION_NAME_DELETE_JOB)) == 0) {
  691. delete_job_finalize(parser, parser->user.cd->configuration, string2str(pf->function), pf->code);
  692. }
  693. }
  694. pf->result_cb(pf->result_body_wb, pf->code, pf->result_cb_data);
  695. string_freez(pf->function);
  696. freez((void *)pf->payload);
  697. }
  698. void inflight_functions_init(PARSER *parser) {
  699. parser->inflight.functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE, &dictionary_stats_category_functions, 0);
  700. dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser);
  701. dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser);
  702. dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser);
  703. }
  704. static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) {
  705. parser->inflight.smaller_timeout = 0;
  706. struct inflight_function *pf;
  707. dfe_start_write(parser->inflight.functions, pf) {
  708. if (pf->timeout_ut < now) {
  709. internal_error(true,
  710. "FUNCTION '%s' removing expired transaction '%s', after %"PRIu64" usec.",
  711. string2str(pf->function), pf_dfe.name, now - pf->started_ut);
  712. if(!buffer_strlen(pf->result_body_wb) || pf->code == HTTP_RESP_OK)
  713. pf->code = rrd_call_function_error(pf->result_body_wb,
  714. "Timeout waiting for collector response.",
  715. HTTP_RESP_GATEWAY_TIMEOUT);
  716. dictionary_del(parser->inflight.functions, pf_dfe.name);
  717. }
  718. else if(!parser->inflight.smaller_timeout || pf->timeout_ut < parser->inflight.smaller_timeout)
  719. parser->inflight.smaller_timeout = pf->timeout_ut;
  720. }
  721. dfe_done(pf);
  722. }
  723. void pluginsd_function_cancel(void *data) {
  724. struct inflight_function *look_for = data, *t;
  725. bool sent = false;
  726. dfe_start_read(look_for->parser->inflight.functions, t) {
  727. if(look_for == t) {
  728. const char *transaction = t_dfe.name;
  729. internal_error(true, "PLUGINSD: sending function cancellation to plugin for transaction '%s'", transaction);
  730. char buffer[2048 + 1];
  731. snprintfz(buffer, 2048, "%s %s\n",
  732. PLUGINSD_KEYWORD_FUNCTION_CANCEL,
  733. transaction);
  734. // send the command to the plugin
  735. ssize_t ret = send_to_plugin(buffer, t->parser);
  736. if(ret < 0)
  737. sent = true;
  738. break;
  739. }
  740. }
  741. dfe_done(t);
  742. if(sent <= 0)
  743. netdata_log_error("PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d.");
  744. }
  745. // this is the function that is called from
  746. // rrd_call_function_and_wait() and rrd_call_function_async()
  747. static int pluginsd_function_execute_cb(BUFFER *result_body_wb, int timeout, const char *function,
  748. void *execute_cb_data,
  749. rrd_function_result_callback_t result_cb, void *result_cb_data,
  750. rrd_function_is_cancelled_cb_t is_cancelled_cb __maybe_unused,
  751. void *is_cancelled_cb_data __maybe_unused,
  752. rrd_function_register_canceller_cb_t register_canceller_cb,
  753. void *register_canceller_db_data) {
  754. PARSER *parser = execute_cb_data;
  755. usec_t now = now_realtime_usec();
  756. struct inflight_function tmp = {
  757. .started_ut = now,
  758. .timeout_ut = now + timeout * USEC_PER_SEC + RRDFUNCTIONS_TIMEOUT_EXTENSION_UT,
  759. .result_body_wb = result_body_wb,
  760. .timeout = timeout,
  761. .function = string_strdupz(function),
  762. .result_cb = result_cb,
  763. .result_cb_data = result_cb_data,
  764. .payload = NULL,
  765. .parser = parser,
  766. };
  767. uuid_t uuid;
  768. uuid_generate_random(uuid);
  769. char transaction[UUID_STR_LEN];
  770. uuid_unparse_lower(uuid, transaction);
  771. dictionary_write_lock(parser->inflight.functions);
  772. // if there is any error, our dictionary callbacks will call the caller callback to notify
  773. // the caller about the error - no need for error handling here.
  774. void *t = dictionary_set(parser->inflight.functions, transaction, &tmp, sizeof(struct inflight_function));
  775. if(register_canceller_cb)
  776. register_canceller_cb(register_canceller_db_data, pluginsd_function_cancel, t);
  777. if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
  778. parser->inflight.smaller_timeout = tmp.timeout_ut;
  779. // garbage collect stale inflight functions
  780. if(parser->inflight.smaller_timeout < now)
  781. inflight_functions_garbage_collect(parser, now);
  782. dictionary_write_unlock(parser->inflight.functions);
  783. return HTTP_RESP_OK;
  784. }
  785. static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) {
  786. // a plugin or a child is registering a function
  787. bool global = false;
  788. size_t i = 1;
  789. if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
  790. i++;
  791. global = true;
  792. }
  793. char *name = get_word(words, num_words, i++);
  794. char *timeout_s = get_word(words, num_words, i++);
  795. char *help = get_word(words, num_words, i++);
  796. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_FUNCTION);
  797. if(!host) return PARSER_RC_ERROR;
  798. RRDSET *st = (global)? NULL: pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART);
  799. if(!st) global = true;
  800. if (unlikely(!timeout_s || !name || !help || (!global && !st))) {
  801. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.",
  802. rrdhost_hostname(host),
  803. st?rrdset_id(st):"(unset)",
  804. global?"yes":"no",
  805. name?name:"(unset)",
  806. timeout_s?timeout_s:"(unset)",
  807. help?help:"(unset)"
  808. );
  809. return PARSER_RC_ERROR;
  810. }
  811. int timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  812. if (timeout_s && *timeout_s) {
  813. timeout = str2i(timeout_s);
  814. if (unlikely(timeout <= 0))
  815. timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  816. }
  817. rrd_function_add(host, st, name, timeout, help, false, pluginsd_function_execute_cb, parser);
  818. parser->user.data_collections_count++;
  819. return PARSER_RC_OK;
  820. }
  821. static void pluginsd_function_result_end(struct parser *parser, void *action_data) {
  822. STRING *key = action_data;
  823. if(key)
  824. dictionary_del(parser->inflight.functions, string2str(key));
  825. string_freez(key);
  826. parser->user.data_collections_count++;
  827. }
  828. static inline PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, PARSER *parser) {
  829. char *key = get_word(words, num_words, 1);
  830. char *status = get_word(words, num_words, 2);
  831. char *format = get_word(words, num_words, 3);
  832. char *expires = get_word(words, num_words, 4);
  833. if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) {
  834. netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
  835. , key ? key : "(unset)"
  836. , status ? status : "(unset)"
  837. , format ? format : "(unset)"
  838. , expires ? expires : "(unset)"
  839. );
  840. }
  841. int code = (status && *status) ? str2i(status) : 0;
  842. if (code <= 0)
  843. code = HTTP_RESP_BACKEND_RESPONSE_INVALID;
  844. time_t expiration = (expires && *expires) ? str2l(expires) : 0;
  845. struct inflight_function *pf = NULL;
  846. if(key && *key)
  847. pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, key);
  848. if(!pf) {
  849. netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)");
  850. }
  851. else {
  852. if(format && *format)
  853. pf->result_body_wb->content_type = functions_format_to_content_type(format);
  854. pf->code = code;
  855. pf->result_body_wb->expires = expiration;
  856. if(expiration <= now_realtime_sec())
  857. buffer_no_cacheable(pf->result_body_wb);
  858. else
  859. buffer_cacheable(pf->result_body_wb);
  860. }
  861. parser->defer.response = (pf) ? pf->result_body_wb : NULL;
  862. parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END;
  863. parser->defer.action = pluginsd_function_result_end;
  864. parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL
  865. parser->flags |= PARSER_DEFER_UNTIL_KEYWORD;
  866. return PARSER_RC_OK;
  867. }
  868. // ----------------------------------------------------------------------------
  869. static inline PARSER_RC pluginsd_variable(char **words, size_t num_words, PARSER *parser) {
  870. char *name = get_word(words, num_words, 1);
  871. char *value = get_word(words, num_words, 2);
  872. NETDATA_DOUBLE v;
  873. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_VARIABLE);
  874. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  875. RRDSET *st = pluginsd_get_scope_chart(parser);
  876. int global = (st) ? 0 : 1;
  877. if (name && *name) {
  878. if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
  879. global = 1;
  880. name = get_word(words, num_words, 2);
  881. value = get_word(words, num_words, 3);
  882. } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
  883. global = 0;
  884. name = get_word(words, num_words, 2);
  885. value = get_word(words, num_words, 3);
  886. }
  887. }
  888. if (unlikely(!name || !*name))
  889. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "missing variable name");
  890. if (unlikely(!value || !*value))
  891. value = NULL;
  892. if (unlikely(!value)) {
  893. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
  894. rrdhost_hostname(host),
  895. st ? rrdset_id(st):"UNSET",
  896. (global) ? "HOST" : "CHART",
  897. name);
  898. return PARSER_RC_OK;
  899. }
  900. if (!global && !st)
  901. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
  902. char *endptr = NULL;
  903. v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr);
  904. if (unlikely(endptr && *endptr)) {
  905. if (endptr == value)
  906. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
  907. rrdhost_hostname(host),
  908. st ? rrdset_id(st):"UNSET",
  909. value,
  910. name);
  911. else
  912. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
  913. rrdhost_hostname(host),
  914. st ? rrdset_id(st):"UNSET",
  915. value,
  916. name,
  917. endptr);
  918. }
  919. if (global) {
  920. const RRDVAR_ACQUIRED *rva = rrdvar_custom_host_variable_add_and_acquire(host, name);
  921. if (rva) {
  922. rrdvar_custom_host_variable_set(host, rva, v);
  923. rrdvar_custom_host_variable_release(host, rva);
  924. }
  925. else
  926. netdata_log_error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
  927. rrdhost_hostname(host),
  928. name);
  929. } else {
  930. const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name);
  931. if (rsa) {
  932. rrdsetvar_custom_chart_variable_set(st, rsa, v);
  933. rrdsetvar_custom_chart_variable_release(st, rsa);
  934. }
  935. else
  936. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
  937. rrdhost_hostname(host), rrdset_id(st), name);
  938. }
  939. return PARSER_RC_OK;
  940. }
  941. static inline PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  942. netdata_log_debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH);
  943. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_FLUSH);
  944. parser->user.replay.start_time = 0;
  945. parser->user.replay.end_time = 0;
  946. parser->user.replay.start_time_ut = 0;
  947. parser->user.replay.end_time_ut = 0;
  948. return PARSER_RC_OK;
  949. }
  950. static inline PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  951. netdata_log_info("PLUGINSD: plugin called DISABLE. Disabling it.");
  952. parser->user.enabled = 0;
  953. return PARSER_RC_STOP;
  954. }
  955. static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *parser) {
  956. const char *name = get_word(words, num_words, 1);
  957. const char *label_source = get_word(words, num_words, 2);
  958. const char *value = get_word(words, num_words, 3);
  959. if (!name || !label_source || !value)
  960. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_LABEL, "missing parameters");
  961. char *store = (char *)value;
  962. bool allocated_store = false;
  963. if(unlikely(num_words > 4)) {
  964. allocated_store = true;
  965. store = mallocz(PLUGINSD_LINE_MAX + 1);
  966. size_t remaining = PLUGINSD_LINE_MAX;
  967. char *move = store;
  968. char *word;
  969. for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) {
  970. if(i > 3) {
  971. *move++ = ' ';
  972. *move = '\0';
  973. remaining--;
  974. }
  975. size_t length = strlen(word);
  976. if (length > remaining)
  977. length = remaining;
  978. remaining -= length;
  979. memcpy(move, word, length);
  980. move += length;
  981. *move = '\0';
  982. }
  983. }
  984. if(unlikely(!(parser->user.new_host_labels)))
  985. parser->user.new_host_labels = rrdlabels_create();
  986. rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source));
  987. if (allocated_store)
  988. freez(store);
  989. return PARSER_RC_OK;
  990. }
  991. static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  992. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_OVERWRITE);
  993. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  994. netdata_log_debug(D_PLUGINSD, "requested to OVERWRITE host labels");
  995. if(unlikely(!host->rrdlabels))
  996. host->rrdlabels = rrdlabels_create();
  997. rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels);
  998. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
  999. rrdlabels_destroy(parser->user.new_host_labels);
  1000. parser->user.new_host_labels = NULL;
  1001. return PARSER_RC_OK;
  1002. }
  1003. static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER *parser) {
  1004. const char *name = get_word(words, num_words, 1);
  1005. const char *value = get_word(words, num_words, 2);
  1006. const char *label_source = get_word(words, num_words, 3);
  1007. if (!name || !value || !label_source) {
  1008. netdata_log_error("Ignoring malformed or empty CHART LABEL command.");
  1009. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1010. }
  1011. if(unlikely(!parser->user.chart_rrdlabels_linked_temporarily)) {
  1012. RRDSET *st = pluginsd_get_scope_chart(parser);
  1013. parser->user.chart_rrdlabels_linked_temporarily = st->rrdlabels;
  1014. rrdlabels_unmark_all(parser->user.chart_rrdlabels_linked_temporarily);
  1015. }
  1016. rrdlabels_add(parser->user.chart_rrdlabels_linked_temporarily, name, value, str2l(label_source));
  1017. return PARSER_RC_OK;
  1018. }
  1019. static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  1020. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT);
  1021. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1022. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
  1023. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1024. netdata_log_debug(D_PLUGINSD, "requested to commit chart labels");
  1025. if(!parser->user.chart_rrdlabels_linked_temporarily) {
  1026. netdata_log_error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", rrdhost_hostname(host));
  1027. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1028. }
  1029. rrdlabels_remove_all_unmarked(parser->user.chart_rrdlabels_linked_temporarily);
  1030. rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
  1031. rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
  1032. rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED);
  1033. parser->user.chart_rrdlabels_linked_temporarily = NULL;
  1034. return PARSER_RC_OK;
  1035. }
  1036. static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser) {
  1037. char *id = get_word(words, num_words, 1);
  1038. char *start_time_str = get_word(words, num_words, 2);
  1039. char *end_time_str = get_word(words, num_words, 3);
  1040. char *child_now_str = get_word(words, num_words, 4);
  1041. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1042. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1043. RRDSET *st;
  1044. if (likely(!id || !*id))
  1045. st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1046. else
  1047. st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1048. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1049. if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_REPLAY_BEGIN))
  1050. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1051. if(start_time_str && end_time_str) {
  1052. time_t start_time = (time_t) str2ull_encoded(start_time_str);
  1053. time_t end_time = (time_t) str2ull_encoded(end_time_str);
  1054. time_t wall_clock_time = 0, tolerance;
  1055. bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child;
  1056. if(child_now_str) {
  1057. wall_clock_time = (time_t) str2ull_encoded(child_now_str);
  1058. tolerance = st->update_every + 1;
  1059. wall_clock_comes_from_child = true;
  1060. }
  1061. if(wall_clock_time <= 0) {
  1062. wall_clock_time = now_realtime_sec();
  1063. tolerance = st->update_every + 5;
  1064. wall_clock_comes_from_child = false;
  1065. }
  1066. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  1067. internal_error(
  1068. (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
  1069. "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).",
  1070. rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, st->replay.after, st->replay.before);
  1071. internal_error(
  1072. true,
  1073. "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, child wall clock is %ld (%s), had requested %ld to %ld",
  1074. rrdhost_hostname(st->rrdhost), rrdset_id(st),
  1075. start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "from child" : "parent time",
  1076. st->replay.after, st->replay.before);
  1077. #endif
  1078. if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) {
  1079. if (unlikely(end_time - start_time != st->update_every))
  1080. rrdset_set_update_every_s(st, end_time - start_time);
  1081. st->last_collected_time.tv_sec = end_time;
  1082. st->last_collected_time.tv_usec = 0;
  1083. st->last_updated.tv_sec = end_time;
  1084. st->last_updated.tv_usec = 0;
  1085. st->counter++;
  1086. st->counter_done++;
  1087. // these are only needed for db mode RAM, SAVE, MAP, ALLOC
  1088. st->db.current_entry++;
  1089. if(st->db.current_entry >= st->db.entries)
  1090. st->db.current_entry -= st->db.entries;
  1091. parser->user.replay.start_time = start_time;
  1092. parser->user.replay.end_time = end_time;
  1093. parser->user.replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
  1094. parser->user.replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
  1095. parser->user.replay.wall_clock_time = wall_clock_time;
  1096. parser->user.replay.rset_enabled = true;
  1097. return PARSER_RC_OK;
  1098. }
  1099. netdata_log_error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN
  1100. " from %ld to %ld, but timestamps are invalid "
  1101. "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
  1102. rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
  1103. wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock",
  1104. tolerance);
  1105. }
  1106. // the child sends an RBEGIN without any parameters initially
  1107. // setting rset_enabled to false, means the RSET should not store any metrics
  1108. // to store metrics, the RBEGIN needs to have timestamps
  1109. parser->user.replay.start_time = 0;
  1110. parser->user.replay.end_time = 0;
  1111. parser->user.replay.start_time_ut = 0;
  1112. parser->user.replay.end_time_ut = 0;
  1113. parser->user.replay.wall_clock_time = 0;
  1114. parser->user.replay.rset_enabled = false;
  1115. return PARSER_RC_OK;
  1116. }
  1117. static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str) {
  1118. SN_FLAGS flags = SN_FLAG_NONE;
  1119. char c;
  1120. while ((c = *flags_str++)) {
  1121. switch (c) {
  1122. case 'A':
  1123. flags |= SN_FLAG_NOT_ANOMALOUS;
  1124. break;
  1125. case 'R':
  1126. flags |= SN_FLAG_RESET;
  1127. break;
  1128. case 'E':
  1129. flags = SN_EMPTY_SLOT;
  1130. return flags;
  1131. default:
  1132. internal_error(true, "Unknown SN_FLAGS flag '%c'", c);
  1133. break;
  1134. }
  1135. }
  1136. return flags;
  1137. }
  1138. static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARSER *parser) {
  1139. char *dimension = get_word(words, num_words, 1);
  1140. char *value_str = get_word(words, num_words, 2);
  1141. char *flags_str = get_word(words, num_words, 3);
  1142. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_SET);
  1143. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1144. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1145. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1146. if(!parser->user.replay.rset_enabled) {
  1147. error_limit_static_thread_var(erl, 1, 0);
  1148. error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
  1149. rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1150. // we have to return OK here
  1151. return PARSER_RC_OK;
  1152. }
  1153. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET);
  1154. if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1155. st->pluginsd.set = true;
  1156. if (unlikely(!parser->user.replay.start_time || !parser->user.replay.end_time)) {
  1157. netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.",
  1158. rrdhost_hostname(host),
  1159. rrdset_id(st),
  1160. dimension,
  1161. PLUGINSD_KEYWORD_REPLAY_SET,
  1162. parser->user.replay.start_time,
  1163. parser->user.replay.end_time,
  1164. PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1165. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1166. }
  1167. if (unlikely(!value_str || !*value_str))
  1168. value_str = "NAN";
  1169. if(unlikely(!flags_str))
  1170. flags_str = "";
  1171. if (likely(value_str)) {
  1172. RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
  1173. if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) {
  1174. NETDATA_DOUBLE value = str2ndd_encoded(value_str, NULL);
  1175. SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
  1176. if (!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT)) {
  1177. value = NAN;
  1178. flags = SN_EMPTY_SLOT;
  1179. }
  1180. rrddim_store_metric(rd, parser->user.replay.end_time_ut, value, flags);
  1181. rd->collector.last_collected_time.tv_sec = parser->user.replay.end_time;
  1182. rd->collector.last_collected_time.tv_usec = 0;
  1183. rd->collector.counter++;
  1184. }
  1185. else {
  1186. error_limit_static_global_var(erl, 1, 0);
  1187. error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.",
  1188. rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
  1189. }
  1190. }
  1191. return PARSER_RC_OK;
  1192. }
  1193. static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, PARSER *parser) {
  1194. if(parser->user.replay.rset_enabled == false)
  1195. return PARSER_RC_OK;
  1196. char *dimension = get_word(words, num_words, 1);
  1197. char *last_collected_ut_str = get_word(words, num_words, 2);
  1198. char *last_collected_value_str = get_word(words, num_words, 3);
  1199. char *last_calculated_value_str = get_word(words, num_words, 4);
  1200. char *last_stored_value_str = get_word(words, num_words, 5);
  1201. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
  1202. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1203. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1204. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1205. if(st->pluginsd.set) {
  1206. // reset pos to reuse the same RDAs
  1207. st->pluginsd.pos = 0;
  1208. st->pluginsd.set = false;
  1209. }
  1210. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
  1211. if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1212. usec_t dim_last_collected_ut = (usec_t)rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->collector.last_collected_time.tv_usec;
  1213. usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
  1214. if(last_collected_ut > dim_last_collected_ut) {
  1215. rd->collector.last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
  1216. rd->collector.last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
  1217. }
  1218. rd->collector.last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0;
  1219. rd->collector.last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0;
  1220. rd->collector.last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0;
  1221. return PARSER_RC_OK;
  1222. }
  1223. static inline PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, PARSER *parser) {
  1224. if(parser->user.replay.rset_enabled == false)
  1225. return PARSER_RC_OK;
  1226. char *last_collected_ut_str = get_word(words, num_words, 1);
  1227. char *last_updated_ut_str = get_word(words, num_words, 2);
  1228. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE);
  1229. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1230. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE,
  1231. PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1232. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1233. usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
  1234. usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
  1235. if(last_collected_ut > chart_last_collected_ut) {
  1236. st->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
  1237. st->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
  1238. }
  1239. usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec;
  1240. usec_t last_updated_ut = last_updated_ut_str ? str2ull_encoded(last_updated_ut_str) : 0;
  1241. if(last_updated_ut > chart_last_updated_ut) {
  1242. st->last_updated.tv_sec = (time_t)(last_updated_ut / USEC_PER_SEC);
  1243. st->last_updated.tv_usec = (last_updated_ut % USEC_PER_SEC);
  1244. }
  1245. st->counter++;
  1246. st->counter_done++;
  1247. return PARSER_RC_OK;
  1248. }
  1249. static inline PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARSER *parser) {
  1250. if (num_words < 7) { // accepts 7, but the 7th is optional
  1251. netdata_log_error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
  1252. return PARSER_RC_ERROR;
  1253. }
  1254. const char *update_every_child_txt = get_word(words, num_words, 1);
  1255. const char *first_entry_child_txt = get_word(words, num_words, 2);
  1256. const char *last_entry_child_txt = get_word(words, num_words, 3);
  1257. const char *start_streaming_txt = get_word(words, num_words, 4);
  1258. const char *first_entry_requested_txt = get_word(words, num_words, 5);
  1259. const char *last_entry_requested_txt = get_word(words, num_words, 6);
  1260. const char *child_world_time_txt = get_word(words, num_words, 7); // optional
  1261. time_t update_every_child = (time_t) str2ull_encoded(update_every_child_txt);
  1262. time_t first_entry_child = (time_t) str2ull_encoded(first_entry_child_txt);
  1263. time_t last_entry_child = (time_t) str2ull_encoded(last_entry_child_txt);
  1264. bool start_streaming = (strcmp(start_streaming_txt, "true") == 0);
  1265. time_t first_entry_requested = (time_t) str2ull_encoded(first_entry_requested_txt);
  1266. time_t last_entry_requested = (time_t) str2ull_encoded(last_entry_requested_txt);
  1267. // the optional child world time
  1268. time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t) str2ull_encoded(
  1269. child_world_time_txt) : now_realtime_sec();
  1270. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_REPLAY_END);
  1271. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1272. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1273. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1274. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  1275. internal_error(true,
  1276. "PLUGINSD REPLAY: 'host:%s/chart:%s': got a " PLUGINSD_KEYWORD_REPLAY_END " child db from %llu to %llu, start_streaming %s, had requested from %llu to %llu, wall clock %llu",
  1277. rrdhost_hostname(host), rrdset_id(st),
  1278. (unsigned long long)first_entry_child, (unsigned long long)last_entry_child,
  1279. start_streaming?"true":"false",
  1280. (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested,
  1281. (unsigned long long)child_world_time
  1282. );
  1283. #endif
  1284. parser->user.data_collections_count++;
  1285. if(parser->user.replay.rset_enabled && st->rrdhost->receiver) {
  1286. time_t now = now_realtime_sec();
  1287. time_t started = st->rrdhost->receiver->replication_first_time_t;
  1288. time_t current = parser->user.replay.end_time;
  1289. if(started && current > started) {
  1290. host->rrdpush_receiver_replication_percent = (NETDATA_DOUBLE) (current - started) * 100.0 / (NETDATA_DOUBLE) (now - started);
  1291. worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
  1292. host->rrdpush_receiver_replication_percent);
  1293. }
  1294. }
  1295. parser->user.replay.start_time = 0;
  1296. parser->user.replay.end_time = 0;
  1297. parser->user.replay.start_time_ut = 0;
  1298. parser->user.replay.end_time_ut = 0;
  1299. parser->user.replay.wall_clock_time = 0;
  1300. parser->user.replay.rset_enabled = false;
  1301. st->counter++;
  1302. st->counter_done++;
  1303. store_metric_collection_completed();
  1304. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  1305. st->replay.start_streaming = false;
  1306. st->replay.after = 0;
  1307. st->replay.before = 0;
  1308. if(start_streaming)
  1309. st->replay.log_next_data_collection = true;
  1310. #endif
  1311. if (start_streaming) {
  1312. if (st->update_every != update_every_child)
  1313. rrdset_set_update_every_s(st, update_every_child);
  1314. if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
  1315. rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
  1316. rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
  1317. rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
  1318. rrdhost_receiver_replicating_charts_minus_one(st->rrdhost);
  1319. }
  1320. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  1321. else
  1322. internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.",
  1323. rrdhost_hostname(host), rrdset_id(st));
  1324. #endif
  1325. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END);
  1326. host->rrdpush_receiver_replication_percent = 100.0;
  1327. worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, host->rrdpush_receiver_replication_percent);
  1328. return PARSER_RC_OK;
  1329. }
  1330. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_REPLAY_END);
  1331. rrdcontext_updated_retention_rrdset(st);
  1332. bool ok = replicate_chart_request(send_to_plugin, parser, host, st,
  1333. first_entry_child, last_entry_child, child_world_time,
  1334. first_entry_requested, last_entry_requested);
  1335. return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
  1336. }
  1337. static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) {
  1338. timing_init();
  1339. char *id = get_word(words, num_words, 1);
  1340. char *update_every_str = get_word(words, num_words, 2);
  1341. char *end_time_str = get_word(words, num_words, 3);
  1342. char *wall_clock_time_str = get_word(words, num_words, 4);
  1343. if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str))
  1344. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
  1345. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN_V2);
  1346. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1347. timing_step(TIMING_STEP_BEGIN2_PREPARE);
  1348. RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN_V2);
  1349. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1350. if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN_V2))
  1351. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1352. if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))
  1353. rrdset_isnot_obsolete(st);
  1354. timing_step(TIMING_STEP_BEGIN2_FIND_CHART);
  1355. // ------------------------------------------------------------------------
  1356. // parse the parameters
  1357. time_t update_every = (time_t) str2ull_encoded(update_every_str);
  1358. time_t end_time = (time_t) str2ull_encoded(end_time_str);
  1359. time_t wall_clock_time;
  1360. if(likely(*wall_clock_time_str == '#'))
  1361. wall_clock_time = end_time;
  1362. else
  1363. wall_clock_time = (time_t) str2ull_encoded(wall_clock_time_str);
  1364. if (unlikely(update_every != st->update_every))
  1365. rrdset_set_update_every_s(st, update_every);
  1366. timing_step(TIMING_STEP_BEGIN2_PARSE);
  1367. // ------------------------------------------------------------------------
  1368. // prepare our state
  1369. pluginsd_lock_rrdset_data_collection(parser);
  1370. parser->user.v2.update_every = update_every;
  1371. parser->user.v2.end_time = end_time;
  1372. parser->user.v2.wall_clock_time = wall_clock_time;
  1373. parser->user.v2.ml_locked = ml_chart_update_begin(st);
  1374. timing_step(TIMING_STEP_BEGIN2_ML);
  1375. // ------------------------------------------------------------------------
  1376. // propagate it forward in v2
  1377. if(!parser->user.v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost))
  1378. parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time);
  1379. if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) {
  1380. // check if receiver and sender have the same number parsing capabilities
  1381. bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
  1382. NUMBER_ENCODING encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
  1383. BUFFER *wb = parser->user.v2.stream_buffer.wb;
  1384. buffer_need_bytes(wb, 1024);
  1385. if(unlikely(parser->user.v2.stream_buffer.begin_v2_added))
  1386. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
  1387. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
  1388. buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
  1389. buffer_fast_strcat(wb, "' ", 2);
  1390. if(can_copy)
  1391. buffer_strcat(wb, update_every_str);
  1392. else
  1393. buffer_print_uint64_encoded(wb, encoding, update_every);
  1394. buffer_fast_strcat(wb, " ", 1);
  1395. if(can_copy)
  1396. buffer_strcat(wb, end_time_str);
  1397. else
  1398. buffer_print_uint64_encoded(wb, encoding, end_time);
  1399. buffer_fast_strcat(wb, " ", 1);
  1400. if(can_copy)
  1401. buffer_strcat(wb, wall_clock_time_str);
  1402. else
  1403. buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
  1404. buffer_fast_strcat(wb, "\n", 1);
  1405. parser->user.v2.stream_buffer.last_point_end_time_s = end_time;
  1406. parser->user.v2.stream_buffer.begin_v2_added = true;
  1407. }
  1408. timing_step(TIMING_STEP_BEGIN2_PROPAGATE);
  1409. // ------------------------------------------------------------------------
  1410. // store it
  1411. st->last_collected_time.tv_sec = end_time;
  1412. st->last_collected_time.tv_usec = 0;
  1413. st->last_updated.tv_sec = end_time;
  1414. st->last_updated.tv_usec = 0;
  1415. st->counter++;
  1416. st->counter_done++;
  1417. // these are only needed for db mode RAM, SAVE, MAP, ALLOC
  1418. st->db.current_entry++;
  1419. if(st->db.current_entry >= st->db.entries)
  1420. st->db.current_entry -= st->db.entries;
  1421. timing_step(TIMING_STEP_BEGIN2_STORE);
  1422. return PARSER_RC_OK;
  1423. }
  1424. static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) {
  1425. timing_init();
  1426. char *dimension = get_word(words, num_words, 1);
  1427. char *collected_str = get_word(words, num_words, 2);
  1428. char *value_str = get_word(words, num_words, 3);
  1429. char *flags_str = get_word(words, num_words, 4);
  1430. if(unlikely(!dimension || !collected_str || !value_str || !flags_str))
  1431. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
  1432. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET_V2);
  1433. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1434. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2);
  1435. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1436. timing_step(TIMING_STEP_SET2_PREPARE);
  1437. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET_V2);
  1438. if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1439. st->pluginsd.set = true;
  1440. if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED)))
  1441. rrddim_isnot_obsolete(st, rd);
  1442. timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION);
  1443. // ------------------------------------------------------------------------
  1444. // parse the parameters
  1445. collected_number collected_value = (collected_number) str2ll_encoded(collected_str);
  1446. NETDATA_DOUBLE value;
  1447. if(*value_str == '#')
  1448. value = (NETDATA_DOUBLE)collected_value;
  1449. else
  1450. value = str2ndd_encoded(value_str, NULL);
  1451. SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
  1452. timing_step(TIMING_STEP_SET2_PARSE);
  1453. // ------------------------------------------------------------------------
  1454. // check value and ML
  1455. if (unlikely(!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT))) {
  1456. value = NAN;
  1457. flags = SN_EMPTY_SLOT;
  1458. if(parser->user.v2.ml_locked)
  1459. ml_dimension_is_anomalous(rd, parser->user.v2.end_time, 0, false);
  1460. }
  1461. else if(parser->user.v2.ml_locked) {
  1462. if (ml_dimension_is_anomalous(rd, parser->user.v2.end_time, value, true)) {
  1463. // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
  1464. flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS);
  1465. }
  1466. else
  1467. flags |= SN_FLAG_NOT_ANOMALOUS;
  1468. }
  1469. timing_step(TIMING_STEP_SET2_ML);
  1470. // ------------------------------------------------------------------------
  1471. // propagate it forward in v2
  1472. if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) {
  1473. // check if receiver and sender have the same number parsing capabilities
  1474. bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
  1475. NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
  1476. NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
  1477. BUFFER *wb = parser->user.v2.stream_buffer.wb;
  1478. buffer_need_bytes(wb, 1024);
  1479. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
  1480. buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
  1481. buffer_fast_strcat(wb, "' ", 2);
  1482. if(can_copy)
  1483. buffer_strcat(wb, collected_str);
  1484. else
  1485. buffer_print_int64_encoded(wb, integer_encoding, collected_value); // original v2 had hex
  1486. buffer_fast_strcat(wb, " ", 1);
  1487. if(can_copy)
  1488. buffer_strcat(wb, value_str);
  1489. else
  1490. buffer_print_netdata_double_encoded(wb, doubles_encoding, value); // original v2 had decimal
  1491. buffer_fast_strcat(wb, " ", 1);
  1492. buffer_print_sn_flags(wb, flags, true);
  1493. buffer_fast_strcat(wb, "\n", 1);
  1494. }
  1495. timing_step(TIMING_STEP_SET2_PROPAGATE);
  1496. // ------------------------------------------------------------------------
  1497. // store it
  1498. rrddim_store_metric(rd, parser->user.v2.end_time * USEC_PER_SEC, value, flags);
  1499. rd->collector.last_collected_time.tv_sec = parser->user.v2.end_time;
  1500. rd->collector.last_collected_time.tv_usec = 0;
  1501. rd->collector.last_collected_value = collected_value;
  1502. rd->collector.last_stored_value = value;
  1503. rd->collector.last_calculated_value = value;
  1504. rd->collector.counter++;
  1505. rrddim_set_updated(rd);
  1506. timing_step(TIMING_STEP_SET2_STORE);
  1507. return PARSER_RC_OK;
  1508. }
  1509. void pluginsd_cleanup_v2(PARSER *parser) {
  1510. // this is called when the thread is stopped while processing
  1511. pluginsd_clear_scope_chart(parser, "THREAD CLEANUP");
  1512. }
  1513. static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  1514. timing_init();
  1515. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END_V2);
  1516. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1517. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2);
  1518. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1519. parser->user.data_collections_count++;
  1520. timing_step(TIMING_STEP_END2_PREPARE);
  1521. // ------------------------------------------------------------------------
  1522. // propagate the whole chart update in v1
  1523. if(unlikely(!parser->user.v2.stream_buffer.v2 && !parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb))
  1524. rrdset_push_metrics_v1(&parser->user.v2.stream_buffer, st);
  1525. timing_step(TIMING_STEP_END2_PUSH_V1);
  1526. // ------------------------------------------------------------------------
  1527. // unblock data collection
  1528. pluginsd_unlock_previous_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, false);
  1529. rrdcontext_collected_rrdset(st);
  1530. store_metric_collection_completed();
  1531. timing_step(TIMING_STEP_END2_RRDSET);
  1532. // ------------------------------------------------------------------------
  1533. // propagate it forward
  1534. rrdset_push_metrics_finished(&parser->user.v2.stream_buffer, st);
  1535. timing_step(TIMING_STEP_END2_PROPAGATE);
  1536. // ------------------------------------------------------------------------
  1537. // cleanup RRDSET / RRDDIM
  1538. RRDDIM *rd;
  1539. rrddim_foreach_read(rd, st) {
  1540. rd->collector.calculated_value = 0;
  1541. rd->collector.collected_value = 0;
  1542. rrddim_clear_updated(rd);
  1543. }
  1544. rrddim_foreach_done(rd);
  1545. // ------------------------------------------------------------------------
  1546. // reset state
  1547. parser->user.v2 = (struct parser_user_object_v2){ 0 };
  1548. timing_step(TIMING_STEP_END2_STORE);
  1549. timing_report();
  1550. return PARSER_RC_OK;
  1551. }
  1552. static inline PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
  1553. netdata_log_info("PLUGINSD: plugin called EXIT.");
  1554. return PARSER_RC_STOP;
  1555. }
  1556. struct mutex_cond {
  1557. pthread_mutex_t lock;
  1558. pthread_cond_t cond;
  1559. int rc;
  1560. };
  1561. static void virt_fnc_got_data_cb(BUFFER *wb __maybe_unused, int code, void *callback_data)
  1562. {
  1563. struct mutex_cond *ctx = callback_data;
  1564. pthread_mutex_lock(&ctx->lock);
  1565. ctx->rc = code;
  1566. pthread_cond_broadcast(&ctx->cond);
  1567. pthread_mutex_unlock(&ctx->lock);
  1568. }
  1569. #define VIRT_FNC_TIMEOUT 1
  1570. #define VIRT_FNC_BUF_SIZE (4096)
  1571. void call_virtual_function_async(BUFFER *wb, RRDHOST *host, const char *name, const char *payload, rrd_function_result_callback_t callback, void *callback_data) {
  1572. PARSER *parser = NULL;
  1573. //TODO simplify (as we really need only first parameter to get plugin name maybe we can avoid parsing all)
  1574. char *words[PLUGINSD_MAX_WORDS];
  1575. char *function_with_params = strdupz(name);
  1576. size_t num_words = quoted_strings_splitter(function_with_params, words, PLUGINSD_MAX_WORDS, isspace_map_pluginsd);
  1577. if (num_words < 2) {
  1578. netdata_log_error("PLUGINSD: virtual function name is empty.");
  1579. freez(function_with_params);
  1580. return;
  1581. }
  1582. const DICTIONARY_ITEM *cpi = dictionary_get_and_acquire_item(host->configurable_plugins, get_word(words, num_words, 1));
  1583. if (unlikely(cpi == NULL)) {
  1584. netdata_log_error("PLUGINSD: virtual function plugin '%s' not found.", name);
  1585. freez(function_with_params);
  1586. return;
  1587. }
  1588. struct configurable_plugin *cp = dictionary_acquired_item_value(cpi);
  1589. parser = (PARSER *)cp->cb_usr_ctx;
  1590. BUFFER *function_out = buffer_create(VIRT_FNC_BUF_SIZE, NULL);
  1591. // if we are forwarding this to a plugin (as opposed to streaming/child) we have to remove the first parameter (plugin_name)
  1592. buffer_strcat(function_out, get_word(words, num_words, 0));
  1593. for (size_t i = 1; i < num_words; i++) {
  1594. if (i == 1 && SERVING_PLUGINSD(parser))
  1595. continue;
  1596. buffer_sprintf(function_out, " %s", get_word(words, num_words, i));
  1597. }
  1598. freez(function_with_params);
  1599. usec_t now = now_realtime_usec();
  1600. struct inflight_function tmp = {
  1601. .started_ut = now,
  1602. .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC,
  1603. .result_body_wb = wb,
  1604. .timeout = VIRT_FNC_TIMEOUT * 10,
  1605. .function = string_strdupz(buffer_tostring(function_out)),
  1606. .result_cb = callback,
  1607. .result_cb_data = callback_data,
  1608. .payload = payload != NULL ? strdupz(payload) : NULL,
  1609. .virtual = true,
  1610. };
  1611. buffer_free(function_out);
  1612. uuid_t uuid;
  1613. uuid_generate_time(uuid);
  1614. char key[UUID_STR_LEN];
  1615. uuid_unparse_lower(uuid, key);
  1616. dictionary_write_lock(parser->inflight.functions);
  1617. // if there is any error, our dictionary callbacks will call the caller callback to notify
  1618. // the caller about the error - no need for error handling here.
  1619. dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
  1620. if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
  1621. parser->inflight.smaller_timeout = tmp.timeout_ut;
  1622. // garbage collect stale inflight functions
  1623. if(parser->inflight.smaller_timeout < now)
  1624. inflight_functions_garbage_collect(parser, now);
  1625. dictionary_write_unlock(parser->inflight.functions);
  1626. }
  1627. dyncfg_config_t call_virtual_function_blocking(PARSER *parser, const char *name, int *rc, const char *payload) {
  1628. usec_t now = now_realtime_usec();
  1629. BUFFER *wb = buffer_create(VIRT_FNC_BUF_SIZE, NULL);
  1630. struct mutex_cond cond = {
  1631. .lock = PTHREAD_MUTEX_INITIALIZER,
  1632. .cond = PTHREAD_COND_INITIALIZER
  1633. };
  1634. struct inflight_function tmp = {
  1635. .started_ut = now,
  1636. .timeout_ut = now + VIRT_FNC_TIMEOUT + USEC_PER_SEC,
  1637. .result_body_wb = wb,
  1638. .timeout = VIRT_FNC_TIMEOUT,
  1639. .function = string_strdupz(name),
  1640. .result_cb = virt_fnc_got_data_cb,
  1641. .result_cb_data = &cond,
  1642. .payload = payload != NULL ? strdupz(payload) : NULL,
  1643. .virtual = true,
  1644. };
  1645. uuid_t uuid;
  1646. uuid_generate_time(uuid);
  1647. char key[UUID_STR_LEN];
  1648. uuid_unparse_lower(uuid, key);
  1649. dictionary_write_lock(parser->inflight.functions);
  1650. // if there is any error, our dictionary callbacks will call the caller callback to notify
  1651. // the caller about the error - no need for error handling here.
  1652. dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
  1653. if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
  1654. parser->inflight.smaller_timeout = tmp.timeout_ut;
  1655. // garbage collect stale inflight functions
  1656. if(parser->inflight.smaller_timeout < now)
  1657. inflight_functions_garbage_collect(parser, now);
  1658. dictionary_write_unlock(parser->inflight.functions);
  1659. struct timespec tp;
  1660. clock_gettime(CLOCK_REALTIME, &tp);
  1661. tp.tv_sec += (time_t)VIRT_FNC_TIMEOUT;
  1662. pthread_mutex_lock(&cond.lock);
  1663. int ret = pthread_cond_timedwait(&cond.cond, &cond.lock, &tp);
  1664. if (ret == ETIMEDOUT)
  1665. netdata_log_error("PLUGINSD: DYNCFG virtual function %s timed out", name);
  1666. pthread_mutex_unlock(&cond.lock);
  1667. dyncfg_config_t cfg;
  1668. cfg.data = strdupz(buffer_tostring(wb));
  1669. cfg.data_size = buffer_strlen(wb);
  1670. if (rc != NULL)
  1671. *rc = cond.rc;
  1672. buffer_free(wb);
  1673. return cfg;
  1674. }
  1675. #define CVF_MAX_LEN (1024)
  1676. static dyncfg_config_t get_plugin_config_cb(void *usr_ctx, const char *plugin_name)
  1677. {
  1678. PARSER *parser = usr_ctx;
  1679. if (SERVING_STREAMING(parser)) {
  1680. char buf[CVF_MAX_LEN + 1];
  1681. snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG " %s", plugin_name);
  1682. return call_virtual_function_blocking(parser, buf, NULL, NULL);
  1683. }
  1684. return call_virtual_function_blocking(parser, FUNCTION_NAME_GET_PLUGIN_CONFIG, NULL, NULL);
  1685. }
  1686. static dyncfg_config_t get_plugin_config_schema_cb(void *usr_ctx, const char *plugin_name)
  1687. {
  1688. PARSER *parser = usr_ctx;
  1689. if (SERVING_STREAMING(parser)) {
  1690. char buf[CVF_MAX_LEN + 1];
  1691. snprintfz(buf, CVF_MAX_LEN, FUNCTION_NAME_GET_PLUGIN_CONFIG_SCHEMA " %s", plugin_name);
  1692. return call_virtual_function_blocking(parser, buf, NULL, NULL);
  1693. }
  1694. return call_virtual_function_blocking(parser, "get_plugin_config_schema", NULL, NULL);
  1695. }
  1696. static dyncfg_config_t get_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name)
  1697. {
  1698. PARSER *parser = usr_ctx;
  1699. BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
  1700. buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG);
  1701. if (SERVING_STREAMING(parser))
  1702. buffer_sprintf(wb, " %s", plugin_name);
  1703. buffer_sprintf(wb, " %s", module_name);
  1704. dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
  1705. buffer_free(wb);
  1706. return ret;
  1707. }
  1708. static dyncfg_config_t get_module_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name)
  1709. {
  1710. PARSER *parser = usr_ctx;
  1711. BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
  1712. buffer_strcat(wb, FUNCTION_NAME_GET_MODULE_CONFIG_SCHEMA);
  1713. if (SERVING_STREAMING(parser))
  1714. buffer_sprintf(wb, " %s", plugin_name);
  1715. buffer_sprintf(wb, " %s", module_name);
  1716. dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
  1717. buffer_free(wb);
  1718. return ret;
  1719. }
  1720. static dyncfg_config_t get_job_config_schema_cb(void *usr_ctx, const char *plugin_name, const char *module_name)
  1721. {
  1722. PARSER *parser = usr_ctx;
  1723. BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
  1724. buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG_SCHEMA);
  1725. if (SERVING_STREAMING(parser))
  1726. buffer_sprintf(wb, " %s", plugin_name);
  1727. buffer_sprintf(wb, " %s", module_name);
  1728. dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
  1729. buffer_free(wb);
  1730. return ret;
  1731. }
  1732. static dyncfg_config_t get_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char* job_name)
  1733. {
  1734. PARSER *parser = usr_ctx;
  1735. BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
  1736. buffer_strcat(wb, FUNCTION_NAME_GET_JOB_CONFIG);
  1737. if (SERVING_STREAMING(parser))
  1738. buffer_sprintf(wb, " %s", plugin_name);
  1739. buffer_sprintf(wb, " %s %s", module_name, job_name);
  1740. dyncfg_config_t ret = call_virtual_function_blocking(parser, buffer_tostring(wb), NULL, NULL);
  1741. buffer_free(wb);
  1742. return ret;
  1743. }
  1744. enum set_config_result set_plugin_config_cb(void *usr_ctx, const char *plugin_name, dyncfg_config_t *cfg)
  1745. {
  1746. PARSER *parser = usr_ctx;
  1747. BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
  1748. buffer_strcat(wb, FUNCTION_NAME_SET_PLUGIN_CONFIG);
  1749. if (SERVING_STREAMING(parser))
  1750. buffer_sprintf(wb, " %s", plugin_name);
  1751. int rc;
  1752. call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data);
  1753. buffer_free(wb);
  1754. if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
  1755. return SET_CONFIG_REJECTED;
  1756. return SET_CONFIG_ACCEPTED;
  1757. }
  1758. enum set_config_result set_module_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, dyncfg_config_t *cfg)
  1759. {
  1760. PARSER *parser = usr_ctx;
  1761. BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
  1762. buffer_strcat(wb, FUNCTION_NAME_SET_MODULE_CONFIG);
  1763. if (SERVING_STREAMING(parser))
  1764. buffer_sprintf(wb, " %s", plugin_name);
  1765. buffer_sprintf(wb, " %s", module_name);
  1766. int rc;
  1767. call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data);
  1768. buffer_free(wb);
  1769. if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
  1770. return SET_CONFIG_REJECTED;
  1771. return SET_CONFIG_ACCEPTED;
  1772. }
  1773. enum set_config_result set_job_config_cb(void *usr_ctx, const char *plugin_name, const char *module_name, const char *job_name, dyncfg_config_t *cfg)
  1774. {
  1775. PARSER *parser = usr_ctx;
  1776. BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
  1777. buffer_strcat(wb, FUNCTION_NAME_SET_JOB_CONFIG);
  1778. if (SERVING_STREAMING(parser))
  1779. buffer_sprintf(wb, " %s", plugin_name);
  1780. buffer_sprintf(wb, " %s %s", module_name, job_name);
  1781. int rc;
  1782. call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, cfg->data);
  1783. buffer_free(wb);
  1784. if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
  1785. return SET_CONFIG_REJECTED;
  1786. return SET_CONFIG_ACCEPTED;
  1787. }
  1788. enum set_config_result delete_job_cb(void *usr_ctx, const char *plugin_name ,const char *module_name, const char *job_name)
  1789. {
  1790. PARSER *parser = usr_ctx;
  1791. BUFFER *wb = buffer_create(CVF_MAX_LEN, NULL);
  1792. buffer_strcat(wb, FUNCTION_NAME_DELETE_JOB);
  1793. if (SERVING_STREAMING(parser))
  1794. buffer_sprintf(wb, " %s", plugin_name);
  1795. buffer_sprintf(wb, " %s %s", module_name, job_name);
  1796. int rc;
  1797. call_virtual_function_blocking(parser, buffer_tostring(wb), &rc, NULL);
  1798. buffer_free(wb);
  1799. if(rc != DYNCFG_VFNC_RET_CFG_ACCEPTED)
  1800. return SET_CONFIG_REJECTED;
  1801. return SET_CONFIG_ACCEPTED;
  1802. }
  1803. static inline PARSER_RC pluginsd_register_plugin(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
  1804. netdata_log_info("PLUGINSD: DYNCFG_ENABLE");
  1805. if (unlikely (num_words != 2))
  1806. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "missing name parameter");
  1807. struct configurable_plugin *cfg = callocz(1, sizeof(struct configurable_plugin));
  1808. cfg->name = strdupz(words[1]);
  1809. cfg->set_config_cb = set_plugin_config_cb;
  1810. cfg->get_config_cb = get_plugin_config_cb;
  1811. cfg->get_config_schema_cb = get_plugin_config_schema_cb;
  1812. cfg->cb_usr_ctx = parser;
  1813. const DICTIONARY_ITEM *di = register_plugin(parser->user.host->configurable_plugins, cfg, SERVING_PLUGINSD(parser));
  1814. if (unlikely(di == NULL)) {
  1815. freez(cfg->name);
  1816. freez(cfg);
  1817. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_ENABLE, "error registering plugin");
  1818. }
  1819. if (SERVING_PLUGINSD(parser)) {
  1820. // this is optimization for pluginsd to avoid extra dictionary lookup
  1821. // as we know which plugin is comunicating with us
  1822. parser->user.cd->cfg_dict_item = di;
  1823. parser->user.cd->configuration = cfg;
  1824. } else {
  1825. // register_plugin keeps the item acquired, so we need to release it
  1826. dictionary_acquired_item_release(parser->user.host->configurable_plugins, di);
  1827. }
  1828. rrdpush_send_dyncfg_enable(parser->user.host, cfg->name);
  1829. return PARSER_RC_OK;
  1830. }
  1831. #define LOG_MSG_SIZE (1024)
  1832. #define MODULE_NAME_IDX (SERVING_PLUGINSD(parser) ? 1 : 2)
  1833. #define MODULE_TYPE_IDX (SERVING_PLUGINSD(parser) ? 2 : 3)
  1834. static inline PARSER_RC pluginsd_register_module(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
  1835. netdata_log_info("PLUGINSD: DYNCFG_REG_MODULE");
  1836. size_t expected_num_words = SERVING_PLUGINSD(parser) ? 3 : 4;
  1837. if (unlikely(num_words != expected_num_words)) {
  1838. char log[LOG_MSG_SIZE + 1];
  1839. snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name module_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name ");
  1840. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, log);
  1841. }
  1842. struct configurable_plugin *plug_cfg;
  1843. const DICTIONARY_ITEM *di = NULL;
  1844. if (SERVING_PLUGINSD(parser)) {
  1845. plug_cfg = parser->user.cd->configuration;
  1846. if (unlikely(plug_cfg == NULL))
  1847. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "you have to enable dynamic configuration first using " PLUGINSD_KEYWORD_DYNCFG_ENABLE);
  1848. } else {
  1849. di = dictionary_get_and_acquire_item(parser->user.host->configurable_plugins, words[1]);
  1850. if (unlikely(di == NULL))
  1851. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "plugin not found");
  1852. plug_cfg = (struct configurable_plugin *)dictionary_acquired_item_value(di);
  1853. }
  1854. struct module *mod = callocz(1, sizeof(struct module));
  1855. mod->type = str2_module_type(words[MODULE_TYPE_IDX]);
  1856. if (unlikely(mod->type == MOD_TYPE_UNKNOWN)) {
  1857. freez(mod);
  1858. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_MODULE, "unknown module type (allowed: job_array, single)");
  1859. }
  1860. mod->name = strdupz(words[MODULE_NAME_IDX]);
  1861. mod->set_config_cb = set_module_config_cb;
  1862. mod->get_config_cb = get_module_config_cb;
  1863. mod->get_config_schema_cb = get_module_config_schema_cb;
  1864. mod->config_cb_usr_ctx = parser;
  1865. mod->get_job_config_cb = get_job_config_cb;
  1866. mod->get_job_config_schema_cb = get_job_config_schema_cb;
  1867. mod->set_job_config_cb = set_job_config_cb;
  1868. mod->delete_job_cb = delete_job_cb;
  1869. mod->job_config_cb_usr_ctx = parser;
  1870. register_module(parser->user.host->configurable_plugins, plug_cfg, mod, SERVING_PLUGINSD(parser));
  1871. if (di != NULL)
  1872. dictionary_acquired_item_release(parser->user.host->configurable_plugins, di);
  1873. rrdpush_send_dyncfg_reg_module(parser->user.host, plug_cfg->name, mod->name, mod->type);
  1874. return PARSER_RC_OK;
  1875. }
  1876. static inline PARSER_RC pluginsd_register_job_common(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused, const char *plugin_name) {
  1877. if (atol(words[3]) < 0)
  1878. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "invalid flags");
  1879. dyncfg_job_flg_t flags = atol(words[3]);
  1880. if (SERVING_PLUGINSD(parser))
  1881. flags |= JOB_FLG_PLUGIN_PUSHED;
  1882. else
  1883. flags |= JOB_FLG_STREAMING_PUSHED;
  1884. enum job_type job_type = str2job_type(words[2]);
  1885. if (job_type == JOB_TYPE_UNKNOWN)
  1886. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "unknown job type");
  1887. if (SERVING_PLUGINSD(parser) && job_type == JOB_TYPE_USER)
  1888. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "plugins cannot push jobs of type \"user\" (this is allowed only in streaming)");
  1889. if (register_job(parser->user.host->configurable_plugins, plugin_name, words[0], words[1], job_type, flags, 0)) // ignore existing is off as this is explicitly called register job
  1890. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, "error registering job");
  1891. rrdpush_send_dyncfg_reg_job(parser->user.host, plugin_name, words[0], words[1], job_type, flags);
  1892. return PARSER_RC_OK;
  1893. }
  1894. static inline PARSER_RC pluginsd_register_job(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
  1895. size_t expected_num_words = SERVING_PLUGINSD(parser) ? 5 : 6;
  1896. if (unlikely(num_words != expected_num_words)) {
  1897. char log[LOG_MSG_SIZE + 1];
  1898. snprintfz(log, LOG_MSG_SIZE, "expected %zu (got %zu) parameters: %smodule_name job_name job_type", expected_num_words - 1, num_words - 1, SERVING_PLUGINSD(parser) ? "" : "plugin_name ");
  1899. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DYNCFG_REGISTER_JOB, log);
  1900. }
  1901. if (SERVING_PLUGINSD(parser)) {
  1902. return pluginsd_register_job_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name);
  1903. }
  1904. return pluginsd_register_job_common(&words[2], num_words - 2, parser, words[1]);
  1905. }
  1906. static inline PARSER_RC pluginsd_job_status_common(char **words, size_t num_words, PARSER *parser, const char *plugin_name) {
  1907. int state = str2i(words[3]);
  1908. enum job_status status = str2job_state(words[2]);
  1909. if (unlikely(SERVING_PLUGINSD(parser) && status == JOB_STATUS_UNKNOWN))
  1910. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "unknown job status");
  1911. char *message = NULL;
  1912. if (num_words == 5)
  1913. message = words[4];
  1914. const DICTIONARY_ITEM *plugin_item;
  1915. DICTIONARY *job_dict;
  1916. const DICTIONARY_ITEM *job_item = report_job_status_acq_lock(parser->user.host->configurable_plugins, &plugin_item, &job_dict, plugin_name, words[0], words[1], status, state, message);
  1917. if (job_item != NULL) {
  1918. struct job *job = dictionary_acquired_item_value(job_item);
  1919. rrdpush_send_job_status_update(parser->user.host, plugin_name, words[0], job);
  1920. pthread_mutex_unlock(&job->lock);
  1921. dictionary_acquired_item_release(job_dict, job_item);
  1922. dictionary_acquired_item_release(parser->user.host->configurable_plugins, plugin_item);
  1923. }
  1924. return PARSER_RC_OK;
  1925. }
  1926. // job_status [plugin_name if streaming] <module_name> <job_name> <status_code> <state> [message]
  1927. static PARSER_RC pluginsd_job_status(char **words, size_t num_words, PARSER *parser) {
  1928. if (SERVING_PLUGINSD(parser)) {
  1929. if (unlikely(num_words != 5 && num_words != 6))
  1930. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 4 or 5 parameters: module_name, job_name, status_code, state, [optional: message]");
  1931. } else {
  1932. if (unlikely(num_words != 6 && num_words != 7))
  1933. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_REPORT_JOB_STATUS, "expected 5 or 6 parameters: plugin_name, module_name, job_name, status_code, state, [optional: message]");
  1934. }
  1935. if (SERVING_PLUGINSD(parser)) {
  1936. return pluginsd_job_status_common(&words[1], num_words - 1, parser, parser->user.cd->configuration->name);
  1937. }
  1938. return pluginsd_job_status_common(&words[2], num_words - 2, parser, words[1]);
  1939. }
  1940. static PARSER_RC pluginsd_delete_job(char **words, size_t num_words, PARSER *parser) {
  1941. // this can confuse a bit but there is a diference between KEYWORD_DELETE_JOB and actual delete_job function
  1942. // they are of opossite direction
  1943. if (num_words != 4)
  1944. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DELETE_JOB, "expected 2 parameters: plugin_name, module_name, job_name");
  1945. const char *plugin_name = get_word(words, num_words, 1);
  1946. const char *module_name = get_word(words, num_words, 2);
  1947. const char *job_name = get_word(words, num_words, 3);
  1948. if (SERVING_STREAMING(parser))
  1949. delete_job_pname(parser->user.host->configurable_plugins, plugin_name, module_name, job_name);
  1950. // forward to parent if any
  1951. rrdpush_send_job_deleted(parser->user.host, plugin_name, module_name, job_name);
  1952. return PARSER_RC_OK;
  1953. }
  1954. static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PARSER *parser)
  1955. {
  1956. const char *host_uuid_str = get_word(words, num_words, 1);
  1957. const char *claim_id_str = get_word(words, num_words, 2);
  1958. if (!host_uuid_str || !claim_id_str) {
  1959. netdata_log_error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
  1960. host_uuid_str ? host_uuid_str : "[unset]",
  1961. claim_id_str ? claim_id_str : "[unset]");
  1962. return PARSER_RC_ERROR;
  1963. }
  1964. uuid_t uuid;
  1965. RRDHOST *host = parser->user.host;
  1966. // We don't need the parsed UUID
  1967. // just do it to check the format
  1968. if(uuid_parse(host_uuid_str, uuid)) {
  1969. netdata_log_error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
  1970. return PARSER_RC_ERROR;
  1971. }
  1972. if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL") != 0) {
  1973. netdata_log_error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
  1974. return PARSER_RC_ERROR;
  1975. }
  1976. if(strcmp(host_uuid_str, host->machine_guid) != 0) {
  1977. netdata_log_error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
  1978. return PARSER_RC_OK; //the message is OK problem must be somewhere else
  1979. }
  1980. rrdhost_aclk_state_lock(host);
  1981. if (host->aclk_state.claimed_id)
  1982. freez(host->aclk_state.claimed_id);
  1983. host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
  1984. rrdhost_aclk_state_unlock(host);
  1985. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
  1986. rrdpush_send_claimed_id(host);
  1987. return PARSER_RC_OK;
  1988. }
  1989. // ----------------------------------------------------------------------------
  1990. static inline bool buffered_reader_read(struct buffered_reader *reader, int fd) {
  1991. #ifdef NETDATA_INTERNAL_CHECKS
  1992. if(reader->read_buffer[reader->read_len] != '\0')
  1993. fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
  1994. #endif
  1995. ssize_t bytes_read = read(fd, reader->read_buffer + reader->read_len, sizeof(reader->read_buffer) - reader->read_len - 1);
  1996. if(unlikely(bytes_read <= 0))
  1997. return false;
  1998. reader->read_len += bytes_read;
  1999. reader->read_buffer[reader->read_len] = '\0';
  2000. return true;
  2001. }
  2002. static inline bool buffered_reader_read_timeout(struct buffered_reader *reader, int fd, int timeout_ms) {
  2003. errno = 0;
  2004. struct pollfd fds[1];
  2005. fds[0].fd = fd;
  2006. fds[0].events = POLLIN;
  2007. int ret = poll(fds, 1, timeout_ms);
  2008. if (ret > 0) {
  2009. /* There is data to read */
  2010. if (fds[0].revents & POLLIN)
  2011. return buffered_reader_read(reader, fd);
  2012. else if(fds[0].revents & POLLERR) {
  2013. netdata_log_error("PARSER: read failed: POLLERR.");
  2014. return false;
  2015. }
  2016. else if(fds[0].revents & POLLHUP) {
  2017. netdata_log_error("PARSER: read failed: POLLHUP.");
  2018. return false;
  2019. }
  2020. else if(fds[0].revents & POLLNVAL) {
  2021. netdata_log_error("PARSER: read failed: POLLNVAL.");
  2022. return false;
  2023. }
  2024. netdata_log_error("PARSER: poll() returned positive number, but POLLIN|POLLERR|POLLHUP|POLLNVAL are not set.");
  2025. return false;
  2026. }
  2027. else if (ret == 0) {
  2028. netdata_log_error("PARSER: timeout while waiting for data.");
  2029. return false;
  2030. }
  2031. netdata_log_error("PARSER: poll() failed with code %d.", ret);
  2032. return false;
  2033. }
  2034. void pluginsd_process_thread_cleanup(void *ptr) {
  2035. PARSER *parser = (PARSER *)ptr;
  2036. pluginsd_cleanup_v2(parser);
  2037. pluginsd_host_define_cleanup(parser);
  2038. rrd_collector_finished();
  2039. parser_destroy(parser);
  2040. }
  2041. inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
  2042. {
  2043. int enabled = cd->unsafe.enabled;
  2044. if (!fp_plugin_input || !fp_plugin_output || !enabled) {
  2045. cd->unsafe.enabled = 0;
  2046. return 0;
  2047. }
  2048. if (unlikely(fileno(fp_plugin_input) == -1)) {
  2049. netdata_log_error("input file descriptor given is not a valid stream");
  2050. cd->serial_failures++;
  2051. return 0;
  2052. }
  2053. if (unlikely(fileno(fp_plugin_output) == -1)) {
  2054. netdata_log_error("output file descriptor given is not a valid stream");
  2055. cd->serial_failures++;
  2056. return 0;
  2057. }
  2058. clearerr(fp_plugin_input);
  2059. clearerr(fp_plugin_output);
  2060. PARSER *parser;
  2061. {
  2062. PARSER_USER_OBJECT user = {
  2063. .enabled = cd->unsafe.enabled,
  2064. .host = host,
  2065. .cd = cd,
  2066. .trust_durations = trust_durations
  2067. };
  2068. // fp_plugin_output = our input; fp_plugin_input = our output
  2069. parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
  2070. }
  2071. pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD);
  2072. rrd_collector_started();
  2073. size_t count = 0;
  2074. // this keeps the parser with its current value
  2075. // so, parser needs to be allocated before pushing it
  2076. netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
  2077. buffered_reader_init(&parser->reader);
  2078. BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL);
  2079. while(likely(service_running(SERVICE_COLLECTORS))) {
  2080. if (unlikely(!buffered_reader_next_line(&parser->reader, buffer))) {
  2081. if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC)))
  2082. break;
  2083. continue;
  2084. }
  2085. if(unlikely(parser_action(parser, buffer->buffer)))
  2086. break;
  2087. buffer->len = 0;
  2088. buffer->buffer[0] = '\0';
  2089. }
  2090. buffer_free(buffer);
  2091. cd->unsafe.enabled = parser->user.enabled;
  2092. count = parser->user.data_collections_count;
  2093. if (likely(count)) {
  2094. cd->successful_collections += count;
  2095. cd->serial_failures = 0;
  2096. }
  2097. else
  2098. cd->serial_failures++;
  2099. // free parser with the pop function
  2100. netdata_thread_cleanup_pop(1);
  2101. return count;
  2102. }
  2103. void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire) {
  2104. parser_init_repertoire(parser, repertoire);
  2105. if (repertoire & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING))
  2106. inflight_functions_init(parser);
  2107. }
  2108. PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd,
  2109. PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) {
  2110. PARSER *parser;
  2111. parser = callocz(1, sizeof(*parser));
  2112. if(user)
  2113. parser->user = *user;
  2114. parser->fd = fd;
  2115. parser->fp_input = fp_input;
  2116. parser->fp_output = fp_output;
  2117. #ifdef ENABLE_HTTPS
  2118. parser->ssl_output = ssl;
  2119. #endif
  2120. parser->flags = flags;
  2121. spinlock_init(&parser->writer.spinlock);
  2122. return parser;
  2123. }
  2124. PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, size_t num_words) {
  2125. switch(keyword->id) {
  2126. case 1:
  2127. return pluginsd_set_v2(words, num_words, parser);
  2128. case 2:
  2129. return pluginsd_begin_v2(words, num_words, parser);
  2130. case 3:
  2131. return pluginsd_end_v2(words, num_words, parser);
  2132. case 11:
  2133. return pluginsd_set(words, num_words, parser);
  2134. case 12:
  2135. return pluginsd_begin(words, num_words, parser);
  2136. case 13:
  2137. return pluginsd_end(words, num_words, parser);
  2138. case 21:
  2139. return pluginsd_replay_set(words, num_words, parser);
  2140. case 22:
  2141. return pluginsd_replay_begin(words, num_words, parser);
  2142. case 23:
  2143. return pluginsd_replay_rrddim_collection_state(words, num_words, parser);
  2144. case 24:
  2145. return pluginsd_replay_rrdset_collection_state(words, num_words, parser);
  2146. case 25:
  2147. return pluginsd_replay_end(words, num_words, parser);
  2148. case 31:
  2149. return pluginsd_dimension(words, num_words, parser);
  2150. case 32:
  2151. return pluginsd_chart(words, num_words, parser);
  2152. case 33:
  2153. return pluginsd_chart_definition_end(words, num_words, parser);
  2154. case 34:
  2155. return pluginsd_clabel(words, num_words, parser);
  2156. case 35:
  2157. return pluginsd_clabel_commit(words, num_words, parser);
  2158. case 41:
  2159. return pluginsd_function(words, num_words, parser);
  2160. case 42:
  2161. return pluginsd_function_result_begin(words, num_words, parser);
  2162. case 51:
  2163. return pluginsd_label(words, num_words, parser);
  2164. case 52:
  2165. return pluginsd_overwrite(words, num_words, parser);
  2166. case 53:
  2167. return pluginsd_variable(words, num_words, parser);
  2168. case 61:
  2169. return streaming_claimed_id(words, num_words, parser);
  2170. case 71:
  2171. return pluginsd_host(words, num_words, parser);
  2172. case 72:
  2173. return pluginsd_host_define(words, num_words, parser);
  2174. case 73:
  2175. return pluginsd_host_define_end(words, num_words, parser);
  2176. case 74:
  2177. return pluginsd_host_labels(words, num_words, parser);
  2178. case 97:
  2179. return pluginsd_flush(words, num_words, parser);
  2180. case 98:
  2181. return pluginsd_disable(words, num_words, parser);
  2182. case 99:
  2183. return pluginsd_exit(words, num_words, parser);
  2184. case 101:
  2185. return pluginsd_register_plugin(words, num_words, parser);
  2186. case 102:
  2187. return pluginsd_register_module(words, num_words, parser);
  2188. case 103:
  2189. return pluginsd_register_job(words, num_words, parser);
  2190. case 110:
  2191. return pluginsd_job_status(words, num_words, parser);
  2192. case 111:
  2193. return pluginsd_delete_job(words, num_words, parser);
  2194. default:
  2195. fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id);
  2196. }
  2197. }
  2198. #include "gperf-hashtable.h"
  2199. void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) {
  2200. parser->repertoire = repertoire;
  2201. for(size_t i = GPERF_PARSER_MIN_HASH_VALUE ; i <= GPERF_PARSER_MAX_HASH_VALUE ;i++) {
  2202. if(gperf_keywords[i].keyword && *gperf_keywords[i].keyword && (parser->repertoire & gperf_keywords[i].repertoire))
  2203. worker_register_job_name(gperf_keywords[i].worker_job_id, gperf_keywords[i].keyword);
  2204. }
  2205. }
  2206. static void parser_destroy_dyncfg(PARSER *parser) {
  2207. if (parser->user.cd != NULL && parser->user.cd->configuration != NULL) {
  2208. unregister_plugin(parser->user.host->configurable_plugins, parser->user.cd->cfg_dict_item);
  2209. parser->user.cd->configuration = NULL;
  2210. } else if (parser->user.host != NULL && SERVING_STREAMING(parser) && parser->user.host != localhost){
  2211. dictionary_flush(parser->user.host->configurable_plugins);
  2212. }
  2213. }
  2214. void parser_destroy(PARSER *parser) {
  2215. if (unlikely(!parser))
  2216. return;
  2217. parser_destroy_dyncfg(parser);
  2218. dictionary_destroy(parser->inflight.functions);
  2219. freez(parser);
  2220. }
  2221. int pluginsd_parser_unittest(void) {
  2222. PARSER *p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
  2223. pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING);
  2224. char *lines[] = {
  2225. "BEGIN2 abcdefghijklmnopqr 123",
  2226. "SET2 abcdefg 0x12345678 0 0",
  2227. "SET2 hijklmnoqr 0x12345678 0 0",
  2228. "SET2 stuvwxyz 0x12345678 0 0",
  2229. "END2",
  2230. NULL,
  2231. };
  2232. char *words[PLUGINSD_MAX_WORDS];
  2233. size_t iterations = 1000000;
  2234. size_t count = 0;
  2235. char input[PLUGINSD_LINE_MAX + 1];
  2236. usec_t started = now_realtime_usec();
  2237. while(--iterations) {
  2238. for(size_t line = 0; lines[line] ;line++) {
  2239. strncpyz(input, lines[line], PLUGINSD_LINE_MAX);
  2240. size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
  2241. const char *command = get_word(words, num_words, 0);
  2242. PARSER_KEYWORD *keyword = parser_find_keyword(p, command);
  2243. if(unlikely(!keyword))
  2244. fatal("Cannot parse the line '%s'", lines[line]);
  2245. count++;
  2246. }
  2247. }
  2248. usec_t ended = now_realtime_usec();
  2249. netdata_log_info("Parsed %zu lines in %0.2f secs, %0.2f klines/sec", count,
  2250. (double)(ended - started) / (double)USEC_PER_SEC,
  2251. (double)count / ((double)(ended - started) / (double)USEC_PER_SEC) / 1000.0);
  2252. parser_destroy(p);
  2253. return 0;
  2254. }