pluginsd_parser.c 52 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "pluginsd_internals.h"
  3. static inline PARSER_RC pluginsd_set(char **words, size_t num_words, PARSER *parser) {
  4. int idx = 1;
  5. ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
  6. if(slot >= 0) idx++;
  7. char *dimension = get_word(words, num_words, idx++);
  8. char *value = get_word(words, num_words, idx++);
  9. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET);
  10. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  11. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
  12. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  13. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET);
  14. if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  15. st->pluginsd.set = true;
  16. if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  17. netdata_log_debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
  18. rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
  19. if (value && *value)
  20. rrddim_set_by_pointer(st, rd, str2ll_encoded(value));
  21. return PARSER_RC_OK;
  22. }
  23. static inline PARSER_RC pluginsd_begin(char **words, size_t num_words, PARSER *parser) {
  24. int idx = 1;
  25. ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
  26. if(slot >= 0) idx++;
  27. char *id = get_word(words, num_words, idx++);
  28. char *microseconds_txt = get_word(words, num_words, idx++);
  29. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN);
  30. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  31. RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN);
  32. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  33. if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN))
  34. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  35. usec_t microseconds = 0;
  36. if (microseconds_txt && *microseconds_txt) {
  37. long long t = str2ll(microseconds_txt, NULL);
  38. if(t >= 0)
  39. microseconds = t;
  40. }
  41. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  42. if(st->replay.log_next_data_collection) {
  43. st->replay.log_next_data_collection = false;
  44. internal_error(true,
  45. "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu",
  46. rrdhost_hostname(host), rrdset_id(st),
  47. st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec,
  48. st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec,
  49. microseconds
  50. );
  51. }
  52. #endif
  53. if (likely(st->counter_done)) {
  54. if (likely(microseconds)) {
  55. if (parser->user.trust_durations)
  56. rrdset_next_usec_unfiltered(st, microseconds);
  57. else
  58. rrdset_next_usec(st, microseconds);
  59. }
  60. else
  61. rrdset_next(st);
  62. }
  63. return PARSER_RC_OK;
  64. }
  65. static inline PARSER_RC pluginsd_end(char **words, size_t num_words, PARSER *parser) {
  66. char *tv_sec = get_word(words, num_words, 1);
  67. char *tv_usec = get_word(words, num_words, 2);
  68. char *pending_rrdset_next = get_word(words, num_words, 3);
  69. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END);
  70. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  71. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
  72. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  73. if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  74. netdata_log_debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
  75. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_END);
  76. parser->user.data_collections_count++;
  77. struct timeval tv = {
  78. .tv_sec = (tv_sec && *tv_sec) ? str2ll(tv_sec, NULL) : 0,
  79. .tv_usec = (tv_usec && *tv_usec) ? str2ll(tv_usec, NULL) : 0
  80. };
  81. if(!tv.tv_sec)
  82. now_realtime_timeval(&tv);
  83. rrdset_timed_done(st, tv, pending_rrdset_next && *pending_rrdset_next ? true : false);
  84. return PARSER_RC_OK;
  85. }
  86. static void pluginsd_host_define_cleanup(PARSER *parser) {
  87. string_freez(parser->user.host_define.hostname);
  88. rrdlabels_destroy(parser->user.host_define.rrdlabels);
  89. parser->user.host_define.hostname = NULL;
  90. parser->user.host_define.rrdlabels = NULL;
  91. parser->user.host_define.parsing_host = false;
  92. }
  93. static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) {
  94. if(uuid_parse(guid, *uuid))
  95. return false;
  96. uuid_unparse_lower(*uuid, output);
  97. return true;
  98. }
  99. static inline PARSER_RC pluginsd_host_define(char **words, size_t num_words, PARSER *parser) {
  100. char *guid = get_word(words, num_words, 1);
  101. char *hostname = get_word(words, num_words, 2);
  102. if(unlikely(!guid || !*guid || !hostname || !*hostname))
  103. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters");
  104. if(unlikely(parser->user.host_define.parsing_host))
  105. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE,
  106. "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?");
  107. if(!pluginsd_validate_machine_guid(guid, &parser->user.host_define.machine_guid, parser->user.host_define.machine_guid_str))
  108. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?");
  109. parser->user.host_define.hostname = string_strdupz(hostname);
  110. parser->user.host_define.rrdlabels = rrdlabels_create();
  111. parser->user.host_define.parsing_host = true;
  112. return PARSER_RC_OK;
  113. }
  114. static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, PARSER *parser, RRDLABELS *labels, const char *keyword) {
  115. char *name = get_word(words, num_words, 1);
  116. char *value = get_word(words, num_words, 2);
  117. if(!name || !*name || !value)
  118. return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "missing parameters");
  119. if(!parser->user.host_define.parsing_host || !labels)
  120. return PLUGINSD_DISABLE_PLUGIN(parser, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
  121. rrdlabels_add(labels, name, value, RRDLABEL_SRC_CONFIG);
  122. return PARSER_RC_OK;
  123. }
  124. static inline PARSER_RC pluginsd_host_labels(char **words, size_t num_words, PARSER *parser) {
  125. return pluginsd_host_dictionary(words, num_words, parser,
  126. parser->user.host_define.rrdlabels,
  127. PLUGINSD_KEYWORD_HOST_LABEL);
  128. }
  129. static inline PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  130. if(!parser->user.host_define.parsing_host)
  131. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
  132. RRDHOST *host = rrdhost_find_or_create(
  133. string2str(parser->user.host_define.hostname),
  134. string2str(parser->user.host_define.hostname),
  135. parser->user.host_define.machine_guid_str,
  136. "Netdata Virtual Host 1.0",
  137. netdata_configured_timezone,
  138. netdata_configured_abbrev_timezone,
  139. netdata_configured_utc_offset,
  140. NULL,
  141. program_name,
  142. program_version,
  143. default_rrd_update_every,
  144. default_rrd_history_entries,
  145. default_rrd_memory_mode,
  146. default_health_enabled,
  147. default_rrdpush_enabled,
  148. default_rrdpush_destination,
  149. default_rrdpush_api_key,
  150. default_rrdpush_send_charts_matching,
  151. default_rrdpush_enable_replication,
  152. default_rrdpush_seconds_to_replicate,
  153. default_rrdpush_replication_step,
  154. rrdhost_labels_to_system_info(parser->user.host_define.rrdlabels),
  155. false);
  156. rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST);
  157. if(host->rrdlabels) {
  158. rrdlabels_migrate_to_these(host->rrdlabels, parser->user.host_define.rrdlabels);
  159. }
  160. else {
  161. host->rrdlabels = parser->user.host_define.rrdlabels;
  162. parser->user.host_define.rrdlabels = NULL;
  163. }
  164. pluginsd_host_define_cleanup(parser);
  165. parser->user.host = host;
  166. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END);
  167. rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
  168. rrdcontext_host_child_connected(host);
  169. schedule_node_info_update(host);
  170. return PARSER_RC_OK;
  171. }
  172. static inline PARSER_RC pluginsd_host(char **words, size_t num_words, PARSER *parser) {
  173. char *guid = get_word(words, num_words, 1);
  174. if(!guid || !*guid || strcmp(guid, "localhost") == 0) {
  175. parser->user.host = localhost;
  176. return PARSER_RC_OK;
  177. }
  178. uuid_t uuid;
  179. char uuid_str[UUID_STR_LEN];
  180. if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str))
  181. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?");
  182. RRDHOST *host = rrdhost_find_by_guid(uuid_str);
  183. if(unlikely(!host))
  184. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?");
  185. parser->user.host = host;
  186. return PARSER_RC_OK;
  187. }
  188. static inline PARSER_RC pluginsd_chart(char **words, size_t num_words, PARSER *parser) {
  189. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART);
  190. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  191. int idx = 1;
  192. ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
  193. if(slot >= 0) idx++;
  194. char *type = get_word(words, num_words, idx++);
  195. char *name = get_word(words, num_words, idx++);
  196. char *title = get_word(words, num_words, idx++);
  197. char *units = get_word(words, num_words, idx++);
  198. char *family = get_word(words, num_words, idx++);
  199. char *context = get_word(words, num_words, idx++);
  200. char *chart = get_word(words, num_words, idx++);
  201. char *priority_s = get_word(words, num_words, idx++);
  202. char *update_every_s = get_word(words, num_words, idx++);
  203. char *options = get_word(words, num_words, idx++);
  204. char *plugin = get_word(words, num_words, idx++);
  205. char *module = get_word(words, num_words, idx++);
  206. // parse the id from type
  207. char *id = NULL;
  208. if (likely(type && (id = strchr(type, '.')))) {
  209. *id = '\0';
  210. id++;
  211. }
  212. // make sure we have the required variables
  213. if (unlikely((!type || !*type || !id || !*id)))
  214. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_CHART, "missing parameters");
  215. // parse the name, and make sure it does not include 'type.'
  216. if (unlikely(name && *name)) {
  217. // when data are streamed from child nodes
  218. // name will be type.name
  219. // so, we have to remove 'type.' from name too
  220. size_t len = strlen(type);
  221. if (strncmp(type, name, len) == 0 && name[len] == '.')
  222. name = &name[len + 1];
  223. // if the name is the same with the id,
  224. // or is just 'NULL', clear it.
  225. if (unlikely(strcmp(name, id) == 0 || strcasecmp(name, "NULL") == 0 || strcasecmp(name, "(NULL)") == 0))
  226. name = NULL;
  227. }
  228. int priority = 1000;
  229. if (likely(priority_s && *priority_s))
  230. priority = str2i(priority_s);
  231. int update_every = parser->user.cd->update_every;
  232. if (likely(update_every_s && *update_every_s))
  233. update_every = str2i(update_every_s);
  234. if (unlikely(!update_every))
  235. update_every = parser->user.cd->update_every;
  236. RRDSET_TYPE chart_type = RRDSET_TYPE_LINE;
  237. if (unlikely(chart))
  238. chart_type = rrdset_type_id(chart);
  239. if (unlikely(name && !*name))
  240. name = NULL;
  241. if (unlikely(family && !*family))
  242. family = NULL;
  243. if (unlikely(context && !*context))
  244. context = NULL;
  245. if (unlikely(!title))
  246. title = "";
  247. if (unlikely(!units))
  248. units = "unknown";
  249. netdata_log_debug(
  250. D_PLUGINSD,
  251. "creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d",
  252. type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
  253. priority, update_every);
  254. RRDSET *st = NULL;
  255. st = rrdset_create(
  256. host, type, id, name, family, context, title, units,
  257. (plugin && *plugin) ? plugin : parser->user.cd->filename,
  258. module, priority, update_every,
  259. chart_type);
  260. bool obsolete = false;
  261. if (likely(st)) {
  262. if (options && *options) {
  263. if (strstr(options, "obsolete")) {
  264. rrdset_is_obsolete___safe_from_collector_thread(st);
  265. obsolete = true;
  266. }
  267. else
  268. rrdset_isnot_obsolete___safe_from_collector_thread(st);
  269. if (strstr(options, "detail"))
  270. rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
  271. else
  272. rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
  273. if (strstr(options, "hidden"))
  274. rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
  275. else
  276. rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
  277. if (strstr(options, "store_first"))
  278. rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
  279. else
  280. rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
  281. }
  282. else {
  283. rrdset_isnot_obsolete___safe_from_collector_thread(st);
  284. rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
  285. rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
  286. }
  287. if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_CHART))
  288. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  289. pluginsd_rrdset_cache_put_to_slot(parser, st, slot, obsolete);
  290. }
  291. else
  292. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_CHART);
  293. return PARSER_RC_OK;
  294. }
  295. static inline PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, PARSER *parser) {
  296. const char *first_entry_txt = get_word(words, num_words, 1);
  297. const char *last_entry_txt = get_word(words, num_words, 2);
  298. const char *wall_clock_time_txt = get_word(words, num_words, 3);
  299. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
  300. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  301. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
  302. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  303. time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
  304. time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
  305. time_t child_wall_clock_time = (wall_clock_time_txt && *wall_clock_time_txt) ? (time_t)str2ul(wall_clock_time_txt) : now_realtime_sec();
  306. bool ok = true;
  307. if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
  308. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  309. st->replay.start_streaming = false;
  310. st->replay.after = 0;
  311. st->replay.before = 0;
  312. #endif
  313. rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
  314. rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
  315. rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
  316. ok = replicate_chart_request(send_to_plugin, parser, host, st,
  317. first_entry_child, last_entry_child, child_wall_clock_time,
  318. 0, 0);
  319. }
  320. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  321. else {
  322. internal_error(true, "REPLAY: 'host:%s/chart:%s' not sending duplicate replication request",
  323. rrdhost_hostname(st->rrdhost), rrdset_id(st));
  324. }
  325. #endif
  326. return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
  327. }
  328. static inline PARSER_RC pluginsd_dimension(char **words, size_t num_words, PARSER *parser) {
  329. int idx = 1;
  330. ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
  331. if(slot >= 0) idx++;
  332. char *id = get_word(words, num_words, idx++);
  333. char *name = get_word(words, num_words, idx++);
  334. char *algorithm = get_word(words, num_words, idx++);
  335. char *multiplier_s = get_word(words, num_words, idx++);
  336. char *divisor_s = get_word(words, num_words, idx++);
  337. char *options = get_word(words, num_words, idx++);
  338. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_DIMENSION);
  339. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  340. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
  341. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  342. if (unlikely(!id))
  343. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id");
  344. long multiplier = 1;
  345. if (multiplier_s && *multiplier_s) {
  346. multiplier = str2ll_encoded(multiplier_s);
  347. if (unlikely(!multiplier))
  348. multiplier = 1;
  349. }
  350. long divisor = 1;
  351. if (likely(divisor_s && *divisor_s)) {
  352. divisor = str2ll_encoded(divisor_s);
  353. if (unlikely(!divisor))
  354. divisor = 1;
  355. }
  356. if (unlikely(!algorithm || !*algorithm))
  357. algorithm = "absolute";
  358. if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  359. netdata_log_debug(
  360. D_PLUGINSD,
  361. "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
  362. rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
  363. options ? options : "");
  364. RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm));
  365. int unhide_dimension = 1;
  366. rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  367. bool obsolete = false;
  368. if (options && *options) {
  369. if (strstr(options, "obsolete") != NULL) {
  370. obsolete = true;
  371. rrddim_is_obsolete___safe_from_collector_thread(st, rd);
  372. }
  373. else
  374. rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
  375. unhide_dimension = !strstr(options, "hidden");
  376. if (strstr(options, "noreset") != NULL)
  377. rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  378. if (strstr(options, "nooverflow") != NULL)
  379. rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  380. }
  381. else
  382. rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
  383. bool should_update_dimension = false;
  384. if (likely(unhide_dimension)) {
  385. rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN);
  386. should_update_dimension = rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
  387. }
  388. else {
  389. rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN);
  390. should_update_dimension = !rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
  391. }
  392. if (should_update_dimension) {
  393. rrddim_flag_set(rd, RRDDIM_FLAG_METADATA_UPDATE);
  394. rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
  395. }
  396. pluginsd_rrddim_put_to_slot(parser, st, rd, slot, obsolete);
  397. return PARSER_RC_OK;
  398. }
  399. // ----------------------------------------------------------------------------
  400. static inline PARSER_RC pluginsd_variable(char **words, size_t num_words, PARSER *parser) {
  401. char *name = get_word(words, num_words, 1);
  402. char *value = get_word(words, num_words, 2);
  403. NETDATA_DOUBLE v;
  404. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_VARIABLE);
  405. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  406. RRDSET *st = pluginsd_get_scope_chart(parser);
  407. int global = (st) ? 0 : 1;
  408. if (name && *name) {
  409. if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
  410. global = 1;
  411. name = get_word(words, num_words, 2);
  412. value = get_word(words, num_words, 3);
  413. } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
  414. global = 0;
  415. name = get_word(words, num_words, 2);
  416. value = get_word(words, num_words, 3);
  417. }
  418. }
  419. if (unlikely(!name || !*name))
  420. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "missing variable name");
  421. if (unlikely(!value || !*value))
  422. value = NULL;
  423. if (unlikely(!value)) {
  424. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
  425. rrdhost_hostname(host),
  426. st ? rrdset_id(st):"UNSET",
  427. (global) ? "HOST" : "CHART",
  428. name);
  429. return PARSER_RC_OK;
  430. }
  431. if (!global && !st)
  432. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
  433. char *endptr = NULL;
  434. v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr);
  435. if (unlikely(endptr && *endptr)) {
  436. if (endptr == value)
  437. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
  438. rrdhost_hostname(host),
  439. st ? rrdset_id(st):"UNSET",
  440. value,
  441. name);
  442. else
  443. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
  444. rrdhost_hostname(host),
  445. st ? rrdset_id(st):"UNSET",
  446. value,
  447. name,
  448. endptr);
  449. }
  450. if (global) {
  451. const RRDVAR_ACQUIRED *rva = rrdvar_custom_host_variable_add_and_acquire(host, name);
  452. if (rva) {
  453. rrdvar_custom_host_variable_set(host, rva, v);
  454. rrdvar_custom_host_variable_release(host, rva);
  455. }
  456. else
  457. netdata_log_error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
  458. rrdhost_hostname(host),
  459. name);
  460. } else {
  461. const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name);
  462. if (rsa) {
  463. rrdsetvar_custom_chart_variable_set(st, rsa, v);
  464. rrdsetvar_custom_chart_variable_release(st, rsa);
  465. }
  466. else
  467. netdata_log_error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
  468. rrdhost_hostname(host), rrdset_id(st), name);
  469. }
  470. return PARSER_RC_OK;
  471. }
  472. static inline PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  473. netdata_log_debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH);
  474. pluginsd_clear_scope_chart(parser, PLUGINSD_KEYWORD_FLUSH);
  475. parser->user.replay.start_time = 0;
  476. parser->user.replay.end_time = 0;
  477. parser->user.replay.start_time_ut = 0;
  478. parser->user.replay.end_time_ut = 0;
  479. return PARSER_RC_OK;
  480. }
  481. static inline PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  482. netdata_log_info("PLUGINSD: plugin called DISABLE. Disabling it.");
  483. parser->user.enabled = 0;
  484. return PARSER_RC_STOP;
  485. }
  486. static inline PARSER_RC pluginsd_label(char **words, size_t num_words, PARSER *parser) {
  487. const char *name = get_word(words, num_words, 1);
  488. const char *label_source = get_word(words, num_words, 2);
  489. const char *value = get_word(words, num_words, 3);
  490. if (!name || !label_source || !value)
  491. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_LABEL, "missing parameters");
  492. char *store = (char *)value;
  493. bool allocated_store = false;
  494. if(unlikely(num_words > 4)) {
  495. allocated_store = true;
  496. store = mallocz(PLUGINSD_LINE_MAX + 1);
  497. size_t remaining = PLUGINSD_LINE_MAX;
  498. char *move = store;
  499. char *word;
  500. for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) {
  501. if(i > 3) {
  502. *move++ = ' ';
  503. *move = '\0';
  504. remaining--;
  505. }
  506. size_t length = strlen(word);
  507. if (length > remaining)
  508. length = remaining;
  509. remaining -= length;
  510. memcpy(move, word, length);
  511. move += length;
  512. *move = '\0';
  513. }
  514. }
  515. if(unlikely(!(parser->user.new_host_labels)))
  516. parser->user.new_host_labels = rrdlabels_create();
  517. if (strcmp(name,HOST_LABEL_IS_EPHEMERAL) == 0) {
  518. int is_ephemeral = appconfig_test_boolean_value((char *) value);
  519. if (is_ephemeral) {
  520. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_LABEL);
  521. if (likely(host))
  522. rrdhost_option_set(host, RRDHOST_OPTION_EPHEMERAL_HOST);
  523. }
  524. }
  525. rrdlabels_add(parser->user.new_host_labels, name, store, str2l(label_source));
  526. if (allocated_store)
  527. freez(store);
  528. return PARSER_RC_OK;
  529. }
  530. static inline PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  531. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_OVERWRITE);
  532. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  533. netdata_log_debug(D_PLUGINSD, "requested to OVERWRITE host labels");
  534. if(unlikely(!host->rrdlabels))
  535. host->rrdlabels = rrdlabels_create();
  536. rrdlabels_migrate_to_these(host->rrdlabels, parser->user.new_host_labels);
  537. if (rrdhost_option_check(host, RRDHOST_OPTION_EPHEMERAL_HOST))
  538. rrdlabels_add(host->rrdlabels, HOST_LABEL_IS_EPHEMERAL, "true", RRDLABEL_SRC_CONFIG);
  539. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
  540. rrdlabels_destroy(parser->user.new_host_labels);
  541. parser->user.new_host_labels = NULL;
  542. return PARSER_RC_OK;
  543. }
  544. static inline PARSER_RC pluginsd_clabel(char **words, size_t num_words, PARSER *parser) {
  545. const char *name = get_word(words, num_words, 1);
  546. const char *value = get_word(words, num_words, 2);
  547. const char *label_source = get_word(words, num_words, 3);
  548. if (!name || !value || !label_source) {
  549. netdata_log_error("Ignoring malformed or empty CHART LABEL command.");
  550. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  551. }
  552. if(unlikely(!parser->user.chart_rrdlabels_linked_temporarily)) {
  553. RRDSET *st = pluginsd_get_scope_chart(parser);
  554. parser->user.chart_rrdlabels_linked_temporarily = st->rrdlabels;
  555. rrdlabels_unmark_all(parser->user.chart_rrdlabels_linked_temporarily);
  556. }
  557. rrdlabels_add(parser->user.chart_rrdlabels_linked_temporarily, name, value, str2l(label_source));
  558. return PARSER_RC_OK;
  559. }
  560. static inline PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  561. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT);
  562. if(!host) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  563. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
  564. if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  565. netdata_log_debug(D_PLUGINSD, "requested to commit chart labels");
  566. if(!parser->user.chart_rrdlabels_linked_temporarily) {
  567. netdata_log_error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.", rrdhost_hostname(host));
  568. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  569. }
  570. rrdlabels_remove_all_unmarked(parser->user.chart_rrdlabels_linked_temporarily);
  571. rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
  572. rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
  573. rrdset_metadata_updated(st);
  574. parser->user.chart_rrdlabels_linked_temporarily = NULL;
  575. return PARSER_RC_OK;
  576. }
  577. static inline PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, PARSER *parser) {
  578. timing_init();
  579. int idx = 1;
  580. ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
  581. if(slot >= 0) idx++;
  582. char *id = get_word(words, num_words, idx++);
  583. char *update_every_str = get_word(words, num_words, idx++);
  584. char *end_time_str = get_word(words, num_words, idx++);
  585. char *wall_clock_time_str = get_word(words, num_words, idx++);
  586. if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str))
  587. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
  588. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_BEGIN_V2);
  589. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  590. timing_step(TIMING_STEP_BEGIN2_PREPARE);
  591. RRDSET *st = pluginsd_rrdset_cache_get_from_slot(parser, host, id, slot, PLUGINSD_KEYWORD_BEGIN_V2);
  592. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  593. if(!pluginsd_set_scope_chart(parser, st, PLUGINSD_KEYWORD_BEGIN_V2))
  594. return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  595. if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE)))
  596. rrdset_isnot_obsolete___safe_from_collector_thread(st);
  597. timing_step(TIMING_STEP_BEGIN2_FIND_CHART);
  598. // ------------------------------------------------------------------------
  599. // parse the parameters
  600. time_t update_every = (time_t) str2ull_encoded(update_every_str);
  601. time_t end_time = (time_t) str2ull_encoded(end_time_str);
  602. time_t wall_clock_time;
  603. if(likely(*wall_clock_time_str == '#'))
  604. wall_clock_time = end_time;
  605. else
  606. wall_clock_time = (time_t) str2ull_encoded(wall_clock_time_str);
  607. if (unlikely(update_every != st->update_every))
  608. rrdset_set_update_every_s(st, update_every);
  609. timing_step(TIMING_STEP_BEGIN2_PARSE);
  610. // ------------------------------------------------------------------------
  611. // prepare our state
  612. pluginsd_lock_rrdset_data_collection(parser);
  613. parser->user.v2.update_every = update_every;
  614. parser->user.v2.end_time = end_time;
  615. parser->user.v2.wall_clock_time = wall_clock_time;
  616. parser->user.v2.ml_locked = ml_chart_update_begin(st);
  617. timing_step(TIMING_STEP_BEGIN2_ML);
  618. // ------------------------------------------------------------------------
  619. // propagate it forward in v2
  620. if(!parser->user.v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost))
  621. parser->user.v2.stream_buffer = rrdset_push_metric_initialize(parser->user.st, wall_clock_time);
  622. if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.wb) {
  623. // check receiver capabilities
  624. bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
  625. // check sender capabilities
  626. bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false;
  627. NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
  628. BUFFER *wb = parser->user.v2.stream_buffer.wb;
  629. buffer_need_bytes(wb, 1024);
  630. if(unlikely(parser->user.v2.stream_buffer.begin_v2_added))
  631. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
  632. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2, sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1);
  633. if(with_slots) {
  634. buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
  635. buffer_print_uint64_encoded(wb, integer_encoding, st->rrdpush.sender.chart_slot);
  636. }
  637. buffer_fast_strcat(wb, " '", 2);
  638. buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
  639. buffer_fast_strcat(wb, "' ", 2);
  640. if(can_copy)
  641. buffer_strcat(wb, update_every_str);
  642. else
  643. buffer_print_uint64_encoded(wb, integer_encoding, update_every);
  644. buffer_fast_strcat(wb, " ", 1);
  645. if(can_copy)
  646. buffer_strcat(wb, end_time_str);
  647. else
  648. buffer_print_uint64_encoded(wb, integer_encoding, end_time);
  649. buffer_fast_strcat(wb, " ", 1);
  650. if(can_copy)
  651. buffer_strcat(wb, wall_clock_time_str);
  652. else
  653. buffer_print_uint64_encoded(wb, integer_encoding, wall_clock_time);
  654. buffer_fast_strcat(wb, "\n", 1);
  655. parser->user.v2.stream_buffer.last_point_end_time_s = end_time;
  656. parser->user.v2.stream_buffer.begin_v2_added = true;
  657. }
  658. timing_step(TIMING_STEP_BEGIN2_PROPAGATE);
  659. // ------------------------------------------------------------------------
  660. // store it
  661. st->last_collected_time.tv_sec = end_time;
  662. st->last_collected_time.tv_usec = 0;
  663. st->last_updated.tv_sec = end_time;
  664. st->last_updated.tv_usec = 0;
  665. st->counter++;
  666. st->counter_done++;
  667. // these are only needed for db mode RAM, SAVE, MAP, ALLOC
  668. st->db.current_entry++;
  669. if(st->db.current_entry >= st->db.entries)
  670. st->db.current_entry -= st->db.entries;
  671. timing_step(TIMING_STEP_BEGIN2_STORE);
  672. return PARSER_RC_OK;
  673. }
  674. static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *parser) {
  675. timing_init();
  676. int idx = 1;
  677. ssize_t slot = pluginsd_parse_rrd_slot(words, num_words);
  678. if(slot >= 0) idx++;
  679. char *dimension = get_word(words, num_words, idx++);
  680. char *collected_str = get_word(words, num_words, idx++);
  681. char *value_str = get_word(words, num_words, idx++);
  682. char *flags_str = get_word(words, num_words, idx++);
  683. if(unlikely(!dimension || !collected_str || !value_str || !flags_str))
  684. return PLUGINSD_DISABLE_PLUGIN(parser, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
  685. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_SET_V2);
  686. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  687. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2);
  688. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  689. timing_step(TIMING_STEP_SET2_PREPARE);
  690. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, slot, PLUGINSD_KEYWORD_SET_V2);
  691. if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  692. st->pluginsd.set = true;
  693. if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED)))
  694. rrddim_isnot_obsolete___safe_from_collector_thread(st, rd);
  695. timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION);
  696. // ------------------------------------------------------------------------
  697. // parse the parameters
  698. collected_number collected_value = (collected_number) str2ll_encoded(collected_str);
  699. NETDATA_DOUBLE value;
  700. if(*value_str == '#')
  701. value = (NETDATA_DOUBLE)collected_value;
  702. else
  703. value = str2ndd_encoded(value_str, NULL);
  704. SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
  705. timing_step(TIMING_STEP_SET2_PARSE);
  706. // ------------------------------------------------------------------------
  707. // check value and ML
  708. if (unlikely(!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT))) {
  709. value = NAN;
  710. flags = SN_EMPTY_SLOT;
  711. if(parser->user.v2.ml_locked)
  712. ml_dimension_is_anomalous(rd, parser->user.v2.end_time, 0, false);
  713. }
  714. else if(parser->user.v2.ml_locked) {
  715. if (ml_dimension_is_anomalous(rd, parser->user.v2.end_time, value, true)) {
  716. // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
  717. flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS);
  718. }
  719. else
  720. flags |= SN_FLAG_NOT_ANOMALOUS;
  721. }
  722. timing_step(TIMING_STEP_SET2_ML);
  723. // ------------------------------------------------------------------------
  724. // propagate it forward in v2
  725. if(parser->user.v2.stream_buffer.v2 && parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb) {
  726. // check if receiver and sender have the same number parsing capabilities
  727. bool can_copy = stream_has_capability(&parser->user, STREAM_CAP_IEEE754) == stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754);
  728. // check the sender capabilities
  729. bool with_slots = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_SLOTS) ? true : false;
  730. NUMBER_ENCODING integer_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
  731. NUMBER_ENCODING doubles_encoding = stream_has_capability(&parser->user.v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
  732. BUFFER *wb = parser->user.v2.stream_buffer.wb;
  733. buffer_need_bytes(wb, 1024);
  734. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2, sizeof(PLUGINSD_KEYWORD_SET_V2) - 1);
  735. if(with_slots) {
  736. buffer_fast_strcat(wb, " "PLUGINSD_KEYWORD_SLOT":", sizeof(PLUGINSD_KEYWORD_SLOT) - 1 + 2);
  737. buffer_print_uint64_encoded(wb, integer_encoding, rd->rrdpush.sender.dim_slot);
  738. }
  739. buffer_fast_strcat(wb, " '", 2);
  740. buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
  741. buffer_fast_strcat(wb, "' ", 2);
  742. if(can_copy)
  743. buffer_strcat(wb, collected_str);
  744. else
  745. buffer_print_int64_encoded(wb, integer_encoding, collected_value); // original v2 had hex
  746. buffer_fast_strcat(wb, " ", 1);
  747. if(can_copy)
  748. buffer_strcat(wb, value_str);
  749. else
  750. buffer_print_netdata_double_encoded(wb, doubles_encoding, value); // original v2 had decimal
  751. buffer_fast_strcat(wb, " ", 1);
  752. buffer_print_sn_flags(wb, flags, true);
  753. buffer_fast_strcat(wb, "\n", 1);
  754. }
  755. timing_step(TIMING_STEP_SET2_PROPAGATE);
  756. // ------------------------------------------------------------------------
  757. // store it
  758. rrddim_store_metric(rd, parser->user.v2.end_time * USEC_PER_SEC, value, flags);
  759. rd->collector.last_collected_time.tv_sec = parser->user.v2.end_time;
  760. rd->collector.last_collected_time.tv_usec = 0;
  761. rd->collector.last_collected_value = collected_value;
  762. rd->collector.last_stored_value = value;
  763. rd->collector.last_calculated_value = value;
  764. rd->collector.counter++;
  765. rrddim_set_updated(rd);
  766. timing_step(TIMING_STEP_SET2_STORE);
  767. return PARSER_RC_OK;
  768. }
  769. static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser) {
  770. timing_init();
  771. RRDHOST *host = pluginsd_require_scope_host(parser, PLUGINSD_KEYWORD_END_V2);
  772. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  773. RRDSET *st = pluginsd_require_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2);
  774. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
  775. parser->user.data_collections_count++;
  776. timing_step(TIMING_STEP_END2_PREPARE);
  777. // ------------------------------------------------------------------------
  778. // propagate the whole chart update in v1
  779. if(unlikely(!parser->user.v2.stream_buffer.v2 && !parser->user.v2.stream_buffer.begin_v2_added && parser->user.v2.stream_buffer.wb))
  780. rrdset_push_metrics_v1(&parser->user.v2.stream_buffer, st);
  781. timing_step(TIMING_STEP_END2_PUSH_V1);
  782. // ------------------------------------------------------------------------
  783. // unblock data collection
  784. pluginsd_unlock_previous_scope_chart(parser, PLUGINSD_KEYWORD_END_V2, false);
  785. rrdcontext_collected_rrdset(st);
  786. store_metric_collection_completed();
  787. timing_step(TIMING_STEP_END2_RRDSET);
  788. // ------------------------------------------------------------------------
  789. // propagate it forward
  790. rrdset_push_metrics_finished(&parser->user.v2.stream_buffer, st);
  791. timing_step(TIMING_STEP_END2_PROPAGATE);
  792. // ------------------------------------------------------------------------
  793. // cleanup RRDSET / RRDDIM
  794. if(likely(st->pluginsd.dims_with_slots)) {
  795. for(size_t i = 0; i < st->pluginsd.size ;i++) {
  796. RRDDIM *rd = st->pluginsd.prd_array[i].rd;
  797. if(!rd)
  798. continue;
  799. rd->collector.calculated_value = 0;
  800. rd->collector.collected_value = 0;
  801. rrddim_clear_updated(rd);
  802. }
  803. }
  804. else {
  805. RRDDIM *rd;
  806. rrddim_foreach_read(rd, st){
  807. rd->collector.calculated_value = 0;
  808. rd->collector.collected_value = 0;
  809. rrddim_clear_updated(rd);
  810. }
  811. rrddim_foreach_done(rd);
  812. }
  813. // ------------------------------------------------------------------------
  814. // reset state
  815. parser->user.v2 = (struct parser_user_object_v2){ 0 };
  816. timing_step(TIMING_STEP_END2_STORE);
  817. timing_report();
  818. return PARSER_RC_OK;
  819. }
  820. static inline PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, PARSER *parser __maybe_unused) {
  821. netdata_log_info("PLUGINSD: plugin called EXIT.");
  822. return PARSER_RC_STOP;
  823. }
  824. static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PARSER *parser)
  825. {
  826. const char *host_uuid_str = get_word(words, num_words, 1);
  827. const char *claim_id_str = get_word(words, num_words, 2);
  828. if (!host_uuid_str || !claim_id_str) {
  829. netdata_log_error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
  830. host_uuid_str ? host_uuid_str : "[unset]",
  831. claim_id_str ? claim_id_str : "[unset]");
  832. return PARSER_RC_ERROR;
  833. }
  834. uuid_t uuid;
  835. RRDHOST *host = parser->user.host;
  836. // We don't need the parsed UUID
  837. // just do it to check the format
  838. if(uuid_parse(host_uuid_str, uuid)) {
  839. netdata_log_error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
  840. return PARSER_RC_ERROR;
  841. }
  842. if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL") != 0) {
  843. netdata_log_error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
  844. return PARSER_RC_ERROR;
  845. }
  846. if(strcmp(host_uuid_str, host->machine_guid) != 0) {
  847. netdata_log_error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
  848. return PARSER_RC_OK; //the message is OK problem must be somewhere else
  849. }
  850. rrdhost_aclk_state_lock(host);
  851. if (host->aclk_state.claimed_id)
  852. freez(host->aclk_state.claimed_id);
  853. host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
  854. rrdhost_aclk_state_unlock(host);
  855. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
  856. rrdpush_send_claimed_id(host);
  857. return PARSER_RC_OK;
  858. }
  859. // ----------------------------------------------------------------------------
  860. void pluginsd_cleanup_v2(PARSER *parser) {
  861. // this is called when the thread is stopped while processing
  862. pluginsd_clear_scope_chart(parser, "THREAD CLEANUP");
  863. }
  864. void pluginsd_process_thread_cleanup(void *ptr) {
  865. PARSER *parser = (PARSER *)ptr;
  866. pluginsd_cleanup_v2(parser);
  867. pluginsd_host_define_cleanup(parser);
  868. rrd_collector_finished();
  869. #ifdef NETDATA_LOG_STREAM_RECEIVE
  870. if(parser->user.stream_log_fp) {
  871. fclose(parser->user.stream_log_fp);
  872. parser->user.stream_log_fp = NULL;
  873. }
  874. #endif
  875. parser_destroy(parser);
  876. }
  877. bool parser_reconstruct_node(BUFFER *wb, void *ptr) {
  878. PARSER *parser = ptr;
  879. if(!parser || !parser->user.host)
  880. return false;
  881. buffer_strcat(wb, rrdhost_hostname(parser->user.host));
  882. return true;
  883. }
  884. bool parser_reconstruct_instance(BUFFER *wb, void *ptr) {
  885. PARSER *parser = ptr;
  886. if(!parser || !parser->user.st)
  887. return false;
  888. buffer_strcat(wb, rrdset_name(parser->user.st));
  889. return true;
  890. }
  891. bool parser_reconstruct_context(BUFFER *wb, void *ptr) {
  892. PARSER *parser = ptr;
  893. if(!parser || !parser->user.st)
  894. return false;
  895. buffer_strcat(wb, string2str(parser->user.st->context));
  896. return true;
  897. }
  898. inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
  899. {
  900. int enabled = cd->unsafe.enabled;
  901. if (!fp_plugin_input || !fp_plugin_output || !enabled) {
  902. cd->unsafe.enabled = 0;
  903. return 0;
  904. }
  905. if (unlikely(fileno(fp_plugin_input) == -1)) {
  906. netdata_log_error("input file descriptor given is not a valid stream");
  907. cd->serial_failures++;
  908. return 0;
  909. }
  910. if (unlikely(fileno(fp_plugin_output) == -1)) {
  911. netdata_log_error("output file descriptor given is not a valid stream");
  912. cd->serial_failures++;
  913. return 0;
  914. }
  915. clearerr(fp_plugin_input);
  916. clearerr(fp_plugin_output);
  917. PARSER *parser;
  918. {
  919. PARSER_USER_OBJECT user = {
  920. .enabled = cd->unsafe.enabled,
  921. .host = host,
  922. .cd = cd,
  923. .trust_durations = trust_durations
  924. };
  925. // fp_plugin_output = our input; fp_plugin_input = our output
  926. parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1, PARSER_INPUT_SPLIT, NULL);
  927. }
  928. pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD);
  929. rrd_collector_started();
  930. size_t count = 0;
  931. // this keeps the parser with its current value
  932. // so, parser needs to be allocated before pushing it
  933. netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser)
  934. {
  935. ND_LOG_STACK lgs[] = {
  936. ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
  937. ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
  938. ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
  939. ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
  940. ND_LOG_FIELD_END(),
  941. };
  942. ND_LOG_STACK_PUSH(lgs);
  943. buffered_reader_init(&parser->reader);
  944. CLEAN_BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL);
  945. while(likely(service_running(SERVICE_COLLECTORS))) {
  946. if(unlikely(!buffered_reader_next_line(&parser->reader, buffer))) {
  947. buffered_reader_ret_t ret = buffered_reader_read_timeout(
  948. &parser->reader,
  949. fileno((FILE *) parser->fp_input),
  950. 2 * 60 * MSEC_PER_SEC, true
  951. );
  952. if(unlikely(ret != BUFFERED_READER_READ_OK))
  953. break;
  954. continue;
  955. }
  956. if(unlikely(parser_action(parser, buffer->buffer)))
  957. break;
  958. buffer->len = 0;
  959. buffer->buffer[0] = '\0';
  960. }
  961. cd->unsafe.enabled = parser->user.enabled;
  962. count = parser->user.data_collections_count;
  963. if(likely(count)) {
  964. cd->successful_collections += count;
  965. cd->serial_failures = 0;
  966. }
  967. else
  968. cd->serial_failures++;
  969. }
  970. netdata_thread_cleanup_pop(1); // free parser with the pop function
  971. return count;
  972. }
  973. PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, size_t num_words) {
  974. switch(keyword->id) {
  975. case 1:
  976. return pluginsd_set_v2(words, num_words, parser);
  977. case 2:
  978. return pluginsd_begin_v2(words, num_words, parser);
  979. case 3:
  980. return pluginsd_end_v2(words, num_words, parser);
  981. case 11:
  982. return pluginsd_set(words, num_words, parser);
  983. case 12:
  984. return pluginsd_begin(words, num_words, parser);
  985. case 13:
  986. return pluginsd_end(words, num_words, parser);
  987. case 21:
  988. return pluginsd_replay_set(words, num_words, parser);
  989. case 22:
  990. return pluginsd_replay_begin(words, num_words, parser);
  991. case 23:
  992. return pluginsd_replay_rrddim_collection_state(words, num_words, parser);
  993. case 24:
  994. return pluginsd_replay_rrdset_collection_state(words, num_words, parser);
  995. case 25:
  996. return pluginsd_replay_end(words, num_words, parser);
  997. case 31:
  998. return pluginsd_dimension(words, num_words, parser);
  999. case 32:
  1000. return pluginsd_chart(words, num_words, parser);
  1001. case 33:
  1002. return pluginsd_chart_definition_end(words, num_words, parser);
  1003. case 34:
  1004. return pluginsd_clabel(words, num_words, parser);
  1005. case 35:
  1006. return pluginsd_clabel_commit(words, num_words, parser);
  1007. case 41:
  1008. return pluginsd_function(words, num_words, parser);
  1009. case 42:
  1010. return pluginsd_function_result_begin(words, num_words, parser);
  1011. case 43:
  1012. return pluginsd_function_progress(words, num_words, parser);
  1013. case 51:
  1014. return pluginsd_label(words, num_words, parser);
  1015. case 52:
  1016. return pluginsd_overwrite(words, num_words, parser);
  1017. case 53:
  1018. return pluginsd_variable(words, num_words, parser);
  1019. case 61:
  1020. return streaming_claimed_id(words, num_words, parser);
  1021. case 71:
  1022. return pluginsd_host(words, num_words, parser);
  1023. case 72:
  1024. return pluginsd_host_define(words, num_words, parser);
  1025. case 73:
  1026. return pluginsd_host_define_end(words, num_words, parser);
  1027. case 74:
  1028. return pluginsd_host_labels(words, num_words, parser);
  1029. case 97:
  1030. return pluginsd_flush(words, num_words, parser);
  1031. case 98:
  1032. return pluginsd_disable(words, num_words, parser);
  1033. case 99:
  1034. return pluginsd_exit(words, num_words, parser);
  1035. case 101:
  1036. return pluginsd_register_plugin(words, num_words, parser);
  1037. case 102:
  1038. return pluginsd_register_module(words, num_words, parser);
  1039. case 103:
  1040. return pluginsd_register_job(words, num_words, parser);
  1041. case 104:
  1042. return pluginsd_dyncfg_reset(words, num_words, parser);
  1043. case 110:
  1044. return pluginsd_job_status(words, num_words, parser);
  1045. case 111:
  1046. return pluginsd_delete_job(words, num_words, parser);
  1047. default:
  1048. break;
  1049. }
  1050. fatal("Unknown keyword '%s' with id %zu", keyword->keyword, keyword->id);
  1051. }
  1052. #include "gperf-hashtable.h"
  1053. void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire) {
  1054. parser->repertoire = repertoire;
  1055. for(size_t i = GPERF_PARSER_MIN_HASH_VALUE ; i <= GPERF_PARSER_MAX_HASH_VALUE ;i++) {
  1056. if(gperf_keywords[i].keyword && *gperf_keywords[i].keyword && (parser->repertoire & gperf_keywords[i].repertoire))
  1057. worker_register_job_name(gperf_keywords[i].worker_job_id, gperf_keywords[i].keyword);
  1058. }
  1059. }
  1060. int pluginsd_parser_unittest(void) {
  1061. PARSER *p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
  1062. pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING);
  1063. char *lines[] = {
  1064. "BEGIN2 abcdefghijklmnopqr 123",
  1065. "SET2 abcdefg 0x12345678 0 0",
  1066. "SET2 hijklmnoqr 0x12345678 0 0",
  1067. "SET2 stuvwxyz 0x12345678 0 0",
  1068. "END2",
  1069. NULL,
  1070. };
  1071. char *words[PLUGINSD_MAX_WORDS];
  1072. size_t iterations = 1000000;
  1073. size_t count = 0;
  1074. char input[PLUGINSD_LINE_MAX + 1];
  1075. usec_t started = now_realtime_usec();
  1076. while(--iterations) {
  1077. for(size_t line = 0; lines[line] ;line++) {
  1078. strncpyz(input, lines[line], PLUGINSD_LINE_MAX);
  1079. size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
  1080. const char *command = get_word(words, num_words, 0);
  1081. PARSER_KEYWORD *keyword = parser_find_keyword(p, command);
  1082. if(unlikely(!keyword))
  1083. fatal("Cannot parse the line '%s'", lines[line]);
  1084. count++;
  1085. }
  1086. }
  1087. usec_t ended = now_realtime_usec();
  1088. netdata_log_info("Parsed %zu lines in %0.2f secs, %0.2f klines/sec", count,
  1089. (double)(ended - started) / (double)USEC_PER_SEC,
  1090. (double)count / ((double)(ended - started) / (double)USEC_PER_SEC) / 1000.0);
  1091. parser_destroy(p);
  1092. return 0;
  1093. }