pluginsd_parser.c 82 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804180518061807180818091810181118121813181418151816181718181819182018211822182318241825182618271828182918301831183218331834183518361837183818391840184118421843184418451846184718481849185018511852185318541855185618571858185918601861186218631864186518661867186818691870187118721873187418751876187718781879188018811882188318841885188618871888188918901891189218931894189518961897189818991900190119021903190419051906190719081909191019111912191319141915191619171918191919201921192219231924192519261927192819291930193119321933193419351936193719381939194019411942194319441945194619471948194919501951195219531954195519561957195819591960196119621963196419651966196719681969197019711972197319741975197619771978197919801981198219831984198519861987198819891990199119921993199419951996199719981999200020012002200320042005200620072008200920102011201220132014201520162017201820192020202120222023202420252026202720282029203020312032203320342035203620372038203920402041204220432044204520462047204820492050205120522053205420552056205720582059206020612062206320642065206620672068206920702071207220732074207520762077207820792080208120822083208420852086208720882089209020912092209320942095209620972098209921002101210221032104210521062107210821092110211121122113211421152116211721182119212021212122212321242125212621272128212921302131213221332134213521362137213821392140214121422143214421452146214721482149215021512152215321542155215621572158215921602161216221632164216521662167216821692170217121722173217421752176
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "pluginsd_parser.h"
  3. #define LOG_FUNCTIONS false
  4. static ssize_t send_to_plugin(const char *txt, void *data) {
  5. PARSER *parser = data;
  6. if(!txt || !*txt)
  7. return 0;
  8. errno = 0;
  9. spinlock_lock(&parser->writer.spinlock);
  10. ssize_t bytes = -1;
  11. #ifdef ENABLE_HTTPS
  12. NETDATA_SSL *ssl = parser->ssl_output;
  13. if(ssl) {
  14. if(SSL_connection(ssl))
  15. bytes = netdata_ssl_write(ssl, (void *) txt, strlen(txt));
  16. else
  17. netdata_log_error("PLUGINSD: cannot send command (SSL)");
  18. spinlock_unlock(&parser->writer.spinlock);
  19. return bytes;
  20. }
  21. #endif
  22. if(parser->fp_output) {
  23. bytes = fprintf(parser->fp_output, "%s", txt);
  24. if(bytes <= 0) {
  25. netdata_log_error("PLUGINSD: cannot send command (FILE)");
  26. bytes = -2;
  27. }
  28. else
  29. fflush(parser->fp_output);
  30. spinlock_unlock(&parser->writer.spinlock);
  31. return bytes;
  32. }
  33. if(parser->fd != -1) {
  34. bytes = 0;
  35. ssize_t total = (ssize_t)strlen(txt);
  36. ssize_t sent;
  37. do {
  38. sent = write(parser->fd, &txt[bytes], total - bytes);
  39. if(sent <= 0) {
  40. netdata_log_error("PLUGINSD: cannot send command (fd)");
  41. spinlock_unlock(&parser->writer.spinlock);
  42. return -3;
  43. }
  44. bytes += sent;
  45. }
  46. while(bytes < total);
  47. spinlock_unlock(&parser->writer.spinlock);
  48. return (int)bytes;
  49. }
  50. spinlock_unlock(&parser->writer.spinlock);
  51. netdata_log_error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)");
  52. return -4;
  53. }
  54. static inline RRDHOST *pluginsd_require_host_from_parent(PARSER *parser, const char *cmd) {
  55. RRDHOST *host = parser->user.host;
  56. if(unlikely(!host))
  57. netdata_log_error("PLUGINSD: command %s requires a host, but is not set.", cmd);
  58. return host;
  59. }
  60. static inline RRDSET *pluginsd_require_chart_from_parent(PARSER *parser, const char *cmd, const char *parent_cmd) {
  61. RRDSET *st = parser->user.st;
  62. if(unlikely(!st))
  63. netdata_log_error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd);
  64. return st;
  65. }
  66. static inline RRDSET *pluginsd_get_chart_from_parent(PARSER *parser) {
  67. return parser->user.st;
  68. }
  69. static inline void pluginsd_lock_rrdset_data_collection(PARSER *parser) {
  70. if(parser->user.st && !parser->user.v2.locked_data_collection) {
  71. spinlock_lock(&parser->user.st->data_collection_lock);
  72. parser->user.v2.locked_data_collection = true;
  73. }
  74. }
  75. static inline bool pluginsd_unlock_rrdset_data_collection(PARSER *parser) {
  76. if(parser->user.st && parser->user.v2.locked_data_collection) {
  77. spinlock_unlock(&parser->user.st->data_collection_lock);
  78. parser->user.v2.locked_data_collection = false;
  79. return true;
  80. }
  81. return false;
  82. }
  83. void pluginsd_rrdset_cleanup(RRDSET *st) {
  84. for(size_t i = 0; i < st->pluginsd.used ; i++) {
  85. if (st->pluginsd.rda[i]) {
  86. rrddim_acquired_release(st->pluginsd.rda[i]);
  87. st->pluginsd.rda[i] = NULL;
  88. }
  89. }
  90. freez(st->pluginsd.rda);
  91. st->pluginsd.rda = NULL;
  92. st->pluginsd.size = 0;
  93. st->pluginsd.used = 0;
  94. st->pluginsd.pos = 0;
  95. }
  96. static inline void pluginsd_unlock_previous_chart(PARSER *parser, const char *keyword, bool stale) {
  97. if(unlikely(pluginsd_unlock_rrdset_data_collection(parser))) {
  98. if(stale)
  99. netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
  100. rrdhost_hostname(parser->user.st->rrdhost),
  101. rrdset_id(parser->user.st),
  102. keyword);
  103. }
  104. if(unlikely(parser->user.v2.ml_locked)) {
  105. ml_chart_update_end(parser->user.st);
  106. parser->user.v2.ml_locked = false;
  107. if(stale)
  108. netdata_log_error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
  109. rrdhost_hostname(parser->user.st->rrdhost),
  110. rrdset_id(parser->user.st),
  111. keyword);
  112. }
  113. }
  114. static inline void pluginsd_set_chart_from_parent(PARSER *parser, RRDSET *st, const char *keyword) {
  115. pluginsd_unlock_previous_chart(parser, keyword, true);
  116. if(st) {
  117. size_t dims = dictionary_entries(st->rrddim_root_index);
  118. if(unlikely(st->pluginsd.size < dims)) {
  119. st->pluginsd.rda = reallocz(st->pluginsd.rda, dims * sizeof(RRDDIM_ACQUIRED *));
  120. st->pluginsd.size = dims;
  121. }
  122. if(st->pluginsd.pos > st->pluginsd.used && st->pluginsd.pos <= st->pluginsd.size)
  123. st->pluginsd.used = st->pluginsd.pos;
  124. st->pluginsd.pos = 0;
  125. }
  126. parser->user.st = st;
  127. }
  128. static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) {
  129. if (unlikely(!dimension || !*dimension)) {
  130. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
  131. rrdhost_hostname(host), rrdset_id(st), cmd);
  132. return NULL;
  133. }
  134. RRDDIM_ACQUIRED *rda;
  135. if(likely(st->pluginsd.pos < st->pluginsd.used)) {
  136. rda = st->pluginsd.rda[st->pluginsd.pos];
  137. RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
  138. if (likely(rd && string_strcmp(rd->id, dimension) == 0)) {
  139. st->pluginsd.pos++;
  140. return rd;
  141. }
  142. else {
  143. rrddim_acquired_release(rda);
  144. st->pluginsd.rda[st->pluginsd.pos] = NULL;
  145. }
  146. }
  147. rda = rrddim_find_and_acquire(st, dimension);
  148. if (unlikely(!rda)) {
  149. netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
  150. rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
  151. return NULL;
  152. }
  153. if(likely(st->pluginsd.pos < st->pluginsd.size))
  154. st->pluginsd.rda[st->pluginsd.pos++] = rda;
  155. return rrddim_acquired_to_rrddim(rda);
  156. }
  157. static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
  158. if (unlikely(!chart || !*chart)) {
  159. netdata_log_error("PLUGINSD: 'host:%s' got a %s without a chart id.",
  160. rrdhost_hostname(host), cmd);
  161. return NULL;
  162. }
  163. RRDSET *st = rrdset_find(host, chart);
  164. if (unlikely(!st))
  165. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.",
  166. rrdhost_hostname(host), chart, cmd);
  167. return st;
  168. }
  169. static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyword, const char *msg) {
  170. parser->user.enabled = 0;
  171. if(keyword && msg) {
  172. error_limit_static_global_var(erl, 1, 0);
  173. error_limit(&erl, "PLUGINSD: keyword %s: %s", keyword, msg);
  174. }
  175. return PARSER_RC_ERROR;
  176. }
  177. static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) {
  178. char *dimension = get_word(words, num_words, 1);
  179. char *value = get_word(words, num_words, 2);
  180. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_SET);
  181. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  182. RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
  183. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  184. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET);
  185. if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  186. if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  187. netdata_log_debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
  188. rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
  189. if (value && *value)
  190. rrddim_set_by_pointer(st, rd, str2ll_encoded(value));
  191. return PARSER_RC_OK;
  192. }
  193. static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) {
  194. char *id = get_word(words, num_words, 1);
  195. char *microseconds_txt = get_word(words, num_words, 2);
  196. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_BEGIN);
  197. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  198. RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN);
  199. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  200. pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_BEGIN);
  201. usec_t microseconds = 0;
  202. if (microseconds_txt && *microseconds_txt) {
  203. long long t = str2ll(microseconds_txt, NULL);
  204. if(t >= 0)
  205. microseconds = t;
  206. }
  207. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  208. if(st->replay.log_next_data_collection) {
  209. st->replay.log_next_data_collection = false;
  210. internal_error(true,
  211. "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu",
  212. rrdhost_hostname(host), rrdset_id(st),
  213. st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec,
  214. st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec,
  215. microseconds
  216. );
  217. }
  218. #endif
  219. if (likely(st->counter_done)) {
  220. if (likely(microseconds)) {
  221. if (parser->user.trust_durations)
  222. rrdset_next_usec_unfiltered(st, microseconds);
  223. else
  224. rrdset_next_usec(st, microseconds);
  225. }
  226. else
  227. rrdset_next(st);
  228. }
  229. return PARSER_RC_OK;
  230. }
  231. static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) {
  232. UNUSED(words);
  233. UNUSED(num_words);
  234. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_END);
  235. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  236. RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
  237. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  238. if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  239. netdata_log_debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
  240. pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_END);
  241. parser->user.data_collections_count++;
  242. struct timeval now;
  243. now_realtime_timeval(&now);
  244. rrdset_timed_done(st, now, /* pending_rrdset_next = */ false);
  245. return PARSER_RC_OK;
  246. }
  247. static void pluginsd_host_define_cleanup(PARSER *parser) {
  248. string_freez(parser->user.host_define.hostname);
  249. dictionary_destroy(parser->user.host_define.rrdlabels);
  250. parser->user.host_define.hostname = NULL;
  251. parser->user.host_define.rrdlabels = NULL;
  252. parser->user.host_define.parsing_host = false;
  253. }
  254. static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) {
  255. if(uuid_parse(guid, *uuid))
  256. return false;
  257. uuid_unparse_lower(*uuid, output);
  258. return true;
  259. }
  260. static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PARSER *parser) {
  261. char *guid = get_word(words, num_words, 1);
  262. char *hostname = get_word(words, num_words, 2);
  263. if(unlikely(!guid || !*guid || !hostname || !*hostname))
  264. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters");
  265. if(unlikely(parser->user.host_define.parsing_host))
  266. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE,
  267. "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?");
  268. if(!pluginsd_validate_machine_guid(guid, &parser->user.host_define.machine_guid, parser->user.host_define.machine_guid_str))
  269. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?");
  270. parser->user.host_define.hostname = string_strdupz(hostname);
  271. parser->user.host_define.rrdlabels = rrdlabels_create();
  272. parser->user.host_define.parsing_host = true;
  273. return PARSER_RC_OK;
  274. }
  275. static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, DICTIONARY *dict, const char *keyword) {
  276. char *name = get_word(words, num_words, 1);
  277. char *value = get_word(words, num_words, 2);
  278. if(!name || !*name || !value)
  279. return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters");
  280. if(!parser->user.host_define.parsing_host || !dict)
  281. return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
  282. rrdlabels_add(dict, name, value, RRDLABEL_SRC_CONFIG);
  283. return PARSER_RC_OK;
  284. }
  285. static inline PARSER_RC pluginsd_host_labels(char **words, size_t num_words, PARSER *parser) {
  286. return pluginsd_host_dictionary(words, num_words, parser,
  287. parser->user.host_define.rrdlabels,
  288. PLUGINSD_KEYWORD_HOST_LABEL);
  289. }
  290. static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  291. if(!parser->user.host_define.parsing_host)
  292. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
  293. RRDHOST *host = rrdhost_find_or_create(
  294. string2str(parser->user.host_define.hostname),
  295. string2str(parser->user.host_define.hostname),
  296. parser->user.host_define.machine_guid_str,
  297. "Netdata Virtual Host 1.0",
  298. netdata_configured_timezone,
  299. netdata_configured_abbrev_timezone,
  300. netdata_configured_utc_offset,
  301. NULL,
  302. program_name,
  303. program_version,
  304. default_rrd_update_every,
  305. default_rrd_history_entries,
  306. default_rrd_memory_mode,
  307. default_health_enabled,
  308. default_rrdpush_enabled,
  309. default_rrdpush_destination,
  310. default_rrdpush_api_key,
  311. default_rrdpush_send_charts_matching,
  312. default_rrdpush_enable_replication,
  313. default_rrdpush_seconds_to_replicate,
  314. default_rrdpush_replication_step,
  315. rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels),
  316. false
  317. );
  318. rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST);
  319. if(host->rrdlabels) {
  320. rrdlabels_migrate_to_these(host->rrdlabels, parser->user.host_define.rrdlabels);
  321. }
  322. else {
  323. host->rrdlabels = parser->user.host_define.rrdlabels;
  324. parser->user.host_define.rrdlabels = NULL;
  325. }
  326. pluginsd_host_define_cleanup(parser);
  327. parser->user.host = host;
  328. pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_HOST_DEFINE_END);
  329. rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
  330. rrdcontext_host_child_connected(host);
  331. schedule_node_info_update(host);
  332. return PARSER_RC_OK;
  333. }
  334. static inline PARSER_RC pluginsd_host(char **words, size_t num_words, PARSER *parser) {
  335. char *guid = get_word(words, num_words, 1);
  336. if(!guid || !*guid || strcmp(guid, "localhost") == 0) {
  337. parser->user.host = localhost;
  338. return PARSER_RC_OK;
  339. }
  340. uuid_t uuid;
  341. char uuid_str[UUID_STR_LEN];
  342. if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str))
  343. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?");
  344. RRDHOST *host = rrdhost_find_by_guid(uuid_str);
  345. if(unlikely(!host))
  346. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?");
  347. parser->user.host = host;
  348. return PARSER_RC_OK;
  349. }
  350. static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *parser) {
  351. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_CHART);
  352. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  353. char *type = get_word(words, num_words, 1);
  354. char *name = get_word(words, num_words, 2);
  355. char *title = get_word(words, num_words, 3);
  356. char *units = get_word(words, num_words, 4);
  357. char *family = get_word(words, num_words, 5);
  358. char *context = get_word(words, num_words, 6);
  359. char *chart = get_word(words, num_words, 7);
  360. char *priority_s = get_word(words, num_words, 8);
  361. char *update_every_s = get_word(words, num_words, 9);
  362. char *options = get_word(words, num_words, 10);
  363. char *plugin = get_word(words, num_words, 11);
  364. char *module = get_word(words, num_words, 12);
  365. // parse the id from type
  366. char *id = NULL;
  367. if (likely(type && (id = strchr(type, '.')))) {
  368. *id = '\0';
  369. id++;
  370. }
  371. // make sure we have the required variables
  372. if (unlikely((!type || !*type || !id || !*id)))
  373. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_CHART, "missing parameters");
  374. // parse the name, and make sure it does not include 'type.'
  375. if (unlikely(name && *name)) {
  376. // when data are streamed from child nodes
  377. // name will be type.name
  378. // so, we have to remove 'type.' from name too
  379. size_t len = strlen(type);
  380. if (strncmp(type, name, len) == 0 && name[len] == '.')
  381. name = &name[len + 1];
  382. // if the name is the same with the id,
  383. // or is just 'NULL', clear it.
  384. if (unlikely(strcmp(name, id) == 0 || strcasecmp(name, "NULL") == 0 || strcasecmp(name, "(NULL)") == 0))
  385. name = NULL;
  386. }
  387. int priority = 1000;
  388. if (likely(priority_s && *priority_s))
  389. priority = str2i(priority_s);
  390. int update_every = parser->user.cd->update_every;
  391. if (likely(update_every_s && *update_every_s))
  392. update_every = str2i(update_every_s);
  393. if (unlikely(!update_every))
  394. update_every = parser->user.cd->update_every;
  395. RRDSET_TYPE chart_type = RRDSET_TYPE_LINE;
  396. if (unlikely(chart))
  397. chart_type = rrdset_type_id(chart);
  398. if (unlikely(name && !*name))
  399. name = NULL;
  400. if (unlikely(family && !*family))
  401. family = NULL;
  402. if (unlikely(context && !*context))
  403. context = NULL;
  404. if (unlikely(!title))
  405. title = "";
  406. if (unlikely(!units))
  407. units = "unknown";
  408. netdata_log_debug(
  409. D_PLUGINSD,
  410. "creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d",
  411. type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
  412. priority, update_every);
  413. RRDSET *st = NULL;
  414. st = rrdset_create(
  415. host, type, id, name, family, context, title, units,
  416. (plugin && *plugin) ? plugin : parser->user.cd->filename,
  417. module, priority, update_every,
  418. chart_type);
  419. if (likely(st)) {
  420. if (options && *options) {
  421. if (strstr(options, "obsolete")) {
  422. pluginsd_rrdset_cleanup(st);
  423. rrdset_is_obsolete(st);
  424. }
  425. else
  426. rrdset_isnot_obsolete(st);
  427. if (strstr(options, "detail"))
  428. rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
  429. else
  430. rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
  431. if (strstr(options, "hidden"))
  432. rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
  433. else
  434. rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
  435. if (strstr(options, "store_first"))
  436. rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
  437. else
  438. rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
  439. } else {
  440. rrdset_isnot_obsolete(st);
  441. rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
  442. rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
  443. }
  444. }
  445. pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_CHART);
  446. return PARSER_RC_OK;
  447. }
  448. static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) {
  449. const char *first_entry_txt = get_word(words, num_words, 1);
  450. const char *last_entry_txt = get_word(words, num_words, 2);
  451. const char *wall_clock_time_txt = get_word(words, num_words, 3);
  452. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
  453. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  454. RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
  455. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  456. time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
  457. time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
  458. time_t child_wall_clock_time = (wall_clock_time_txt && *wall_clock_time_txt) ? (time_t)str2ul(wall_clock_time_txt) : now_realtime_sec();
  459. bool ok = true;
  460. if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
  461. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  462. st->replay.start_streaming = false;
  463. st->replay.after = 0;
  464. st->replay.before = 0;
  465. #endif
  466. rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
  467. rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
  468. rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
  469. ok = replicate_chart_request(send_to_plugin, parser, host, st,
  470. first_entry_child, last_entry_child, child_wall_clock_time,
  471. 0, 0);
  472. }
  473. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  474. else {
  475. internal_error(true, "REPLAY: 'host:%s/chart:%s' not sending duplicate replication request",
  476. rrdhost_hostname(st->rrdhost), rrdset_id(st));
  477. }
  478. #endif
  479. return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
  480. }
  481. static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) {
  482. char *id = get_word(words, num_words, 1);
  483. char *name = get_word(words, num_words, 2);
  484. char *algorithm = get_word(words, num_words, 3);
  485. char *multiplier_s = get_word(words, num_words, 4);
  486. char *divisor_s = get_word(words, num_words, 5);
  487. char *options = get_word(words, num_words, 6);
  488. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_DIMENSION);
  489. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  490. RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
  491. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  492. if (unlikely(!id))
  493. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id");
  494. long multiplier = 1;
  495. if (multiplier_s && *multiplier_s) {
  496. multiplier = str2ll_encoded(multiplier_s);
  497. if (unlikely(!multiplier))
  498. multiplier = 1;
  499. }
  500. long divisor = 1;
  501. if (likely(divisor_s && *divisor_s)) {
  502. divisor = str2ll_encoded(divisor_s);
  503. if (unlikely(!divisor))
  504. divisor = 1;
  505. }
  506. if (unlikely(!algorithm || !*algorithm))
  507. algorithm = "absolute";
  508. if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  509. netdata_log_debug(
  510. D_PLUGINSD,
  511. "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
  512. rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
  513. options ? options : "");
  514. RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm));
  515. int unhide_dimension = 1;
  516. rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  517. if (options && *options) {
  518. if (strstr(options, "obsolete") != NULL)
  519. rrddim_is_obsolete(st, rd);
  520. else
  521. rrddim_isnot_obsolete(st, rd);
  522. unhide_dimension = !strstr(options, "hidden");
  523. if (strstr(options, "noreset") != NULL)
  524. rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  525. if (strstr(options, "nooverflow") != NULL)
  526. rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  527. } else
  528. rrddim_isnot_obsolete(st, rd);
  529. bool should_update_dimension = false;
  530. if (likely(unhide_dimension)) {
  531. rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN);
  532. should_update_dimension = rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
  533. }
  534. else {
  535. rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN);
  536. should_update_dimension = !rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
  537. }
  538. if (should_update_dimension) {
  539. rrddim_flag_set(rd, RRDDIM_FLAG_METADATA_UPDATE);
  540. rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
  541. }
  542. return PARSER_RC_OK;
  543. }
  544. // ----------------------------------------------------------------------------
  545. // execution of functions
  546. struct inflight_function {
  547. int code;
  548. int timeout;
  549. BUFFER *destination_wb;
  550. STRING *function;
  551. void (*callback)(BUFFER *wb, int code, void *callback_data);
  552. void *callback_data;
  553. usec_t timeout_ut;
  554. usec_t started_ut;
  555. usec_t sent_ut;
  556. };
  557. static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
  558. struct inflight_function *pf = func;
  559. PARSER *parser = parser_ptr;
  560. // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller
  561. pf->code = HTTP_RESP_GATEWAY_TIMEOUT;
  562. char buffer[2048 + 1];
  563. snprintfz(buffer, 2048, "FUNCTION %s %d \"%s\"\n",
  564. dictionary_acquired_item_name(item),
  565. pf->timeout,
  566. string2str(pf->function));
  567. // send the command to the plugin
  568. int ret = send_to_plugin(buffer, parser);
  569. pf->sent_ut = now_realtime_usec();
  570. if(ret < 0) {
  571. netdata_log_error("FUNCTION: failed to send function to plugin, error %d", ret);
  572. rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
  573. }
  574. else {
  575. internal_error(LOG_FUNCTIONS,
  576. "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)",
  577. string2str(pf->function), dictionary_acquired_item_name(item), ret,
  578. pf->sent_ut - pf->started_ut);
  579. }
  580. }
  581. static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) {
  582. struct inflight_function *pf = new_func;
  583. netdata_log_error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
  584. pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
  585. pf->callback(pf->destination_wb, pf->code, pf->callback_data);
  586. string_freez(pf->function);
  587. return false;
  588. }
  589. static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) {
  590. struct inflight_function *pf = func;
  591. internal_error(LOG_FUNCTIONS,
  592. "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %llu usec, response %llu usec)",
  593. string2str(pf->function), dictionary_acquired_item_name(item),
  594. buffer_strlen(pf->destination_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut);
  595. pf->callback(pf->destination_wb, pf->code, pf->callback_data);
  596. string_freez(pf->function);
  597. }
  598. void inflight_functions_init(PARSER *parser) {
  599. parser->inflight.functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE, &dictionary_stats_category_functions, 0);
  600. dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser);
  601. dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser);
  602. dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser);
  603. }
  604. static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) {
  605. parser->inflight.smaller_timeout = 0;
  606. struct inflight_function *pf;
  607. dfe_start_write(parser->inflight.functions, pf) {
  608. if (pf->timeout_ut < now) {
  609. internal_error(true,
  610. "FUNCTION '%s' removing expired transaction '%s', after %llu usec.",
  611. string2str(pf->function), pf_dfe.name, now - pf->started_ut);
  612. if(!buffer_strlen(pf->destination_wb) || pf->code == HTTP_RESP_OK)
  613. pf->code = rrd_call_function_error(pf->destination_wb,
  614. "Timeout waiting for collector response.",
  615. HTTP_RESP_GATEWAY_TIMEOUT);
  616. dictionary_del(parser->inflight.functions, pf_dfe.name);
  617. }
  618. else if(!parser->inflight.smaller_timeout || pf->timeout_ut < parser->inflight.smaller_timeout)
  619. parser->inflight.smaller_timeout = pf->timeout_ut;
  620. }
  621. dfe_done(pf);
  622. }
  623. // this is the function that is called from
  624. // rrd_call_function_and_wait() and rrd_call_function_async()
  625. static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeout, const char *function, void *collector_data, void (*callback)(BUFFER *wb, int code, void *callback_data), void *callback_data) {
  626. PARSER *parser = collector_data;
  627. usec_t now = now_realtime_usec();
  628. struct inflight_function tmp = {
  629. .started_ut = now,
  630. .timeout_ut = now + timeout * USEC_PER_SEC,
  631. .destination_wb = destination_wb,
  632. .timeout = timeout,
  633. .function = string_strdupz(function),
  634. .callback = callback,
  635. .callback_data = callback_data,
  636. };
  637. uuid_t uuid;
  638. uuid_generate_time(uuid);
  639. char key[UUID_STR_LEN];
  640. uuid_unparse_lower(uuid, key);
  641. dictionary_write_lock(parser->inflight.functions);
  642. // if there is any error, our dictionary callbacks will call the caller callback to notify
  643. // the caller about the error - no need for error handling here.
  644. dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
  645. if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
  646. parser->inflight.smaller_timeout = tmp.timeout_ut;
  647. // garbage collect stale inflight functions
  648. if(parser->inflight.smaller_timeout < now)
  649. inflight_functions_garbage_collect(parser, now);
  650. dictionary_write_unlock(parser->inflight.functions);
  651. return HTTP_RESP_OK;
  652. }
  653. static inline PARSER_RC pluginsd_function(char **words, size_t num_words, PARSER *parser) {
  654. bool global = false;
  655. size_t i = 1;
  656. if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
  657. i++;
  658. global = true;
  659. }
  660. char *name = get_word(words, num_words, i++);
  661. char *timeout_s = get_word(words, num_words, i++);
  662. char *help = get_word(words, num_words, i++);
  663. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_FUNCTION);
  664. if(!host) return PARSER_RC_ERROR;
  665. RRDSET *st = (global)?NULL:pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART);
  666. if(!st) global = true;
  667. if (unlikely(!timeout_s || !name || !help || (!global && !st))) {
  668. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.",
  669. rrdhost_hostname(host),
  670. st?rrdset_id(st):"(unset)",
  671. global?"yes":"no",
  672. name?name:"(unset)",
  673. timeout_s?timeout_s:"(unset)",
  674. help?help:"(unset)"
  675. );
  676. return PARSER_RC_ERROR;
  677. }
  678. int timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  679. if (timeout_s && *timeout_s) {
  680. timeout = str2i(timeout_s);
  681. if (unlikely(timeout <= 0))
  682. timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  683. }
  684. rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser);
  685. return PARSER_RC_OK;
  686. }
  687. static void pluginsd_function_result_end(struct parser *parser, void *action_data) {
  688. STRING *key = action_data;
  689. if(key)
  690. dictionary_del(parser->inflight.functions, string2str(key));
  691. string_freez(key);
  692. }
  693. static inline PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, PARSER *parser) {
  694. char *key = get_word(words, num_words, 1);
  695. char *status = get_word(words, num_words, 2);
  696. char *format = get_word(words, num_words, 3);
  697. char *expires = get_word(words, num_words, 4);
  698. if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) {
  699. netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
  700. , key ? key : "(unset)"
  701. , status ? status : "(unset)"
  702. , format ? format : "(unset)"
  703. , expires ? expires : "(unset)"
  704. );
  705. }
  706. int code = (status && *status) ? str2i(status) : 0;
  707. if (code <= 0)
  708. code = HTTP_RESP_BACKEND_RESPONSE_INVALID;
  709. time_t expiration = (expires && *expires) ? str2l(expires) : 0;
  710. struct inflight_function *pf = NULL;
  711. if(key && *key)
  712. pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, key);
  713. if(!pf) {
  714. netdata_log_error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)");
  715. }
  716. else {
  717. if(format && *format)
  718. pf->destination_wb->content_type = functions_format_to_content_type(format);
  719. pf->code = code;
  720. pf->destination_wb->expires = expiration;
  721. if(expiration <= now_realtime_sec())
  722. buffer_no_cacheable(pf->destination_wb);
  723. else
  724. buffer_cacheable(pf->destination_wb);
  725. }
  726. parser->defer.response = (pf) ? pf->destination_wb : NULL;
  727. parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END;
  728. parser->defer.action = pluginsd_function_result_end;
  729. parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL
  730. parser->flags |= PARSER_DEFER_UNTIL_KEYWORD;
  731. return PARSER_RC_OK;
  732. }
  733. // ----------------------------------------------------------------------------
  734. static inline PARSER_RC pluginsd_variable(char **words, size_t num_words, PARSER *parser) {
  735. char *name = get_word(words, num_words, 1);
  736. char *value = get_word(words, num_words, 2);
  737. NETDATA_DOUBLE v;
  738. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_VARIABLE);
  739. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  740. RRDSET *st = pluginsd_get_chart_from_parent(parser);
  741. int global = (st) ? 0 : 1;
  742. if (name && *name) {
  743. if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
  744. global = 1;
  745. name = get_word(words, num_words, 2);
  746. value = get_word(words, num_words, 3);
  747. } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
  748. global = 0;
  749. name = get_word(words, num_words, 2);
  750. value = get_word(words, num_words, 3);
  751. }
  752. }
  753. if (unlikely(!name || !*name))
  754. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "missing variable name");
  755. if (unlikely(!value || !*value))
  756. value = NULL;
  757. if (unlikely(!value)) {
  758. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
  759. rrdhost_hostname(host),
  760. st ? rrdset_id(st):"UNSET",
  761. (global) ? "HOST" : "CHART",
  762. name);
  763. return PARSER_RC_OK;
  764. }
  765. if (!global && !st)
  766. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
  767. char *endptr = NULL;
  768. v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr);
  769. if (unlikely(endptr && *endptr)) {
  770. if (endptr == value)
  771. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
  772. rrdhost_hostname(host),
  773. st ? rrdset_id(st):"UNSET",
  774. value,
  775. name);
  776. else
  777. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
  778. rrdhost_hostname(host),
  779. st ? rrdset_id(st):"UNSET",
  780. value,
  781. name,
  782. endptr);
  783. }
  784. if (global) {
  785. const RRDVAR_ACQUIRED *rva = rrdvar_custom_host_variable_add_and_acquire(host, name);
  786. if (rva) {
  787. rrdvar_custom_host_variable_set(host, rva, v);
  788. rrdvar_custom_host_variable_release(host, rva);
  789. }
  790. else
  791. netdata_log_error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
  792. rrdhost_hostname(host),
  793. name);
  794. } else {
  795. const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name);
  796. if (rsa) {
  797. rrdsetvar_custom_chart_variable_set(st, rsa, v);
  798. rrdsetvar_custom_chart_variable_release(st, rsa);
  799. }
  800. else
  801. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
  802. rrdhost_hostname(host), rrdset_id(st), name);
  803. }
  804. return PARSER_RC_OK;
  805. }
  806. static inline PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  807. netdata_log_debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH);
  808. pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_FLUSH);
  809. parser->user.replay.start_time = 0;
  810. parser->user.replay.end_time = 0;
  811. parser->user.replay.start_time_ut = 0;
  812. parser->user.replay.end_time_ut = 0;
  813. return PARSER_RC_OK;
  814. }
  815. static inline PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  816. netdata_log_info("PLUGINSD: plugin called DISABLE. Disabling it.");
  817. parser->user.enabled = 0;
  818. return PARSER_RC_STOP;
  819. }
  820. static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *parser) {
  821. const char *name = get_word(words, num_words, 1);
  822. const char *label_source = get_word(words, num_words, 2);
  823. const char *value = get_word(words, num_words, 3);
  824. if (!name || !label_source || !value)
  825. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_LABEL, "missing parameters");
  826. char *store = (char *)value;
  827. bool allocated_store = false;
  828. if(unlikely(num_words > 4)) {
  829. allocated_store = true;
  830. store = mallocz(PLUGINSD_LINE_MAX + 1);
  831. size_t remaining = PLUGINSD_LINE_MAX;
  832. char *move = store;
  833. char *word;
  834. for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) {
  835. if(i > 3) {
  836. *move++ = ' ';
  837. *move = '\0';
  838. remaining--;
  839. }
  840. size_t length = strlen(word);
  841. if (length > remaining)
  842. length = remaining;
  843. remaining -= length;
  844. memcpy(move, word, length);
  845. move += length;
  846. *move = '\0';
  847. }
  848. }
  849. if(unlikely(!(parser->user.new_host_labels)))
  850. parser->user.new_host_labels = rrdlabels_create();
  851. rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source));
  852. if (allocated_store)
  853. freez(store);
  854. return PARSER_RC_OK;
  855. }
  856. static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  857. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_OVERWRITE);
  858. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  859. netdata_log_debug(D_PLUGINSD, "requested to OVERWRITE host labels");
  860. if(unlikely(!host->rrdlabels))
  861. host->rrdlabels = rrdlabels_create();
  862. rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels);
  863. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
  864. rrdlabels_destroy(parser->user.new_host_labels);
  865. parser->user.new_host_labels = NULL;
  866. return PARSER_RC_OK;
  867. }
  868. static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER *parser) {
  869. const char *name = get_word(words, num_words, 1);
  870. const char *value = get_word(words, num_words, 2);
  871. const char *label_source = get_word(words, num_words, 3);
  872. if (!name || !value || !*label_source) {
  873. netdata_log_error("Ignoring malformed or empty CHART LABEL command.");
  874. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  875. }
  876. if(unlikely(!parser->user.chart_rrdlabels_linked_temporarily)) {
  877. RRDSET *st = pluginsd_get_chart_from_parent(parser);
  878. parser->user.chart_rrdlabels_linked_temporarily = st->rrdlabels;
  879. rrdlabels_unmark_all(parser->user.chart_rrdlabels_linked_temporarily);
  880. }
  881. rrdlabels_add(parser->user.chart_rrdlabels_linked_temporarily, name, value, str2l(label_source));
  882. return PARSER_RC_OK;
  883. }
  884. static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  885. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT);
  886. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  887. RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
  888. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  889. netdata_log_debug(D_PLUGINSD, "requested to commit chart labels");
  890. if(!parser->user.chart_rrdlabels_linked_temporarily) {
  891. netdata_log_error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", rrdhost_hostname(host));
  892. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  893. }
  894. rrdlabels_remove_all_unmarked(parser->user.chart_rrdlabels_linked_temporarily);
  895. rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
  896. rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
  897. parser->user.chart_rrdlabels_linked_temporarily = NULL;
  898. return PARSER_RC_OK;
  899. }
  900. static inline PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, PARSER *parser) {
  901. char *id = get_word(words, num_words, 1);
  902. char *start_time_str = get_word(words, num_words, 2);
  903. char *end_time_str = get_word(words, num_words, 3);
  904. char *child_now_str = get_word(words, num_words, 4);
  905. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  906. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  907. RRDSET *st;
  908. if (likely(!id || !*id))
  909. st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  910. else
  911. st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  912. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  913. pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  914. if(start_time_str && end_time_str) {
  915. time_t start_time = (time_t) str2ull_encoded(start_time_str);
  916. time_t end_time = (time_t) str2ull_encoded(end_time_str);
  917. time_t wall_clock_time = 0, tolerance;
  918. bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child;
  919. if(child_now_str) {
  920. wall_clock_time = (time_t) str2ull_encoded(child_now_str);
  921. tolerance = st->update_every + 1;
  922. wall_clock_comes_from_child = true;
  923. }
  924. if(wall_clock_time <= 0) {
  925. wall_clock_time = now_realtime_sec();
  926. tolerance = st->update_every + 5;
  927. wall_clock_comes_from_child = false;
  928. }
  929. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  930. internal_error(
  931. (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
  932. "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).",
  933. rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, st->replay.after, st->replay.before);
  934. internal_error(
  935. true,
  936. "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, child wall clock is %ld (%s), had requested %ld to %ld",
  937. rrdhost_hostname(st->rrdhost), rrdset_id(st),
  938. start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "from child" : "parent time",
  939. st->replay.after, st->replay.before);
  940. #endif
  941. if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) {
  942. if (unlikely(end_time - start_time != st->update_every))
  943. rrdset_set_update_every_s(st, end_time - start_time);
  944. st->last_collected_time.tv_sec = end_time;
  945. st->last_collected_time.tv_usec = 0;
  946. st->last_updated.tv_sec = end_time;
  947. st->last_updated.tv_usec = 0;
  948. st->counter++;
  949. st->counter_done++;
  950. // these are only needed for db mode RAM, SAVE, MAP, ALLOC
  951. st->db.current_entry++;
  952. if(st->db.current_entry >= st->db.entries)
  953. st->db.current_entry -= st->db.entries;
  954. parser->user.replay.start_time = start_time;
  955. parser->user.replay.end_time = end_time;
  956. parser->user.replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
  957. parser->user.replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
  958. parser->user.replay.wall_clock_time = wall_clock_time;
  959. parser->user.replay.rset_enabled = true;
  960. return PARSER_RC_OK;
  961. }
  962. netdata_log_error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN
  963. " from %ld to %ld, but timestamps are invalid "
  964. "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
  965. rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
  966. wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock",
  967. tolerance);
  968. }
  969. // the child sends an RBEGIN without any parameters initially
  970. // setting rset_enabled to false, means the RSET should not store any metrics
  971. // to store metrics, the RBEGIN needs to have timestamps
  972. parser->user.replay.start_time = 0;
  973. parser->user.replay.end_time = 0;
  974. parser->user.replay.start_time_ut = 0;
  975. parser->user.replay.end_time_ut = 0;
  976. parser->user.replay.wall_clock_time = 0;
  977. parser->user.replay.rset_enabled = false;
  978. return PARSER_RC_OK;
  979. }
  980. static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str) {
  981. SN_FLAGS flags = SN_FLAG_NONE;
  982. char c;
  983. while ((c = *flags_str++)) {
  984. switch (c) {
  985. case 'A':
  986. flags |= SN_FLAG_NOT_ANOMALOUS;
  987. break;
  988. case 'R':
  989. flags |= SN_FLAG_RESET;
  990. break;
  991. case 'E':
  992. flags = SN_EMPTY_SLOT;
  993. return flags;
  994. default:
  995. internal_error(true, "Unknown SN_FLAGS flag '%c'", c);
  996. break;
  997. }
  998. }
  999. return flags;
  1000. }
  1001. static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARSER *parser) {
  1002. char *dimension = get_word(words, num_words, 1);
  1003. char *value_str = get_word(words, num_words, 2);
  1004. char *flags_str = get_word(words, num_words, 3);
  1005. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_SET);
  1006. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1007. RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1008. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1009. if(!parser->user.replay.rset_enabled) {
  1010. error_limit_static_thread_var(erl, 1, 0);
  1011. error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
  1012. rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1013. // we have to return OK here
  1014. return PARSER_RC_OK;
  1015. }
  1016. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET);
  1017. if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1018. if (unlikely(!parser->user.replay.start_time || !parser->user.replay.end_time)) {
  1019. netdata_log_error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.",
  1020. rrdhost_hostname(host),
  1021. rrdset_id(st),
  1022. dimension,
  1023. PLUGINSD_KEYWORD_REPLAY_SET,
  1024. parser->user.replay.start_time,
  1025. parser->user.replay.end_time,
  1026. PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1027. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1028. }
  1029. if (unlikely(!value_str || !*value_str))
  1030. value_str = "NAN";
  1031. if(unlikely(!flags_str))
  1032. flags_str = "";
  1033. if (likely(value_str)) {
  1034. RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
  1035. if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) {
  1036. NETDATA_DOUBLE value = str2ndd_encoded(value_str, NULL);
  1037. SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
  1038. if (!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT)) {
  1039. value = NAN;
  1040. flags = SN_EMPTY_SLOT;
  1041. }
  1042. rrddim_store_metric(rd, parser->user.replay.end_time_ut, value, flags);
  1043. rd->collector.last_collected_time.tv_sec = parser->user.replay.end_time;
  1044. rd->collector.last_collected_time.tv_usec = 0;
  1045. rd->collector.counter++;
  1046. }
  1047. else {
  1048. error_limit_static_global_var(erl, 1, 0);
  1049. error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.",
  1050. rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
  1051. }
  1052. }
  1053. return PARSER_RC_OK;
  1054. }
  1055. static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, PARSER *parser) {
  1056. if(parser->user.replay.rset_enabled == false)
  1057. return PARSER_RC_OK;
  1058. char *dimension = get_word(words, num_words, 1);
  1059. char *last_collected_ut_str = get_word(words, num_words, 2);
  1060. char *last_collected_value_str = get_word(words, num_words, 3);
  1061. char *last_calculated_value_str = get_word(words, num_words, 4);
  1062. char *last_stored_value_str = get_word(words, num_words, 5);
  1063. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
  1064. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1065. RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1066. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1067. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
  1068. if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1069. usec_t dim_last_collected_ut = (usec_t)rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->collector.last_collected_time.tv_usec;
  1070. usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
  1071. if(last_collected_ut > dim_last_collected_ut) {
  1072. rd->collector.last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
  1073. rd->collector.last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
  1074. }
  1075. rd->collector.last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0;
  1076. rd->collector.last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0;
  1077. rd->collector.last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0;
  1078. return PARSER_RC_OK;
  1079. }
  1080. static inline PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, PARSER *parser) {
  1081. if(parser->user.replay.rset_enabled == false)
  1082. return PARSER_RC_OK;
  1083. char *last_collected_ut_str = get_word(words, num_words, 1);
  1084. char *last_updated_ut_str = get_word(words, num_words, 2);
  1085. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE);
  1086. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1087. RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1088. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1089. usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
  1090. usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
  1091. if(last_collected_ut > chart_last_collected_ut) {
  1092. st->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
  1093. st->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
  1094. }
  1095. usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec;
  1096. usec_t last_updated_ut = last_updated_ut_str ? str2ull_encoded(last_updated_ut_str) : 0;
  1097. if(last_updated_ut > chart_last_updated_ut) {
  1098. st->last_updated.tv_sec = (time_t)(last_updated_ut / USEC_PER_SEC);
  1099. st->last_updated.tv_usec = (last_updated_ut % USEC_PER_SEC);
  1100. }
  1101. st->counter++;
  1102. st->counter_done++;
  1103. return PARSER_RC_OK;
  1104. }
  1105. static inline PARSER_RC pluginsd_replay_end(char **words, size_t num_words, PARSER *parser) {
  1106. if (num_words < 7) { // accepts 7, but the 7th is optional
  1107. netdata_log_error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
  1108. return PARSER_RC_ERROR;
  1109. }
  1110. const char *update_every_child_txt = get_word(words, num_words, 1);
  1111. const char *first_entry_child_txt = get_word(words, num_words, 2);
  1112. const char *last_entry_child_txt = get_word(words, num_words, 3);
  1113. const char *start_streaming_txt = get_word(words, num_words, 4);
  1114. const char *first_entry_requested_txt = get_word(words, num_words, 5);
  1115. const char *last_entry_requested_txt = get_word(words, num_words, 6);
  1116. const char *child_world_time_txt = get_word(words, num_words, 7); // optional
  1117. time_t update_every_child = (time_t) str2ull_encoded(update_every_child_txt);
  1118. time_t first_entry_child = (time_t) str2ull_encoded(first_entry_child_txt);
  1119. time_t last_entry_child = (time_t) str2ull_encoded(last_entry_child_txt);
  1120. bool start_streaming = (strcmp(start_streaming_txt, "true") == 0);
  1121. time_t first_entry_requested = (time_t) str2ull_encoded(first_entry_requested_txt);
  1122. time_t last_entry_requested = (time_t) str2ull_encoded(last_entry_requested_txt);
  1123. // the optional child world time
  1124. time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t) str2ull_encoded(
  1125. child_world_time_txt) : now_realtime_sec();
  1126. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_END);
  1127. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1128. RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1129. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1130. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  1131. internal_error(true,
  1132. "PLUGINSD REPLAY: 'host:%s/chart:%s': got a " PLUGINSD_KEYWORD_REPLAY_END " child db from %llu to %llu, start_streaming %s, had requested from %llu to %llu, wall clock %llu",
  1133. rrdhost_hostname(host), rrdset_id(st),
  1134. (unsigned long long)first_entry_child, (unsigned long long)last_entry_child,
  1135. start_streaming?"true":"false",
  1136. (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested,
  1137. (unsigned long long)child_world_time
  1138. );
  1139. #endif
  1140. parser->user.data_collections_count++;
  1141. if(parser->user.replay.rset_enabled && st->rrdhost->receiver) {
  1142. time_t now = now_realtime_sec();
  1143. time_t started = st->rrdhost->receiver->replication_first_time_t;
  1144. time_t current = parser->user.replay.end_time;
  1145. if(started && current > started) {
  1146. host->rrdpush_receiver_replication_percent = (NETDATA_DOUBLE) (current - started) * 100.0 / (NETDATA_DOUBLE) (now - started);
  1147. worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
  1148. host->rrdpush_receiver_replication_percent);
  1149. }
  1150. }
  1151. parser->user.replay.start_time = 0;
  1152. parser->user.replay.end_time = 0;
  1153. parser->user.replay.start_time_ut = 0;
  1154. parser->user.replay.end_time_ut = 0;
  1155. parser->user.replay.wall_clock_time = 0;
  1156. parser->user.replay.rset_enabled = false;
  1157. st->counter++;
  1158. st->counter_done++;
  1159. store_metric_collection_completed();
  1160. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  1161. st->replay.start_streaming = false;
  1162. st->replay.after = 0;
  1163. st->replay.before = 0;
  1164. if(start_streaming)
  1165. st->replay.log_next_data_collection = true;
  1166. #endif
  1167. if (start_streaming) {
  1168. if (st->update_every != update_every_child)
  1169. rrdset_set_update_every_s(st, update_every_child);
  1170. if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
  1171. rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
  1172. rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
  1173. rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
  1174. rrdhost_receiver_replicating_charts_minus_one(st->rrdhost);
  1175. }
  1176. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  1177. else
  1178. internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.",
  1179. rrdhost_hostname(host), rrdset_id(st));
  1180. #endif
  1181. pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_REPLAY_END);
  1182. host->rrdpush_receiver_replication_percent = 100.0;
  1183. worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, host->rrdpush_receiver_replication_percent);
  1184. return PARSER_RC_OK;
  1185. }
  1186. pluginsd_set_chart_from_parent(parser, NULL, PLUGINSD_KEYWORD_REPLAY_END);
  1187. rrdcontext_updated_retention_rrdset(st);
  1188. bool ok = replicate_chart_request(send_to_plugin, parser, host, st,
  1189. first_entry_child, last_entry_child, child_world_time,
  1190. first_entry_requested, last_entry_requested);
  1191. return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
  1192. }
  1193. static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) {
  1194. timing_init();
  1195. char *id = get_word(words, num_words, 1);
  1196. char *update_every_str = get_word(words, num_words, 2);
  1197. char *end_time_str = get_word(words, num_words, 3);
  1198. char *wall_clock_time_str = get_word(words, num_words, 4);
  1199. if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str))
  1200. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
  1201. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_BEGIN_V2);
  1202. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1203. timing_step(TIMING_STEP_BEGIN2_PREPARE);
  1204. RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN_V2);
  1205. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1206. pluginsd_set_chart_from_parent(parser, st, PLUGINSD_KEYWORD_BEGIN_V2);
  1207. if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE | RRDSET_FLAG_ARCHIVED)))
  1208. rrdset_isnot_obsolete(st);
  1209. timing_step(TIMING_STEP_BEGIN2_FIND_CHART);
  1210. // ------------------------------------------------------------------------
  1211. // parse the parameters
  1212. time_t update_every = (time_t) str2ull_encoded(update_every_str);
  1213. time_t end_time = (time_t) str2ull_encoded(end_time_str);
  1214. time_t wall_clock_time;
  1215. if(likely(*wall_clock_time_str == '#'))
  1216. wall_clock_time = end_time;
  1217. else
  1218. wall_clock_time = (time_t) str2ull_encoded(wall_clock_time_str);
  1219. if (unlikely(update_every != st->update_every))
  1220. rrdset_set_update_every_s(st, update_every);
  1221. timing_step(TIMING_STEP_BEGIN2_PARSE);
  1222. // ------------------------------------------------------------------------
  1223. // prepare our state
  1224. pluginsd_lock_rrdset_data_collection(parser);
  1225. parser->user.v2.update_every = update_every;
  1226. parser->user.v2.end_time = end_time;
  1227. parser->user.v2.wall_clock_time = wall_clock_time;
  1228. parser->user.v2.ml_locked = ml_chart_update_begin(st);
  1229. timing_step(TIMING_STEP_BEGIN2_ML);
  1230. // ------------------------------------------------------------------------
  1231. // propagate it forward in v2
  1232. if(!parser->user.v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost))
  1233. parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time);
  1234. if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) {
  1235. // check if receiver and sender have the same number parsing capabilities
  1236. bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
  1237. NUMBER_ENCODING encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
  1238. BUFFER *wb = parser->user.v2.stream_buffer.wb;
  1239. buffer_need_bytes(wb, 1024);
  1240. if(unlikely(parser->user.v2.stream_buffer.begin_v2_added))
  1241. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
  1242. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
  1243. buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
  1244. buffer_fast_strcat(wb, "' ", 2);
  1245. if(can_copy)
  1246. buffer_strcat(wb, update_every_str);
  1247. else
  1248. buffer_print_uint64_encoded(wb, encoding, update_every);
  1249. buffer_fast_strcat(wb, " ", 1);
  1250. if(can_copy)
  1251. buffer_strcat(wb, end_time_str);
  1252. else
  1253. buffer_print_uint64_encoded(wb, encoding, end_time);
  1254. buffer_fast_strcat(wb, " ", 1);
  1255. if(can_copy)
  1256. buffer_strcat(wb, wall_clock_time_str);
  1257. else
  1258. buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
  1259. buffer_fast_strcat(wb, "\n", 1);
  1260. parser->user.v2.stream_buffer.last_point_end_time_s = end_time;
  1261. parser->user.v2.stream_buffer.begin_v2_added = true;
  1262. }
  1263. timing_step(TIMING_STEP_BEGIN2_PROPAGATE);
  1264. // ------------------------------------------------------------------------
  1265. // store it
  1266. st->last_collected_time.tv_sec = end_time;
  1267. st->last_collected_time.tv_usec = 0;
  1268. st->last_updated.tv_sec = end_time;
  1269. st->last_updated.tv_usec = 0;
  1270. st->counter++;
  1271. st->counter_done++;
  1272. // these are only needed for db mode RAM, SAVE, MAP, ALLOC
  1273. st->db.current_entry++;
  1274. if(st->db.current_entry >= st->db.entries)
  1275. st->db.current_entry -= st->db.entries;
  1276. timing_step(TIMING_STEP_BEGIN2_STORE);
  1277. return PARSER_RC_OK;
  1278. }
  1279. static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) {
  1280. timing_init();
  1281. char *dimension = get_word(words, num_words, 1);
  1282. char *collected_str = get_word(words, num_words, 2);
  1283. char *value_str = get_word(words, num_words, 3);
  1284. char *flags_str = get_word(words, num_words, 4);
  1285. if(unlikely(!dimension || !collected_str || !value_str || !flags_str))
  1286. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
  1287. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_SET_V2);
  1288. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1289. RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2);
  1290. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1291. timing_step(TIMING_STEP_SET2_PREPARE);
  1292. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET_V2);
  1293. if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1294. if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED)))
  1295. rrddim_isnot_obsolete(st, rd);
  1296. timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION);
  1297. // ------------------------------------------------------------------------
  1298. // parse the parameters
  1299. collected_number collected_value = (collected_number) str2ll_encoded(collected_str);
  1300. NETDATA_DOUBLE value;
  1301. if(*value_str == '#')
  1302. value = (NETDATA_DOUBLE)collected_value;
  1303. else
  1304. value = str2ndd_encoded(value_str, NULL);
  1305. SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
  1306. timing_step(TIMING_STEP_SET2_PARSE);
  1307. // ------------------------------------------------------------------------
  1308. // check value and ML
  1309. if (unlikely(!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT))) {
  1310. value = NAN;
  1311. flags = SN_EMPTY_SLOT;
  1312. if(parser->user.v2.ml_locked)
  1313. ml_dimension_is_anomalous(rd, parser->user.v2.end_time, 0, false);
  1314. }
  1315. else if(parser->user.v2.ml_locked) {
  1316. if (ml_dimension_is_anomalous(rd, parser->user.v2.end_time, value, true)) {
  1317. // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
  1318. flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS);
  1319. }
  1320. else
  1321. flags |= SN_FLAG_NOT_ANOMALOUS;
  1322. }
  1323. timing_step(TIMING_STEP_SET2_ML);
  1324. // ------------------------------------------------------------------------
  1325. // propagate it forward in v2
  1326. if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) {
  1327. // check if receiver and sender have the same number parsing capabilities
  1328. bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
  1329. NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
  1330. NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
  1331. BUFFER *wb = parser->user.v2.stream_buffer.wb;
  1332. buffer_need_bytes(wb, 1024);
  1333. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
  1334. buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
  1335. buffer_fast_strcat(wb, "' ", 2);
  1336. if(can_copy)
  1337. buffer_strcat(wb, collected_str);
  1338. else
  1339. buffer_print_int64_encoded(wb, integer_encoding, collected_value); // original v2 had hex
  1340. buffer_fast_strcat(wb, " ", 1);
  1341. if(can_copy)
  1342. buffer_strcat(wb, value_str);
  1343. else
  1344. buffer_print_netdata_double_encoded(wb, doubles_encoding, value); // original v2 had decimal
  1345. buffer_fast_strcat(wb, " ", 1);
  1346. buffer_print_sn_flags(wb, flags, true);
  1347. buffer_fast_strcat(wb, "\n", 1);
  1348. }
  1349. timing_step(TIMING_STEP_SET2_PROPAGATE);
  1350. // ------------------------------------------------------------------------
  1351. // store it
  1352. rrddim_store_metric(rd, parser->user.v2.end_time * USEC_PER_SEC, value, flags);
  1353. rd->collector.last_collected_time.tv_sec = parser->user.v2.end_time;
  1354. rd->collector.last_collected_time.tv_usec = 0;
  1355. rd->collector.last_collected_value = collected_value;
  1356. rd->collector.last_stored_value = value;
  1357. rd->collector.last_calculated_value = value;
  1358. rd->collector.counter++;
  1359. rrddim_set_updated(rd);
  1360. timing_step(TIMING_STEP_SET2_STORE);
  1361. return PARSER_RC_OK;
  1362. }
  1363. void pluginsd_cleanup_v2(PARSER *parser) {
  1364. // this is called when the thread is stopped while processing
  1365. pluginsd_set_chart_from_parent(parser, NULL, "THREAD CLEANUP");
  1366. }
  1367. static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  1368. timing_init();
  1369. RRDHOST *host = pluginsd_require_host_from_parent(parser, PLUGINSD_KEYWORD_END_V2);
  1370. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1371. RRDSET *st = pluginsd_require_chart_from_parent(parser, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2);
  1372. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  1373. parser->user.data_collections_count++;
  1374. timing_step(TIMING_STEP_END2_PREPARE);
  1375. // ------------------------------------------------------------------------
  1376. // propagate the whole chart update in v1
  1377. if(unlikely(!parser->user.v2.stream_buffer.v2 && !parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb))
  1378. rrdset_push_metrics_v1(&parser->user.v2.stream_buffer, st);
  1379. timing_step(TIMING_STEP_END2_PUSH_V1);
  1380. // ------------------------------------------------------------------------
  1381. // unblock data collection
  1382. pluginsd_unlock_previous_chart(parser, PLUGINSD_KEYWORD_END_V2, false);
  1383. rrdcontext_collected_rrdset(st);
  1384. store_metric_collection_completed();
  1385. timing_step(TIMING_STEP_END2_RRDSET);
  1386. // ------------------------------------------------------------------------
  1387. // propagate it forward
  1388. rrdset_push_metrics_finished(&parser->user.v2.stream_buffer, st);
  1389. timing_step(TIMING_STEP_END2_PROPAGATE);
  1390. // ------------------------------------------------------------------------
  1391. // cleanup RRDSET / RRDDIM
  1392. RRDDIM *rd;
  1393. rrddim_foreach_read(rd, st) {
  1394. rd->collector.calculated_value = 0;
  1395. rd->collector.collected_value = 0;
  1396. rrddim_clear_updated(rd);
  1397. }
  1398. rrddim_foreach_done(rd);
  1399. // ------------------------------------------------------------------------
  1400. // reset state
  1401. parser->user.v2 = (struct parser_user_object_v2){ 0 };
  1402. timing_step(TIMING_STEP_END2_STORE);
  1403. timing_report();
  1404. return PARSER_RC_OK;
  1405. }
  1406. static inline PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
  1407. netdata_log_info("PLUGINSD: plugin called EXIT.");
  1408. return PARSER_RC_STOP;
  1409. }
  1410. static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PARSER *parser)
  1411. {
  1412. const char *host_uuid_str = get_word(words, num_words, 1);
  1413. const char *claim_id_str = get_word(words, num_words, 2);
  1414. if (!host_uuid_str || !claim_id_str) {
  1415. netdata_log_error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
  1416. host_uuid_str ? host_uuid_str : "[unset]",
  1417. claim_id_str ? claim_id_str : "[unset]");
  1418. return PARSER_RC_ERROR;
  1419. }
  1420. uuid_t uuid;
  1421. RRDHOST *host = parser->user.host;
  1422. // We don't need the parsed UUID
  1423. // just do it to check the format
  1424. if(uuid_parse(host_uuid_str, uuid)) {
  1425. netdata_log_error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
  1426. return PARSER_RC_ERROR;
  1427. }
  1428. if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL") != 0) {
  1429. netdata_log_error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
  1430. return PARSER_RC_ERROR;
  1431. }
  1432. if(strcmp(host_uuid_str, host->machine_guid) != 0) {
  1433. netdata_log_error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
  1434. return PARSER_RC_OK; //the message is OK problem must be somewhere else
  1435. }
  1436. rrdhost_aclk_state_lock(host);
  1437. if (host->aclk_state.claimed_id)
  1438. freez(host->aclk_state.claimed_id);
  1439. host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
  1440. rrdhost_aclk_state_unlock(host);
  1441. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
  1442. rrdpush_send_claimed_id(host);
  1443. return PARSER_RC_OK;
  1444. }
  1445. // ----------------------------------------------------------------------------
  1446. static inline bool buffered_reader_read(struct buffered_reader *reader, int fd) {
  1447. #ifdef NETDATA_INTERNAL_CHECKS
  1448. if(reader->read_buffer[reader->read_len] != '\0')
  1449. fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
  1450. #endif
  1451. ssize_t bytes_read = read(fd, reader->read_buffer + reader->read_len, sizeof(reader->read_buffer) - reader->read_len - 1);
  1452. if(unlikely(bytes_read <= 0))
  1453. return false;
  1454. reader->read_len += bytes_read;
  1455. reader->read_buffer[reader->read_len] = '\0';
  1456. return true;
  1457. }
  1458. static inline bool buffered_reader_read_timeout(struct buffered_reader *reader, int fd, int timeout_ms) {
  1459. errno = 0;
  1460. struct pollfd fds[1];
  1461. fds[0].fd = fd;
  1462. fds[0].events = POLLIN;
  1463. int ret = poll(fds, 1, timeout_ms);
  1464. if (ret > 0) {
  1465. /* There is data to read */
  1466. if (fds[0].revents & POLLIN)
  1467. return buffered_reader_read(reader, fd);
  1468. else if(fds[0].revents & POLLERR) {
  1469. netdata_log_error("PARSER: read failed: POLLERR.");
  1470. return false;
  1471. }
  1472. else if(fds[0].revents & POLLHUP) {
  1473. netdata_log_error("PARSER: read failed: POLLHUP.");
  1474. return false;
  1475. }
  1476. else if(fds[0].revents & POLLNVAL) {
  1477. netdata_log_error("PARSER: read failed: POLLNVAL.");
  1478. return false;
  1479. }
  1480. netdata_log_error("PARSER: poll() returned positive number, but POLLIN|POLLERR|POLLHUP|POLLNVAL are not set.");
  1481. return false;
  1482. }
  1483. else if (ret == 0) {
  1484. netdata_log_error("PARSER: timeout while waiting for data.");
  1485. return false;
  1486. }
  1487. netdata_log_error("PARSER: poll() failed with code %d.", ret);
  1488. return false;
  1489. }
  1490. void pluginsd_process_thread_cleanup(void *ptr) {
  1491. PARSER *parser = (PARSER *)ptr;
  1492. pluginsd_cleanup_v2(parser);
  1493. pluginsd_host_define_cleanup(parser);
  1494. rrd_collector_finished();
  1495. parser_destroy(parser);
  1496. }
  1497. inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
  1498. {
  1499. int enabled = cd->unsafe.enabled;
  1500. if (!fp_plugin_input || !fp_plugin_output || !enabled) {
  1501. cd->unsafe.enabled = 0;
  1502. return 0;
  1503. }
  1504. if (unlikely(fileno(fp_plugin_input) == -1)) {
  1505. netdata_log_error("input file descriptor given is not a valid stream");
  1506. cd->serial_failures++;
  1507. return 0;
  1508. }
  1509. if (unlikely(fileno(fp_plugin_output) == -1)) {
  1510. netdata_log_error("output file descriptor given is not a valid stream");
  1511. cd->serial_failures++;
  1512. return 0;
  1513. }
  1514. clearerr(fp_plugin_input);
  1515. clearerr(fp_plugin_output);
  1516. PARSER *parser;
  1517. {
  1518. PARSER_USER_OBJECT user = {
  1519. .enabled = cd->unsafe.enabled,
  1520. .host = host,
  1521. .cd = cd,
  1522. .trust_durations = trust_durations
  1523. };
  1524. // fp_plugin_output = our input; fp_plugin_input = our output
  1525. parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
  1526. }
  1527. pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD);
  1528. rrd_collector_started();
  1529. size_t count = 0;
  1530. // this keeps the parser with its current value
  1531. // so, parser needs to be allocated before pushing it
  1532. netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
  1533. buffered_reader_init(&parser->reader);
  1534. char buffer[PLUGINSD_LINE_MAX + 2];
  1535. while(likely(service_running(SERVICE_COLLECTORS))) {
  1536. if (unlikely(!buffered_reader_next_line(&parser->reader, buffer, PLUGINSD_LINE_MAX + 2))) {
  1537. if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC)))
  1538. break;
  1539. }
  1540. else if(unlikely(parser_action(parser, buffer)))
  1541. break;
  1542. }
  1543. cd->unsafe.enabled = parser->user.enabled;
  1544. count = parser->user.data_collections_count;
  1545. if (likely(count)) {
  1546. cd->successful_collections += count;
  1547. cd->serial_failures = 0;
  1548. }
  1549. else
  1550. cd->serial_failures++;
  1551. // free parser with the pop function
  1552. netdata_thread_cleanup_pop(1);
  1553. return count;
  1554. }
  1555. void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire) {
  1556. parser_init_repertoire(parser, repertoire);
  1557. if (repertoire & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING))
  1558. inflight_functions_init(parser);
  1559. }
  1560. PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd,
  1561. PARSER_INPUT_TYPE flags, void *ssl __maybe_unused) {
  1562. PARSER *parser;
  1563. parser = callocz(1, sizeof(*parser));
  1564. if(user)
  1565. parser->user = *user;
  1566. parser->fd = fd;
  1567. parser->fp_input = fp_input;
  1568. parser->fp_output = fp_output;
  1569. #ifdef ENABLE_HTTPS
  1570. parser->ssl_output = ssl;
  1571. #endif
  1572. parser->flags = flags;
  1573. spinlock_init(&parser->writer.spinlock);
  1574. return parser;
  1575. }
  1576. PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, size_t num_words) {
  1577. switch(keyword->id) {
  1578. case 1:
  1579. return pluginsd_set_v2(words, num_words, parser);
  1580. case 2:
  1581. return pluginsd_begin_v2(words, num_words, parser);
  1582. case 3:
  1583. return pluginsd_end_v2(words, num_words, parser);
  1584. case 11:
  1585. return pluginsd_set(words, num_words, parser);
  1586. case 12:
  1587. return pluginsd_begin(words, num_words, parser);
  1588. case 13:
  1589. return pluginsd_end(words, num_words, parser);
  1590. case 21:
  1591. return pluginsd_replay_set(words, num_words, parser);
  1592. case 22:
  1593. return pluginsd_replay_begin(words, num_words, parser);
  1594. case 23:
  1595. return pluginsd_replay_rrddim_collection_state(words, num_words, parser);
  1596. case 24:
  1597. return pluginsd_replay_rrdset_collection_state(words, num_words, parser);
  1598. case 25:
  1599. return pluginsd_replay_end(words, num_words, parser);
  1600. case 31:
  1601. return pluginsd_dimension(words, num_words, parser);
  1602. case 32:
  1603. return pluginsd_chart(words, num_words, parser);
  1604. case 33:
  1605. return pluginsd_chart_definition_end(words, num_words, parser);
  1606. case 34:
  1607. return pluginsd_clabel(words, num_words, parser);
  1608. case 35:
  1609. return pluginsd_clabel_commit(words, num_words, parser);
  1610. case 41:
  1611. return pluginsd_function(words, num_words, parser);
  1612. case 42:
  1613. return pluginsd_function_result_begin(words, num_words, parser);
  1614. case 51:
  1615. return pluginsd_label(words, num_words, parser);
  1616. case 52:
  1617. return pluginsd_overwrite(words, num_words, parser);
  1618. case 53:
  1619. return pluginsd_variable(words, num_words, parser);
  1620. case 61:
  1621. return streaming_claimed_id(words, num_words, parser);
  1622. case 71:
  1623. return pluginsd_host(words, num_words, parser);
  1624. case 72:
  1625. return pluginsd_host_define(words, num_words, parser);
  1626. case 73:
  1627. return pluginsd_host_define_end(words, num_words, parser);
  1628. case 74:
  1629. return pluginsd_host_labels(words, num_words, parser);
  1630. case 97:
  1631. return pluginsd_flush(words, num_words, parser);
  1632. case 98:
  1633. return pluginsd_disable(words, num_words, parser);
  1634. case 99:
  1635. return pluginsd_exit(words, num_words, parser);
  1636. default:
  1637. fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id);
  1638. }
  1639. }
  1640. #include "gperf-hashtable.h"
  1641. void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) {
  1642. parser->repertoire = repertoire;
  1643. for(size_t i = GPERF_PARSER_MIN_HASH_VALUE ; i <= GPERF_PARSER_MAX_HASH_VALUE ;i++) {
  1644. if(gperf_keywords[i].keyword && *gperf_keywords[i].keyword && (parser->repertoire & gperf_keywords[i].repertoire))
  1645. worker_register_job_name(gperf_keywords[i].worker_job_id, gperf_keywords[i].keyword);
  1646. }
  1647. }
  1648. void parser_destroy(PARSER *parser) {
  1649. if (unlikely(!parser))
  1650. return;
  1651. dictionary_destroy(parser->inflight.functions);
  1652. freez(parser);
  1653. }
  1654. int pluginsd_parser_unittest(void) {
  1655. PARSER *p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
  1656. pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING);
  1657. char *lines[] = {
  1658. "BEGIN2 abcdefghijklmnopqr 123",
  1659. "SET2 abcdefg 0x12345678 0 0",
  1660. "SET2 hijklmnoqr 0x12345678 0 0",
  1661. "SET2 stuvwxyz 0x12345678 0 0",
  1662. "END2",
  1663. NULL,
  1664. };
  1665. char *words[PLUGINSD_MAX_WORDS];
  1666. size_t iterations = 1000000;
  1667. size_t count = 0;
  1668. char input[PLUGINSD_LINE_MAX + 1];
  1669. usec_t started = now_realtime_usec();
  1670. while(--iterations) {
  1671. for(size_t line = 0; lines[line] ;line++) {
  1672. strncpyz(input, lines[line], PLUGINSD_LINE_MAX);
  1673. size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
  1674. const char *command = get_word(words, num_words, 0);
  1675. PARSER_KEYWORD *keyword = parser_find_keyword(p, command);
  1676. if(unlikely(!keyword))
  1677. fatal("Cannot parse the line '%s'", lines[line]);
  1678. count++;
  1679. }
  1680. }
  1681. usec_t ended = now_realtime_usec();
  1682. netdata_log_info("Parsed %zu lines in %0.2f secs, %0.2f klines/sec", count,
  1683. (double)(ended - started) / (double)USEC_PER_SEC,
  1684. (double)count / ((double)(ended - started) / (double)USEC_PER_SEC) / 1000.0);
  1685. parser_destroy(p);
  1686. return 0;
  1687. }