pluginsd_parser.c 77 KB


  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "pluginsd_parser.h"
  3. #define LOG_FUNCTIONS false
  4. static int send_to_plugin(const char *txt, void *data) {
  5. PARSER *parser = data;
  6. if(!txt || !*txt)
  7. return 0;
  8. #ifdef ENABLE_HTTPS
  9. NETDATA_SSL *ssl = parser->ssl_output;
  10. if(ssl) {
  11. if(SSL_connection(ssl))
  12. return (int)netdata_ssl_write(ssl, (void *)txt, strlen(txt));
  13. error("PLUGINSD: cannot send command (SSL)");
  14. return -1;
  15. }
  16. #endif
  17. if(parser->fp_output) {
  18. int bytes = fprintf(parser->fp_output, "%s", txt);
  19. if(bytes <= 0) {
  20. error("PLUGINSD: cannot send command (FILE)");
  21. return -2;
  22. }
  23. fflush(parser->fp_output);
  24. return bytes;
  25. }
  26. if(parser->fd != -1) {
  27. size_t bytes = 0;
  28. size_t total = strlen(txt);
  29. ssize_t sent;
  30. do {
  31. sent = write(parser->fd, &txt[bytes], total - bytes);
  32. if(sent <= 0) {
  33. error("PLUGINSD: cannot send command (fd)");
  34. return -3;
  35. }
  36. bytes += sent;
  37. }
  38. while(bytes < total);
  39. return (int)bytes;
  40. }
  41. error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)");
  42. return -4;
  43. }
  44. static inline RRDHOST *pluginsd_require_host_from_parent(void *user, const char *cmd) {
  45. RRDHOST *host = ((PARSER_USER_OBJECT *) user)->host;
  46. if(unlikely(!host))
  47. error("PLUGINSD: command %s requires a host, but is not set.", cmd);
  48. return host;
  49. }
  50. static inline RRDSET *pluginsd_require_chart_from_parent(void *user, const char *cmd, const char *parent_cmd) {
  51. RRDSET *st = ((PARSER_USER_OBJECT *) user)->st;
  52. if(unlikely(!st))
  53. error("PLUGINSD: command %s requires a chart defined via command %s, but is not set.", cmd, parent_cmd);
  54. return st;
  55. }
  56. static inline RRDSET *pluginsd_get_chart_from_parent(void *user) {
  57. return ((PARSER_USER_OBJECT *) user)->st;
  58. }
  59. static inline void pluginsd_lock_rrdset_data_collection(void *user) {
  60. PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
  61. if(u->st && !u->v2.locked_data_collection) {
  62. netdata_spinlock_lock(&u->st->data_collection_lock);
  63. u->v2.locked_data_collection = true;
  64. }
  65. }
  66. static inline bool pluginsd_unlock_rrdset_data_collection(void *user) {
  67. PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
  68. if(u->st && u->v2.locked_data_collection) {
  69. netdata_spinlock_unlock(&u->st->data_collection_lock);
  70. u->v2.locked_data_collection = false;
  71. return true;
  72. }
  73. return false;
  74. }
  75. void pluginsd_rrdset_cleanup(RRDSET *st) {
  76. for(size_t i = 0; i < st->pluginsd.used ; i++) {
  77. if (st->pluginsd.rda[i]) {
  78. rrddim_acquired_release(st->pluginsd.rda[i]);
  79. st->pluginsd.rda[i] = NULL;
  80. }
  81. }
  82. freez(st->pluginsd.rda);
  83. st->pluginsd.rda = NULL;
  84. st->pluginsd.size = 0;
  85. st->pluginsd.used = 0;
  86. st->pluginsd.pos = 0;
  87. }
  88. static inline void pluginsd_unlock_previous_chart(void *user, const char *keyword, bool stale) {
  89. PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
  90. if(unlikely(pluginsd_unlock_rrdset_data_collection(user))) {
  91. if(stale)
  92. error("PLUGINSD: 'host:%s/chart:%s/' stale data collection lock found during %s; it has been unlocked",
  93. rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword);
  94. }
  95. if(unlikely(u->v2.ml_locked)) {
  96. ml_chart_update_end(u->st);
  97. u->v2.ml_locked = false;
  98. if(stale)
  99. error("PLUGINSD: 'host:%s/chart:%s/' stale ML lock found during %s, it has been unlocked",
  100. rrdhost_hostname(u->st->rrdhost), rrdset_id(u->st), keyword);
  101. }
  102. }
  103. static inline void pluginsd_set_chart_from_parent(void *user, RRDSET *st, const char *keyword) {
  104. PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
  105. pluginsd_unlock_previous_chart(user, keyword, true);
  106. if(st) {
  107. size_t dims = dictionary_entries(st->rrddim_root_index);
  108. if(unlikely(st->pluginsd.size < dims)) {
  109. st->pluginsd.rda = reallocz(st->pluginsd.rda, dims * sizeof(RRDDIM_ACQUIRED *));
  110. st->pluginsd.size = dims;
  111. }
  112. if(st->pluginsd.pos > st->pluginsd.used && st->pluginsd.pos <= st->pluginsd.size)
  113. st->pluginsd.used = st->pluginsd.pos;
  114. st->pluginsd.pos = 0;
  115. }
  116. u->st = st;
  117. }
  118. static inline RRDDIM *pluginsd_acquire_dimension(RRDHOST *host, RRDSET *st, const char *dimension, const char *cmd) {
  119. if (unlikely(!dimension || !*dimension)) {
  120. error("PLUGINSD: 'host:%s/chart:%s' got a %s, without a dimension.",
  121. rrdhost_hostname(host), rrdset_id(st), cmd);
  122. return NULL;
  123. }
  124. RRDDIM_ACQUIRED *rda;
  125. if(likely(st->pluginsd.pos < st->pluginsd.used)) {
  126. rda = st->pluginsd.rda[st->pluginsd.pos];
  127. RRDDIM *rd = rrddim_acquired_to_rrddim(rda);
  128. if (likely(rd && string_strcmp(rd->id, dimension) == 0)) {
  129. st->pluginsd.pos++;
  130. return rd;
  131. }
  132. else {
  133. rrddim_acquired_release(rda);
  134. st->pluginsd.rda[st->pluginsd.pos] = NULL;
  135. }
  136. }
  137. rda = rrddim_find_and_acquire(st, dimension);
  138. if (unlikely(!rda)) {
  139. error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s but dimension does not exist.",
  140. rrdhost_hostname(host), rrdset_id(st), dimension, cmd);
  141. return NULL;
  142. }
  143. if(likely(st->pluginsd.pos < st->pluginsd.size))
  144. st->pluginsd.rda[st->pluginsd.pos++] = rda;
  145. return rrddim_acquired_to_rrddim(rda);
  146. }
  147. static inline RRDSET *pluginsd_find_chart(RRDHOST *host, const char *chart, const char *cmd) {
  148. if (unlikely(!chart || !*chart)) {
  149. error("PLUGINSD: 'host:%s' got a %s without a chart id.",
  150. rrdhost_hostname(host), cmd);
  151. return NULL;
  152. }
  153. RRDSET *st = rrdset_find(host, chart);
  154. if (unlikely(!st))
  155. error("PLUGINSD: 'host:%s/chart:%s' got a %s but chart does not exist.",
  156. rrdhost_hostname(host), chart, cmd);
  157. return st;
  158. }
  159. static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(void *user, const char *keyword, const char *msg) {
  160. ((PARSER_USER_OBJECT *) user)->enabled = 0;
  161. if(keyword && msg) {
  162. error_limit_static_global_var(erl, 1, 0);
  163. error_limit(&erl, "PLUGINSD: keyword %s: %s", keyword, msg);
  164. }
  165. return PARSER_RC_ERROR;
  166. }
  167. PARSER_RC pluginsd_set(char **words, size_t num_words, void *user)
  168. {
  169. char *dimension = get_word(words, num_words, 1);
  170. char *value = get_word(words, num_words, 2);
  171. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET);
  172. if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  173. RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET, PLUGINSD_KEYWORD_CHART);
  174. if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  175. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET);
  176. if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  177. if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  178. debug(D_PLUGINSD, "PLUGINSD: 'host:%s/chart:%s/dim:%s' SET is setting value to '%s'",
  179. rrdhost_hostname(host), rrdset_id(st), dimension, value && *value ? value : "UNSET");
  180. if (value && *value)
  181. rrddim_set_by_pointer(st, rd, str2ll_encoded(value));
  182. return PARSER_RC_OK;
  183. }
  184. PARSER_RC pluginsd_begin(char **words, size_t num_words, void *user)
  185. {
  186. char *id = get_word(words, num_words, 1);
  187. char *microseconds_txt = get_word(words, num_words, 2);
  188. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN);
  189. if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  190. RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN);
  191. if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  192. pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN);
  193. usec_t microseconds = 0;
  194. if (microseconds_txt && *microseconds_txt) {
  195. long long t = str2ll(microseconds_txt, NULL);
  196. if(t >= 0)
  197. microseconds = t;
  198. }
  199. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  200. if(st->replay.log_next_data_collection) {
  201. st->replay.log_next_data_collection = false;
  202. internal_error(true,
  203. "REPLAY: 'host:%s/chart:%s' first BEGIN after replication, last collected %llu, last updated %llu, microseconds %llu",
  204. rrdhost_hostname(host), rrdset_id(st),
  205. st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec,
  206. st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec,
  207. microseconds
  208. );
  209. }
  210. #endif
  211. if (likely(st->counter_done)) {
  212. if (likely(microseconds)) {
  213. if (((PARSER_USER_OBJECT *)user)->trust_durations)
  214. rrdset_next_usec_unfiltered(st, microseconds);
  215. else
  216. rrdset_next_usec(st, microseconds);
  217. }
  218. else
  219. rrdset_next(st);
  220. }
  221. return PARSER_RC_OK;
  222. }
  223. PARSER_RC pluginsd_end(char **words, size_t num_words, void *user)
  224. {
  225. UNUSED(words);
  226. UNUSED(num_words);
  227. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END);
  228. if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  229. RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END, PLUGINSD_KEYWORD_BEGIN);
  230. if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  231. if (unlikely(rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  232. debug(D_PLUGINSD, "requested an END on chart '%s'", rrdset_id(st));
  233. pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_END);
  234. ((PARSER_USER_OBJECT *) user)->data_collections_count++;
  235. struct timeval now;
  236. now_realtime_timeval(&now);
  237. rrdset_timed_done(st, now, /* pending_rrdset_next = */ false);
  238. return PARSER_RC_OK;
  239. }
  240. static void pluginsd_host_define_cleanup(void *user) {
  241. PARSER_USER_OBJECT *u = user;
  242. string_freez(u->host_define.hostname);
  243. dictionary_destroy(u->host_define.rrdlabels);
  244. u->host_define.hostname = NULL;
  245. u->host_define.rrdlabels = NULL;
  246. u->host_define.parsing_host = false;
  247. }
  248. static inline bool pluginsd_validate_machine_guid(const char *guid, uuid_t *uuid, char *output) {
  249. if(uuid_parse(guid, *uuid))
  250. return false;
  251. uuid_unparse_lower(*uuid, output);
  252. return true;
  253. }
  254. static PARSER_RC pluginsd_host_define(char **words, size_t num_words, void *user) {
  255. PARSER_USER_OBJECT *u = user;
  256. char *guid = get_word(words, num_words, 1);
  257. char *hostname = get_word(words, num_words, 2);
  258. if(unlikely(!guid || !*guid || !hostname || !*hostname))
  259. return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "missing parameters");
  260. if(unlikely(u->host_define.parsing_host))
  261. return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE,
  262. "another host definition is already open - did you send " PLUGINSD_KEYWORD_HOST_DEFINE_END "?");
  263. if(!pluginsd_validate_machine_guid(guid, &u->host_define.machine_guid, u->host_define.machine_guid_str))
  264. return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE, "cannot parse MACHINE_GUID - is it a valid UUID?");
  265. u->host_define.hostname = string_strdupz(hostname);
  266. u->host_define.rrdlabels = rrdlabels_create();
  267. u->host_define.parsing_host = true;
  268. return PARSER_RC_OK;
  269. }
  270. static inline PARSER_RC pluginsd_host_dictionary(char **words, size_t num_words, void *user, DICTIONARY *dict, const char *keyword) {
  271. PARSER_USER_OBJECT *u = user;
  272. char *name = get_word(words, num_words, 1);
  273. char *value = get_word(words, num_words, 2);
  274. if(!name || !*name || !value)
  275. return PLUGINSD_DISABLE_PLUGIN(user, keyword, "missing parameters");
  276. if(!u->host_define.parsing_host || !dict)
  277. return PLUGINSD_DISABLE_PLUGIN(user, keyword, "host is not defined, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
  278. rrdlabels_add(dict, name, value, RRDLABEL_SRC_CONFIG);
  279. return PARSER_RC_OK;
  280. }
  281. static PARSER_RC pluginsd_host_labels(char **words, size_t num_words, void *user) {
  282. PARSER_USER_OBJECT *u = user;
  283. return pluginsd_host_dictionary(words, num_words, user, u->host_define.rrdlabels, PLUGINSD_KEYWORD_HOST_LABEL);
  284. }
  285. static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) {
  286. PARSER_USER_OBJECT *u = user;
  287. if(!u->host_define.parsing_host)
  288. return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST_DEFINE_END, "missing initialization, send " PLUGINSD_KEYWORD_HOST_DEFINE " before this");
  289. RRDHOST *host = rrdhost_find_or_create(
  290. string2str(u->host_define.hostname),
  291. string2str(u->host_define.hostname),
  292. u->host_define.machine_guid_str,
  293. "Netdata Virtual Host 1.0",
  294. netdata_configured_timezone,
  295. netdata_configured_abbrev_timezone,
  296. netdata_configured_utc_offset,
  297. NULL,
  298. program_name,
  299. program_version,
  300. default_rrd_update_every,
  301. default_rrd_history_entries,
  302. default_rrd_memory_mode,
  303. default_health_enabled,
  304. default_rrdpush_enabled,
  305. default_rrdpush_destination,
  306. default_rrdpush_api_key,
  307. default_rrdpush_send_charts_matching,
  308. default_rrdpush_enable_replication,
  309. default_rrdpush_seconds_to_replicate,
  310. default_rrdpush_replication_step,
  311. rrdhost_labels_to_system_info(u->host_define.rrdlabels),
  312. false
  313. );
  314. if(host->rrdlabels) {
  315. rrdlabels_migrate_to_these(host->rrdlabels, u->host_define.rrdlabels);
  316. }
  317. else {
  318. host->rrdlabels = u->host_define.rrdlabels;
  319. u->host_define.rrdlabels = NULL;
  320. }
  321. pluginsd_host_define_cleanup(user);
  322. u->host = host;
  323. pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_HOST_DEFINE_END);
  324. rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
  325. rrdcontext_host_child_connected(host);
  326. schedule_node_info_update(host);
  327. return PARSER_RC_OK;
  328. }
  329. static PARSER_RC pluginsd_host(char **words, size_t num_words, void *user) {
  330. PARSER_USER_OBJECT *u = user;
  331. char *guid = get_word(words, num_words, 1);
  332. if(!guid || !*guid || strcmp(guid, "localhost") == 0) {
  333. u->host = localhost;
  334. return PARSER_RC_OK;
  335. }
  336. uuid_t uuid;
  337. char uuid_str[UUID_STR_LEN];
  338. if(!pluginsd_validate_machine_guid(guid, &uuid, uuid_str))
  339. return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot parse MACHINE_GUID - is it a valid UUID?");
  340. RRDHOST *host = rrdhost_find_by_guid(uuid_str);
  341. if(unlikely(!host))
  342. return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_HOST, "cannot find a host with this machine guid - have you created it?");
  343. u->host = host;
  344. return PARSER_RC_OK;
  345. }
  346. PARSER_RC pluginsd_chart(char **words, size_t num_words, void *user)
  347. {
  348. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART);
  349. if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  350. char *type = get_word(words, num_words, 1);
  351. char *name = get_word(words, num_words, 2);
  352. char *title = get_word(words, num_words, 3);
  353. char *units = get_word(words, num_words, 4);
  354. char *family = get_word(words, num_words, 5);
  355. char *context = get_word(words, num_words, 6);
  356. char *chart = get_word(words, num_words, 7);
  357. char *priority_s = get_word(words, num_words, 8);
  358. char *update_every_s = get_word(words, num_words, 9);
  359. char *options = get_word(words, num_words, 10);
  360. char *plugin = get_word(words, num_words, 11);
  361. char *module = get_word(words, num_words, 12);
  362. // parse the id from type
  363. char *id = NULL;
  364. if (likely(type && (id = strchr(type, '.')))) {
  365. *id = '\0';
  366. id++;
  367. }
  368. // make sure we have the required variables
  369. if (unlikely((!type || !*type || !id || !*id)))
  370. return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_CHART, "missing parameters");
  371. // parse the name, and make sure it does not include 'type.'
  372. if (unlikely(name && *name)) {
  373. // when data are streamed from child nodes
  374. // name will be type.name
  375. // so, we have to remove 'type.' from name too
  376. size_t len = strlen(type);
  377. if (strncmp(type, name, len) == 0 && name[len] == '.')
  378. name = &name[len + 1];
  379. // if the name is the same with the id,
  380. // or is just 'NULL', clear it.
  381. if (unlikely(strcmp(name, id) == 0 || strcasecmp(name, "NULL") == 0 || strcasecmp(name, "(NULL)") == 0))
  382. name = NULL;
  383. }
  384. int priority = 1000;
  385. if (likely(priority_s && *priority_s))
  386. priority = str2i(priority_s);
  387. int update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every;
  388. if (likely(update_every_s && *update_every_s))
  389. update_every = str2i(update_every_s);
  390. if (unlikely(!update_every))
  391. update_every = ((PARSER_USER_OBJECT *) user)->cd->update_every;
  392. RRDSET_TYPE chart_type = RRDSET_TYPE_LINE;
  393. if (unlikely(chart))
  394. chart_type = rrdset_type_id(chart);
  395. if (unlikely(name && !*name))
  396. name = NULL;
  397. if (unlikely(family && !*family))
  398. family = NULL;
  399. if (unlikely(context && !*context))
  400. context = NULL;
  401. if (unlikely(!title))
  402. title = "";
  403. if (unlikely(!units))
  404. units = "unknown";
  405. debug(
  406. D_PLUGINSD,
  407. "creating chart type='%s', id='%s', name='%s', family='%s', context='%s', chart='%s', priority=%d, update_every=%d",
  408. type, id, name ? name : "", family ? family : "", context ? context : "", rrdset_type_name(chart_type),
  409. priority, update_every);
  410. RRDSET *st = NULL;
  411. st = rrdset_create(
  412. host, type, id, name, family, context, title, units,
  413. (plugin && *plugin) ? plugin : ((PARSER_USER_OBJECT *)user)->cd->filename,
  414. module, priority, update_every,
  415. chart_type);
  416. if (likely(st)) {
  417. if (options && *options) {
  418. if (strstr(options, "obsolete"))
  419. rrdset_is_obsolete(st);
  420. else
  421. rrdset_isnot_obsolete(st);
  422. if (strstr(options, "detail"))
  423. rrdset_flag_set(st, RRDSET_FLAG_DETAIL);
  424. else
  425. rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
  426. if (strstr(options, "hidden"))
  427. rrdset_flag_set(st, RRDSET_FLAG_HIDDEN);
  428. else
  429. rrdset_flag_clear(st, RRDSET_FLAG_HIDDEN);
  430. if (strstr(options, "store_first"))
  431. rrdset_flag_set(st, RRDSET_FLAG_STORE_FIRST);
  432. else
  433. rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
  434. } else {
  435. rrdset_isnot_obsolete(st);
  436. rrdset_flag_clear(st, RRDSET_FLAG_DETAIL);
  437. rrdset_flag_clear(st, RRDSET_FLAG_STORE_FIRST);
  438. }
  439. }
  440. pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_CHART);
  441. return PARSER_RC_OK;
  442. }
  443. PARSER_RC pluginsd_chart_definition_end(char **words, size_t num_words, void *user)
  444. {
  445. const char *first_entry_txt = get_word(words, num_words, 1);
  446. const char *last_entry_txt = get_word(words, num_words, 2);
  447. const char *wall_clock_time_txt = get_word(words, num_words, 3);
  448. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END);
  449. if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  450. RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CHART_DEFINITION_END, PLUGINSD_KEYWORD_CHART);
  451. if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  452. time_t first_entry_child = (first_entry_txt && *first_entry_txt) ? (time_t)str2ul(first_entry_txt) : 0;
  453. time_t last_entry_child = (last_entry_txt && *last_entry_txt) ? (time_t)str2ul(last_entry_txt) : 0;
  454. time_t child_wall_clock_time = (wall_clock_time_txt && *wall_clock_time_txt) ? (time_t)str2ul(wall_clock_time_txt) : now_realtime_sec();
  455. bool ok = true;
  456. if(!rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
  457. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  458. st->replay.start_streaming = false;
  459. st->replay.after = 0;
  460. st->replay.before = 0;
  461. #endif
  462. rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
  463. rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
  464. rrdhost_receiver_replicating_charts_plus_one(st->rrdhost);
  465. PARSER *parser = ((PARSER_USER_OBJECT *)user)->parser;
  466. ok = replicate_chart_request(send_to_plugin, parser, host, st,
  467. first_entry_child, last_entry_child, child_wall_clock_time,
  468. 0, 0);
  469. }
  470. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  471. else {
  472. internal_error(true, "REPLAY: 'host:%s/chart:%s' not sending duplicate replication request",
  473. rrdhost_hostname(st->rrdhost), rrdset_id(st));
  474. }
  475. #endif
  476. return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
  477. }
  478. PARSER_RC pluginsd_dimension(char **words, size_t num_words, void *user)
  479. {
  480. char *id = get_word(words, num_words, 1);
  481. char *name = get_word(words, num_words, 2);
  482. char *algorithm = get_word(words, num_words, 3);
  483. char *multiplier_s = get_word(words, num_words, 4);
  484. char *divisor_s = get_word(words, num_words, 5);
  485. char *options = get_word(words, num_words, 6);
  486. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_DIMENSION);
  487. if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  488. RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_DIMENSION, PLUGINSD_KEYWORD_CHART);
  489. if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  490. if (unlikely(!id))
  491. return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_DIMENSION, "missing dimension id");
  492. long multiplier = 1;
  493. if (multiplier_s && *multiplier_s) {
  494. multiplier = str2ll_encoded(multiplier_s);
  495. if (unlikely(!multiplier))
  496. multiplier = 1;
  497. }
  498. long divisor = 1;
  499. if (likely(divisor_s && *divisor_s)) {
  500. divisor = str2ll_encoded(divisor_s);
  501. if (unlikely(!divisor))
  502. divisor = 1;
  503. }
  504. if (unlikely(!algorithm || !*algorithm))
  505. algorithm = "absolute";
  506. if (unlikely(st && rrdset_flag_check(st, RRDSET_FLAG_DEBUG)))
  507. debug(
  508. D_PLUGINSD,
  509. "creating dimension in chart %s, id='%s', name='%s', algorithm='%s', multiplier=%ld, divisor=%ld, hidden='%s'",
  510. rrdset_id(st), id, name ? name : "", rrd_algorithm_name(rrd_algorithm_id(algorithm)), multiplier, divisor,
  511. options ? options : "");
  512. RRDDIM *rd = rrddim_add(st, id, name, multiplier, divisor, rrd_algorithm_id(algorithm));
  513. int unhide_dimension = 1;
  514. rrddim_option_clear(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  515. if (options && *options) {
  516. if (strstr(options, "obsolete") != NULL)
  517. rrddim_is_obsolete(st, rd);
  518. else
  519. rrddim_isnot_obsolete(st, rd);
  520. unhide_dimension = !strstr(options, "hidden");
  521. if (strstr(options, "noreset") != NULL)
  522. rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  523. if (strstr(options, "nooverflow") != NULL)
  524. rrddim_option_set(rd, RRDDIM_OPTION_DONT_DETECT_RESETS_OR_OVERFLOWS);
  525. } else
  526. rrddim_isnot_obsolete(st, rd);
  527. bool should_update_dimension = false;
  528. if (likely(unhide_dimension)) {
  529. rrddim_option_clear(rd, RRDDIM_OPTION_HIDDEN);
  530. should_update_dimension = rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
  531. }
  532. else {
  533. rrddim_option_set(rd, RRDDIM_OPTION_HIDDEN);
  534. should_update_dimension = !rrddim_flag_check(rd, RRDDIM_FLAG_META_HIDDEN);
  535. }
  536. if (should_update_dimension) {
  537. rrddim_flag_set(rd, RRDDIM_FLAG_METADATA_UPDATE);
  538. rrdhost_flag_set(rd->rrdset->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
  539. }
  540. return PARSER_RC_OK;
  541. }
  542. // ----------------------------------------------------------------------------
  543. // execution of functions
  544. struct inflight_function {
  545. int code;
  546. int timeout;
  547. BUFFER *destination_wb;
  548. STRING *function;
  549. void (*callback)(BUFFER *wb, int code, void *callback_data);
  550. void *callback_data;
  551. usec_t timeout_ut;
  552. usec_t started_ut;
  553. usec_t sent_ut;
  554. };
  555. static void inflight_functions_insert_callback(const DICTIONARY_ITEM *item, void *func, void *parser_ptr) {
  556. struct inflight_function *pf = func;
  557. PARSER *parser = parser_ptr;
  558. // leave this code as default, so that when the dictionary is destroyed this will be sent back to the caller
  559. pf->code = HTTP_RESP_GATEWAY_TIMEOUT;
  560. char buffer[2048 + 1];
  561. snprintfz(buffer, 2048, "FUNCTION %s %d \"%s\"\n",
  562. dictionary_acquired_item_name(item),
  563. pf->timeout,
  564. string2str(pf->function));
  565. // send the command to the plugin
  566. int ret = send_to_plugin(buffer, parser);
  567. pf->sent_ut = now_realtime_usec();
  568. if(ret < 0) {
  569. error("FUNCTION: failed to send function to plugin, error %d", ret);
  570. rrd_call_function_error(pf->destination_wb, "Failed to communicate with collector", HTTP_RESP_BACKEND_FETCH_FAILED);
  571. }
  572. else {
  573. internal_error(LOG_FUNCTIONS,
  574. "FUNCTION '%s' with transaction '%s' sent to collector (%d bytes, in %llu usec)",
  575. string2str(pf->function), dictionary_acquired_item_name(item), ret,
  576. pf->sent_ut - pf->started_ut);
  577. }
  578. }
  579. static bool inflight_functions_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func __maybe_unused, void *new_func, void *parser_ptr __maybe_unused) {
  580. struct inflight_function *pf = new_func;
  581. error("PLUGINSD_PARSER: duplicate UUID on pending function '%s' detected. Ignoring the second one.", string2str(pf->function));
  582. pf->code = rrd_call_function_error(pf->destination_wb, "This request is already in progress", HTTP_RESP_BAD_REQUEST);
  583. pf->callback(pf->destination_wb, pf->code, pf->callback_data);
  584. string_freez(pf->function);
  585. return false;
  586. }
  587. static void inflight_functions_delete_callback(const DICTIONARY_ITEM *item __maybe_unused, void *func, void *parser_ptr __maybe_unused) {
  588. struct inflight_function *pf = func;
  589. internal_error(LOG_FUNCTIONS,
  590. "FUNCTION '%s' result of transaction '%s' received from collector (%zu bytes, request %llu usec, response %llu usec)",
  591. string2str(pf->function), dictionary_acquired_item_name(item),
  592. buffer_strlen(pf->destination_wb), pf->sent_ut - pf->started_ut, now_realtime_usec() - pf->sent_ut);
  593. pf->callback(pf->destination_wb, pf->code, pf->callback_data);
  594. string_freez(pf->function);
  595. }
  596. void inflight_functions_init(PARSER *parser) {
  597. parser->inflight.functions = dictionary_create_advanced(DICT_OPTION_DONT_OVERWRITE_VALUE, &dictionary_stats_category_functions, 0);
  598. dictionary_register_insert_callback(parser->inflight.functions, inflight_functions_insert_callback, parser);
  599. dictionary_register_delete_callback(parser->inflight.functions, inflight_functions_delete_callback, parser);
  600. dictionary_register_conflict_callback(parser->inflight.functions, inflight_functions_conflict_callback, parser);
  601. }
  602. static void inflight_functions_garbage_collect(PARSER *parser, usec_t now) {
  603. parser->inflight.smaller_timeout = 0;
  604. struct inflight_function *pf;
  605. dfe_start_write(parser->inflight.functions, pf) {
  606. if (pf->timeout_ut < now) {
  607. internal_error(true,
  608. "FUNCTION '%s' removing expired transaction '%s', after %llu usec.",
  609. string2str(pf->function), pf_dfe.name, now - pf->started_ut);
  610. if(!buffer_strlen(pf->destination_wb) || pf->code == HTTP_RESP_OK)
  611. pf->code = rrd_call_function_error(pf->destination_wb,
  612. "Timeout waiting for collector response.",
  613. HTTP_RESP_GATEWAY_TIMEOUT);
  614. dictionary_del(parser->inflight.functions, pf_dfe.name);
  615. }
  616. else if(!parser->inflight.smaller_timeout || pf->timeout_ut < parser->inflight.smaller_timeout)
  617. parser->inflight.smaller_timeout = pf->timeout_ut;
  618. }
  619. dfe_done(pf);
  620. }
  621. // this is the function that is called from
  622. // rrd_call_function_and_wait() and rrd_call_function_async()
  623. static int pluginsd_execute_function_callback(BUFFER *destination_wb, int timeout, const char *function, void *collector_data, void (*callback)(BUFFER *wb, int code, void *callback_data), void *callback_data) {
  624. PARSER *parser = collector_data;
  625. usec_t now = now_realtime_usec();
  626. struct inflight_function tmp = {
  627. .started_ut = now,
  628. .timeout_ut = now + timeout * USEC_PER_SEC,
  629. .destination_wb = destination_wb,
  630. .timeout = timeout,
  631. .function = string_strdupz(function),
  632. .callback = callback,
  633. .callback_data = callback_data,
  634. };
  635. uuid_t uuid;
  636. uuid_generate_time(uuid);
  637. char key[UUID_STR_LEN];
  638. uuid_unparse_lower(uuid, key);
  639. dictionary_write_lock(parser->inflight.functions);
  640. // if there is any error, our dictionary callbacks will call the caller callback to notify
  641. // the caller about the error - no need for error handling here.
  642. dictionary_set(parser->inflight.functions, key, &tmp, sizeof(struct inflight_function));
  643. if(!parser->inflight.smaller_timeout || tmp.timeout_ut < parser->inflight.smaller_timeout)
  644. parser->inflight.smaller_timeout = tmp.timeout_ut;
  645. // garbage collect stale inflight functions
  646. if(parser->inflight.smaller_timeout < now)
  647. inflight_functions_garbage_collect(parser, now);
  648. dictionary_write_unlock(parser->inflight.functions);
  649. return HTTP_RESP_OK;
  650. }
  651. PARSER_RC pluginsd_function(char **words, size_t num_words, void *user)
  652. {
  653. bool global = false;
  654. size_t i = 1;
  655. if(num_words >= 2 && strcmp(get_word(words, num_words, 1), "GLOBAL") == 0) {
  656. i++;
  657. global = true;
  658. }
  659. char *name = get_word(words, num_words, i++);
  660. char *timeout_s = get_word(words, num_words, i++);
  661. char *help = get_word(words, num_words, i++);
  662. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_FUNCTION);
  663. if(!host) return PARSER_RC_ERROR;
  664. RRDSET *st = (global)?NULL:pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_FUNCTION, PLUGINSD_KEYWORD_CHART);
  665. if(!st) global = true;
  666. if (unlikely(!timeout_s || !name || !help || (!global && !st))) {
  667. error("PLUGINSD: 'host:%s/chart:%s' got a FUNCTION, without providing the required data (global = '%s', name = '%s', timeout = '%s', help = '%s'). Ignoring it.",
  668. rrdhost_hostname(host),
  669. st?rrdset_id(st):"(unset)",
  670. global?"yes":"no",
  671. name?name:"(unset)",
  672. timeout_s?timeout_s:"(unset)",
  673. help?help:"(unset)"
  674. );
  675. return PARSER_RC_ERROR;
  676. }
  677. int timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  678. if (timeout_s && *timeout_s) {
  679. timeout = str2i(timeout_s);
  680. if (unlikely(timeout <= 0))
  681. timeout = PLUGINS_FUNCTIONS_TIMEOUT_DEFAULT;
  682. }
  683. PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser;
  684. rrd_collector_add_function(host, st, name, timeout, help, false, pluginsd_execute_function_callback, parser);
  685. return PARSER_RC_OK;
  686. }
  687. static void pluginsd_function_result_end(struct parser *parser, void *action_data) {
  688. STRING *key = action_data;
  689. if(key)
  690. dictionary_del(parser->inflight.functions, string2str(key));
  691. string_freez(key);
  692. }
  693. PARSER_RC pluginsd_function_result_begin(char **words, size_t num_words, void *user)
  694. {
  695. char *key = get_word(words, num_words, 1);
  696. char *status = get_word(words, num_words, 2);
  697. char *format = get_word(words, num_words, 3);
  698. char *expires = get_word(words, num_words, 4);
  699. if (unlikely(!key || !*key || !status || !*status || !format || !*format || !expires || !*expires)) {
  700. error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " without providing the required data (key = '%s', status = '%s', format = '%s', expires = '%s')."
  701. , key ? key : "(unset)"
  702. , status ? status : "(unset)"
  703. , format ? format : "(unset)"
  704. , expires ? expires : "(unset)"
  705. );
  706. }
  707. int code = (status && *status) ? str2i(status) : 0;
  708. if (code <= 0)
  709. code = HTTP_RESP_BACKEND_RESPONSE_INVALID;
  710. time_t expiration = (expires && *expires) ? str2l(expires) : 0;
  711. PARSER *parser = ((PARSER_USER_OBJECT *) user)->parser;
  712. struct inflight_function *pf = NULL;
  713. if(key && *key)
  714. pf = (struct inflight_function *)dictionary_get(parser->inflight.functions, key);
  715. if(!pf) {
  716. error("got a " PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN " for transaction '%s', but the transaction is not found.", key?key:"(unset)");
  717. }
  718. else {
  719. if(format && *format)
  720. pf->destination_wb->content_type = functions_format_to_content_type(format);
  721. pf->code = code;
  722. pf->destination_wb->expires = expiration;
  723. if(expiration <= now_realtime_sec())
  724. buffer_no_cacheable(pf->destination_wb);
  725. else
  726. buffer_cacheable(pf->destination_wb);
  727. }
  728. parser->defer.response = (pf) ? pf->destination_wb : NULL;
  729. parser->defer.end_keyword = PLUGINSD_KEYWORD_FUNCTION_RESULT_END;
  730. parser->defer.action = pluginsd_function_result_end;
  731. parser->defer.action_data = string_strdupz(key); // it is ok is key is NULL
  732. parser->flags |= PARSER_DEFER_UNTIL_KEYWORD;
  733. return PARSER_RC_OK;
  734. }
  735. // ----------------------------------------------------------------------------
  736. PARSER_RC pluginsd_variable(char **words, size_t num_words, void *user)
  737. {
  738. char *name = get_word(words, num_words, 1);
  739. char *value = get_word(words, num_words, 2);
  740. NETDATA_DOUBLE v;
  741. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_VARIABLE);
  742. if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  743. RRDSET *st = pluginsd_get_chart_from_parent(user);
  744. int global = (st) ? 0 : 1;
  745. if (name && *name) {
  746. if ((strcmp(name, "GLOBAL") == 0 || strcmp(name, "HOST") == 0)) {
  747. global = 1;
  748. name = get_word(words, num_words, 2);
  749. value = get_word(words, num_words, 3);
  750. } else if ((strcmp(name, "LOCAL") == 0 || strcmp(name, "CHART") == 0)) {
  751. global = 0;
  752. name = get_word(words, num_words, 2);
  753. value = get_word(words, num_words, 3);
  754. }
  755. }
  756. if (unlikely(!name || !*name))
  757. return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "missing variable name");
  758. if (unlikely(!value || !*value))
  759. value = NULL;
  760. if (unlikely(!value)) {
  761. error("PLUGINSD: 'host:%s/chart:%s' cannot set %s VARIABLE '%s' to an empty value",
  762. rrdhost_hostname(host),
  763. st ? rrdset_id(st):"UNSET",
  764. (global) ? "HOST" : "CHART",
  765. name);
  766. return PARSER_RC_OK;
  767. }
  768. if (!global && !st)
  769. return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_VARIABLE, "no chart is defined and no GLOBAL is given");
  770. char *endptr = NULL;
  771. v = (NETDATA_DOUBLE) str2ndd_encoded(value, &endptr);
  772. if (unlikely(endptr && *endptr)) {
  773. if (endptr == value)
  774. error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' cannot be parsed as a number",
  775. rrdhost_hostname(host),
  776. st ? rrdset_id(st):"UNSET",
  777. value,
  778. name);
  779. else
  780. error("PLUGINSD: 'host:%s/chart:%s' the value '%s' of VARIABLE '%s' has leftovers: '%s'",
  781. rrdhost_hostname(host),
  782. st ? rrdset_id(st):"UNSET",
  783. value,
  784. name,
  785. endptr);
  786. }
  787. if (global) {
  788. const RRDVAR_ACQUIRED *rva = rrdvar_custom_host_variable_add_and_acquire(host, name);
  789. if (rva) {
  790. rrdvar_custom_host_variable_set(host, rva, v);
  791. rrdvar_custom_host_variable_release(host, rva);
  792. }
  793. else
  794. error("PLUGINSD: 'host:%s' cannot find/create HOST VARIABLE '%s'",
  795. rrdhost_hostname(host),
  796. name);
  797. } else {
  798. const RRDSETVAR_ACQUIRED *rsa = rrdsetvar_custom_chart_variable_add_and_acquire(st, name);
  799. if (rsa) {
  800. rrdsetvar_custom_chart_variable_set(st, rsa, v);
  801. rrdsetvar_custom_chart_variable_release(st, rsa);
  802. }
  803. else
  804. error("PLUGINSD: 'host:%s/chart:%s' cannot find/create CHART VARIABLE '%s'",
  805. rrdhost_hostname(host), rrdset_id(st), name);
  806. }
  807. return PARSER_RC_OK;
  808. }
  809. PARSER_RC pluginsd_flush(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
  810. {
  811. debug(D_PLUGINSD, "requested a " PLUGINSD_KEYWORD_FLUSH);
  812. pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_FLUSH);
  813. ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
  814. ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
  815. ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
  816. ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
  817. return PARSER_RC_OK;
  818. }
  819. PARSER_RC pluginsd_disable(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused)
  820. {
  821. info("PLUGINSD: plugin called DISABLE. Disabling it.");
  822. ((PARSER_USER_OBJECT *) user)->enabled = 0;
  823. return PARSER_RC_STOP;
  824. }
  825. PARSER_RC pluginsd_label(char **words, size_t num_words, void *user)
  826. {
  827. const char *name = get_word(words, num_words, 1);
  828. const char *label_source = get_word(words, num_words, 2);
  829. const char *value = get_word(words, num_words, 3);
  830. if (!name || !label_source || !value)
  831. return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_LABEL, "missing parameters");
  832. char *store = (char *)value;
  833. bool allocated_store = false;
  834. if(unlikely(num_words > 4)) {
  835. allocated_store = true;
  836. store = mallocz(PLUGINSD_LINE_MAX + 1);
  837. size_t remaining = PLUGINSD_LINE_MAX;
  838. char *move = store;
  839. char *word;
  840. for(size_t i = 3; i < num_words && remaining > 2 && (word = get_word(words, num_words, i)) ;i++) {
  841. if(i > 3) {
  842. *move++ = ' ';
  843. *move = '\0';
  844. remaining--;
  845. }
  846. size_t length = strlen(word);
  847. if (length > remaining)
  848. length = remaining;
  849. remaining -= length;
  850. memcpy(move, word, length);
  851. move += length;
  852. *move = '\0';
  853. }
  854. }
  855. if(unlikely(!((PARSER_USER_OBJECT *) user)->new_host_labels))
  856. ((PARSER_USER_OBJECT *) user)->new_host_labels = rrdlabels_create();
  857. rrdlabels_add(((PARSER_USER_OBJECT *)user)->new_host_labels,
  858. name,
  859. store,
  860. str2l(label_source));
  861. if (allocated_store)
  862. freez(store);
  863. return PARSER_RC_OK;
  864. }
  865. PARSER_RC pluginsd_overwrite(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
  866. {
  867. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_OVERWRITE);
  868. if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  869. debug(D_PLUGINSD, "requested to OVERWRITE host labels");
  870. if(unlikely(!host->rrdlabels))
  871. host->rrdlabels = rrdlabels_create();
  872. rrdlabels_migrate_to_these(host->rrdlabels, (DICTIONARY *) (((PARSER_USER_OBJECT *)user)->new_host_labels));
  873. rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_LABELS | RRDHOST_FLAG_METADATA_UPDATE);
  874. rrdlabels_destroy(((PARSER_USER_OBJECT *)user)->new_host_labels);
  875. ((PARSER_USER_OBJECT *)user)->new_host_labels = NULL;
  876. return PARSER_RC_OK;
  877. }
  878. PARSER_RC pluginsd_clabel(char **words, size_t num_words, void *user)
  879. {
  880. const char *name = get_word(words, num_words, 1);
  881. const char *value = get_word(words, num_words, 2);
  882. const char *label_source = get_word(words, num_words, 3);
  883. if (!name || !value || !*label_source) {
  884. error("Ignoring malformed or empty CHART LABEL command.");
  885. return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  886. }
  887. if(unlikely(!((PARSER_USER_OBJECT *) user)->chart_rrdlabels_linked_temporarily)) {
  888. RRDSET *st = pluginsd_get_chart_from_parent(user);
  889. ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = st->rrdlabels;
  890. rrdlabels_unmark_all(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
  891. }
  892. rrdlabels_add(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily,
  893. name, value, str2l(label_source));
  894. return PARSER_RC_OK;
  895. }
  896. PARSER_RC pluginsd_clabel_commit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user)
  897. {
  898. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT);
  899. if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  900. RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_CLABEL_COMMIT, PLUGINSD_KEYWORD_BEGIN);
  901. if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  902. debug(D_PLUGINSD, "requested to commit chart labels");
  903. if(!((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily) {
  904. error("PLUGINSD: 'host:%s' got CLABEL_COMMIT, without a CHART or BEGIN. Ignoring it.",
  905. rrdhost_hostname(host));
  906. return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  907. }
  908. rrdlabels_remove_all_unmarked(((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily);
  909. rrdset_flag_set(st, RRDSET_FLAG_METADATA_UPDATE);
  910. rrdhost_flag_set(st->rrdhost, RRDHOST_FLAG_METADATA_UPDATE);
  911. ((PARSER_USER_OBJECT *)user)->chart_rrdlabels_linked_temporarily = NULL;
  912. return PARSER_RC_OK;
  913. }
  914. PARSER_RC pluginsd_replay_begin(char **words, size_t num_words, void *user) {
  915. char *id = get_word(words, num_words, 1);
  916. char *start_time_str = get_word(words, num_words, 2);
  917. char *end_time_str = get_word(words, num_words, 3);
  918. char *child_now_str = get_word(words, num_words, 4);
  919. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  920. if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  921. RRDSET *st;
  922. if (likely(!id || !*id))
  923. st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_BEGIN, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  924. else
  925. st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  926. if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  927. pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  928. if(start_time_str && end_time_str) {
  929. time_t start_time = (time_t) str2ull_encoded(start_time_str);
  930. time_t end_time = (time_t) str2ull_encoded(end_time_str);
  931. time_t wall_clock_time = 0, tolerance;
  932. bool wall_clock_comes_from_child; (void)wall_clock_comes_from_child;
  933. if(child_now_str) {
  934. wall_clock_time = (time_t) str2ull_encoded(child_now_str);
  935. tolerance = st->update_every + 1;
  936. wall_clock_comes_from_child = true;
  937. }
  938. if(wall_clock_time <= 0) {
  939. wall_clock_time = now_realtime_sec();
  940. tolerance = st->update_every + 5;
  941. wall_clock_comes_from_child = false;
  942. }
  943. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  944. internal_error(
  945. (!st->replay.start_streaming && (end_time < st->replay.after || start_time > st->replay.before)),
  946. "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, which does not match our request (%ld to %ld).",
  947. rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time, st->replay.after, st->replay.before);
  948. internal_error(
  949. true,
  950. "REPLAY: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN " from %ld to %ld, child wall clock is %ld (%s), had requested %ld to %ld",
  951. rrdhost_hostname(st->rrdhost), rrdset_id(st),
  952. start_time, end_time, wall_clock_time, wall_clock_comes_from_child ? "from child" : "parent time",
  953. st->replay.after, st->replay.before);
  954. #endif
  955. if(start_time && end_time && start_time < wall_clock_time + tolerance && end_time < wall_clock_time + tolerance && start_time < end_time) {
  956. if (unlikely(end_time - start_time != st->update_every))
  957. rrdset_set_update_every_s(st, end_time - start_time);
  958. st->last_collected_time.tv_sec = end_time;
  959. st->last_collected_time.tv_usec = 0;
  960. st->last_updated.tv_sec = end_time;
  961. st->last_updated.tv_usec = 0;
  962. st->counter++;
  963. st->counter_done++;
  964. // these are only needed for db mode RAM, SAVE, MAP, ALLOC
  965. st->current_entry++;
  966. if(st->current_entry >= st->entries)
  967. st->current_entry -= st->entries;
  968. ((PARSER_USER_OBJECT *) user)->replay.start_time = start_time;
  969. ((PARSER_USER_OBJECT *) user)->replay.end_time = end_time;
  970. ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = (usec_t) start_time * USEC_PER_SEC;
  971. ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = (usec_t) end_time * USEC_PER_SEC;
  972. ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = wall_clock_time;
  973. ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = true;
  974. return PARSER_RC_OK;
  975. }
  976. error("PLUGINSD REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_BEGIN
  977. " from %ld to %ld, but timestamps are invalid "
  978. "(now is %ld [%s], tolerance %ld). Ignoring " PLUGINSD_KEYWORD_REPLAY_SET,
  979. rrdhost_hostname(st->rrdhost), rrdset_id(st), start_time, end_time,
  980. wall_clock_time, wall_clock_comes_from_child ? "child wall clock" : "parent wall clock", tolerance);
  981. }
  982. // the child sends an RBEGIN without any parameters initially
  983. // setting rset_enabled to false, means the RSET should not store any metrics
  984. // to store metrics, the RBEGIN needs to have timestamps
  985. ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
  986. ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
  987. ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
  988. ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
  989. ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0;
  990. ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false;
  991. return PARSER_RC_OK;
  992. }
  993. static inline SN_FLAGS pluginsd_parse_storage_number_flags(const char *flags_str) {
  994. SN_FLAGS flags = SN_FLAG_NONE;
  995. char c;
  996. while ((c = *flags_str++)) {
  997. switch (c) {
  998. case 'A':
  999. flags |= SN_FLAG_NOT_ANOMALOUS;
  1000. break;
  1001. case 'R':
  1002. flags |= SN_FLAG_RESET;
  1003. break;
  1004. case 'E':
  1005. flags = SN_EMPTY_SLOT;
  1006. return flags;
  1007. default:
  1008. internal_error(true, "Unknown SN_FLAGS flag '%c'", c);
  1009. break;
  1010. }
  1011. }
  1012. return flags;
  1013. }
  1014. PARSER_RC pluginsd_replay_set(char **words, size_t num_words, void *user)
  1015. {
  1016. char *dimension = get_word(words, num_words, 1);
  1017. char *value_str = get_word(words, num_words, 2);
  1018. char *flags_str = get_word(words, num_words, 3);
  1019. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET);
  1020. if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1021. RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1022. if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1023. PARSER_USER_OBJECT *u = user;
  1024. if(!u->replay.rset_enabled) {
  1025. error_limit_static_thread_var(erl, 1, 0);
  1026. error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
  1027. rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1028. // we have to return OK here
  1029. return PARSER_RC_OK;
  1030. }
  1031. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_SET);
  1032. if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1033. if (unlikely(!u->replay.start_time || !u->replay.end_time)) {
  1034. error("PLUGINSD: 'host:%s/chart:%s/dim:%s' got a %s with invalid timestamps %ld to %ld from a %s. Disabling it.",
  1035. rrdhost_hostname(host),
  1036. rrdset_id(st),
  1037. dimension,
  1038. PLUGINSD_KEYWORD_REPLAY_SET,
  1039. u->replay.start_time,
  1040. u->replay.end_time,
  1041. PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1042. return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1043. }
  1044. if (unlikely(!value_str || !*value_str))
  1045. value_str = "NAN";
  1046. if(unlikely(!flags_str))
  1047. flags_str = "";
  1048. if (likely(value_str)) {
  1049. RRDDIM_FLAGS rd_flags = rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED);
  1050. if(!(rd_flags & RRDDIM_FLAG_ARCHIVED)) {
  1051. NETDATA_DOUBLE value = str2ndd_encoded(value_str, NULL);
  1052. SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
  1053. if (!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT)) {
  1054. value = NAN;
  1055. flags = SN_EMPTY_SLOT;
  1056. }
  1057. rrddim_store_metric(rd, u->replay.end_time_ut, value, flags);
  1058. rd->last_collected_time.tv_sec = u->replay.end_time;
  1059. rd->last_collected_time.tv_usec = 0;
  1060. rd->collections_counter++;
  1061. }
  1062. else {
  1063. error_limit_static_global_var(erl, 1, 0);
  1064. error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.",
  1065. rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
  1066. }
  1067. }
  1068. return PARSER_RC_OK;
  1069. }
  1070. PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, size_t num_words, void *user)
  1071. {
  1072. if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false)
  1073. return PARSER_RC_OK;
  1074. char *dimension = get_word(words, num_words, 1);
  1075. char *last_collected_ut_str = get_word(words, num_words, 2);
  1076. char *last_collected_value_str = get_word(words, num_words, 3);
  1077. char *last_calculated_value_str = get_word(words, num_words, 4);
  1078. char *last_stored_value_str = get_word(words, num_words, 5);
  1079. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
  1080. if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1081. RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1082. if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1083. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
  1084. if(!rd) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1085. usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec;
  1086. usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
  1087. if(last_collected_ut > dim_last_collected_ut) {
  1088. rd->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
  1089. rd->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
  1090. }
  1091. rd->last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0;
  1092. rd->last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0;
  1093. rd->last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0;
  1094. return PARSER_RC_OK;
  1095. }
  1096. PARSER_RC pluginsd_replay_rrdset_collection_state(char **words, size_t num_words, void *user)
  1097. {
  1098. if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled == false)
  1099. return PARSER_RC_OK;
  1100. char *last_collected_ut_str = get_word(words, num_words, 1);
  1101. char *last_updated_ut_str = get_word(words, num_words, 2);
  1102. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE);
  1103. if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1104. RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1105. if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1106. usec_t chart_last_collected_ut = (usec_t)st->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)st->last_collected_time.tv_usec;
  1107. usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
  1108. if(last_collected_ut > chart_last_collected_ut) {
  1109. st->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
  1110. st->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
  1111. }
  1112. usec_t chart_last_updated_ut = (usec_t)st->last_updated.tv_sec * USEC_PER_SEC + (usec_t)st->last_updated.tv_usec;
  1113. usec_t last_updated_ut = last_updated_ut_str ? str2ull_encoded(last_updated_ut_str) : 0;
  1114. if(last_updated_ut > chart_last_updated_ut) {
  1115. st->last_updated.tv_sec = (time_t)(last_updated_ut / USEC_PER_SEC);
  1116. st->last_updated.tv_usec = (last_updated_ut % USEC_PER_SEC);
  1117. }
  1118. st->counter++;
  1119. st->counter_done++;
  1120. return PARSER_RC_OK;
  1121. }
  1122. PARSER_RC pluginsd_replay_end(char **words, size_t num_words, void *user)
  1123. {
  1124. if (num_words < 7) { // accepts 7, but the 7th is optional
  1125. error("REPLAY: malformed " PLUGINSD_KEYWORD_REPLAY_END " command");
  1126. return PARSER_RC_ERROR;
  1127. }
  1128. const char *update_every_child_txt = get_word(words, num_words, 1);
  1129. const char *first_entry_child_txt = get_word(words, num_words, 2);
  1130. const char *last_entry_child_txt = get_word(words, num_words, 3);
  1131. const char *start_streaming_txt = get_word(words, num_words, 4);
  1132. const char *first_entry_requested_txt = get_word(words, num_words, 5);
  1133. const char *last_entry_requested_txt = get_word(words, num_words, 6);
  1134. const char *child_world_time_txt = get_word(words, num_words, 7); // optional
  1135. time_t update_every_child = (time_t) str2ull_encoded(update_every_child_txt);
  1136. time_t first_entry_child = (time_t) str2ull_encoded(first_entry_child_txt);
  1137. time_t last_entry_child = (time_t) str2ull_encoded(last_entry_child_txt);
  1138. bool start_streaming = (strcmp(start_streaming_txt, "true") == 0);
  1139. time_t first_entry_requested = (time_t) str2ull_encoded(first_entry_requested_txt);
  1140. time_t last_entry_requested = (time_t) str2ull_encoded(last_entry_requested_txt);
  1141. // the optional child world time
  1142. time_t child_world_time = (child_world_time_txt && *child_world_time_txt) ? (time_t) str2ull_encoded(
  1143. child_world_time_txt) : now_realtime_sec();
  1144. PARSER_USER_OBJECT *user_object = user;
  1145. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END);
  1146. if(!host) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1147. RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_REPLAY_END, PLUGINSD_KEYWORD_REPLAY_BEGIN);
  1148. if(!st) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1149. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  1150. internal_error(true,
  1151. "PLUGINSD REPLAY: 'host:%s/chart:%s': got a " PLUGINSD_KEYWORD_REPLAY_END " child db from %llu to %llu, start_streaming %s, had requested from %llu to %llu, wall clock %llu",
  1152. rrdhost_hostname(host), rrdset_id(st),
  1153. (unsigned long long)first_entry_child, (unsigned long long)last_entry_child,
  1154. start_streaming?"true":"false",
  1155. (unsigned long long)first_entry_requested, (unsigned long long)last_entry_requested,
  1156. (unsigned long long)child_world_time
  1157. );
  1158. #endif
  1159. ((PARSER_USER_OBJECT *) user)->data_collections_count++;
  1160. if(((PARSER_USER_OBJECT *) user)->replay.rset_enabled && st->rrdhost->receiver) {
  1161. time_t now = now_realtime_sec();
  1162. time_t started = st->rrdhost->receiver->replication_first_time_t;
  1163. time_t current = ((PARSER_USER_OBJECT *) user)->replay.end_time;
  1164. if(started && current > started) {
  1165. host->rrdpush_receiver_replication_percent = (NETDATA_DOUBLE) (current - started) * 100.0 / (NETDATA_DOUBLE) (now - started);
  1166. worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
  1167. host->rrdpush_receiver_replication_percent);
  1168. }
  1169. }
  1170. ((PARSER_USER_OBJECT *) user)->replay.start_time = 0;
  1171. ((PARSER_USER_OBJECT *) user)->replay.end_time = 0;
  1172. ((PARSER_USER_OBJECT *) user)->replay.start_time_ut = 0;
  1173. ((PARSER_USER_OBJECT *) user)->replay.end_time_ut = 0;
  1174. ((PARSER_USER_OBJECT *) user)->replay.wall_clock_time = 0;
  1175. ((PARSER_USER_OBJECT *) user)->replay.rset_enabled = false;
  1176. st->counter++;
  1177. st->counter_done++;
  1178. store_metric_collection_completed();
  1179. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  1180. st->replay.start_streaming = false;
  1181. st->replay.after = 0;
  1182. st->replay.before = 0;
  1183. if(start_streaming)
  1184. st->replay.log_next_data_collection = true;
  1185. #endif
  1186. if (start_streaming) {
  1187. if (st->update_every != update_every_child)
  1188. rrdset_set_update_every_s(st, update_every_child);
  1189. if(rrdset_flag_check(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS)) {
  1190. rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
  1191. rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
  1192. rrdset_flag_clear(st, RRDSET_FLAG_SYNC_CLOCK);
  1193. rrdhost_receiver_replicating_charts_minus_one(st->rrdhost);
  1194. }
  1195. #ifdef NETDATA_LOG_REPLICATION_REQUESTS
  1196. else
  1197. internal_error(true, "REPLAY ERROR: 'host:%s/chart:%s' got a " PLUGINSD_KEYWORD_REPLAY_END " with enable_streaming = true, but there is no replication in progress for this chart.",
  1198. rrdhost_hostname(host), rrdset_id(st));
  1199. #endif
  1200. pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END);
  1201. host->rrdpush_receiver_replication_percent = 100.0;
  1202. worker_set_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, host->rrdpush_receiver_replication_percent);
  1203. return PARSER_RC_OK;
  1204. }
  1205. pluginsd_set_chart_from_parent(user, NULL, PLUGINSD_KEYWORD_REPLAY_END);
  1206. rrdcontext_updated_retention_rrdset(st);
  1207. bool ok = replicate_chart_request(send_to_plugin, user_object->parser, host, st,
  1208. first_entry_child, last_entry_child, child_world_time,
  1209. first_entry_requested, last_entry_requested);
  1210. return ok ? PARSER_RC_OK : PARSER_RC_ERROR;
  1211. }
  1212. PARSER_RC pluginsd_begin_v2(char **words, size_t num_words, void *user) {
  1213. timing_init();
  1214. char *id = get_word(words, num_words, 1);
  1215. char *update_every_str = get_word(words, num_words, 2);
  1216. char *end_time_str = get_word(words, num_words, 3);
  1217. char *wall_clock_time_str = get_word(words, num_words, 4);
  1218. if(unlikely(!id || !update_every_str || !end_time_str || !wall_clock_time_str))
  1219. return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_BEGIN_V2, "missing parameters");
  1220. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_BEGIN_V2);
  1221. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1222. timing_step(TIMING_STEP_BEGIN2_PREPARE);
  1223. RRDSET *st = pluginsd_find_chart(host, id, PLUGINSD_KEYWORD_BEGIN_V2);
  1224. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1225. pluginsd_set_chart_from_parent(user, st, PLUGINSD_KEYWORD_BEGIN_V2);
  1226. if(unlikely(rrdset_flag_check(st, RRDSET_FLAG_OBSOLETE | RRDSET_FLAG_ARCHIVED)))
  1227. rrdset_isnot_obsolete(st);
  1228. timing_step(TIMING_STEP_BEGIN2_FIND_CHART);
  1229. // ------------------------------------------------------------------------
  1230. // parse the parameters
  1231. time_t update_every = (time_t) str2ull_encoded(update_every_str);
  1232. time_t end_time = (time_t) str2ull_encoded(end_time_str);
  1233. time_t wall_clock_time;
  1234. if(likely(*wall_clock_time_str == '#'))
  1235. wall_clock_time = end_time;
  1236. else
  1237. wall_clock_time = (time_t) str2ull_encoded(wall_clock_time_str);
  1238. if (unlikely(update_every != st->update_every))
  1239. rrdset_set_update_every_s(st, update_every);
  1240. timing_step(TIMING_STEP_BEGIN2_PARSE);
  1241. // ------------------------------------------------------------------------
  1242. // prepare our state
  1243. pluginsd_lock_rrdset_data_collection(user);
  1244. PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
  1245. u->v2.update_every = update_every;
  1246. u->v2.end_time = end_time;
  1247. u->v2.wall_clock_time = wall_clock_time;
  1248. u->v2.ml_locked = ml_chart_update_begin(st);
  1249. timing_step(TIMING_STEP_BEGIN2_ML);
  1250. // ------------------------------------------------------------------------
  1251. // propagate it forward in v2
  1252. if(!u->v2.stream_buffer.wb && rrdhost_has_rrdpush_sender_enabled(st->rrdhost))
  1253. u->v2.stream_buffer = rrdset_push_metric_initialize(u->st, wall_clock_time);
  1254. if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.wb) {
  1255. // check if receiver and sender have the same number parsing capabilities
  1256. bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754);
  1257. NUMBER_ENCODING encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
  1258. BUFFER *wb = u->v2.stream_buffer.wb;
  1259. buffer_need_bytes(wb, 1024);
  1260. if(unlikely(u->v2.stream_buffer.begin_v2_added))
  1261. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_END_V2 "\n", sizeof(PLUGINSD_KEYWORD_END_V2) - 1 + 1);
  1262. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_BEGIN_V2 " '", sizeof(PLUGINSD_KEYWORD_BEGIN_V2) - 1 + 2);
  1263. buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
  1264. buffer_fast_strcat(wb, "' ", 2);
  1265. if(can_copy)
  1266. buffer_strcat(wb, update_every_str);
  1267. else
  1268. buffer_print_uint64_encoded(wb, encoding, update_every);
  1269. buffer_fast_strcat(wb, " ", 1);
  1270. if(can_copy)
  1271. buffer_strcat(wb, end_time_str);
  1272. else
  1273. buffer_print_uint64_encoded(wb, encoding, end_time);
  1274. buffer_fast_strcat(wb, " ", 1);
  1275. if(can_copy)
  1276. buffer_strcat(wb, wall_clock_time_str);
  1277. else
  1278. buffer_print_uint64_encoded(wb, encoding, wall_clock_time);
  1279. buffer_fast_strcat(wb, "\n", 1);
  1280. u->v2.stream_buffer.last_point_end_time_s = end_time;
  1281. u->v2.stream_buffer.begin_v2_added = true;
  1282. }
  1283. timing_step(TIMING_STEP_BEGIN2_PROPAGATE);
  1284. // ------------------------------------------------------------------------
  1285. // store it
  1286. st->last_collected_time.tv_sec = end_time;
  1287. st->last_collected_time.tv_usec = 0;
  1288. st->last_updated.tv_sec = end_time;
  1289. st->last_updated.tv_usec = 0;
  1290. st->counter++;
  1291. st->counter_done++;
  1292. // these are only needed for db mode RAM, SAVE, MAP, ALLOC
  1293. st->current_entry++;
  1294. if(st->current_entry >= st->entries)
  1295. st->current_entry -= st->entries;
  1296. timing_step(TIMING_STEP_BEGIN2_STORE);
  1297. return PARSER_RC_OK;
  1298. }
  1299. PARSER_RC pluginsd_set_v2(char **words, size_t num_words, void *user) {
  1300. timing_init();
  1301. char *dimension = get_word(words, num_words, 1);
  1302. char *collected_str = get_word(words, num_words, 2);
  1303. char *value_str = get_word(words, num_words, 3);
  1304. char *flags_str = get_word(words, num_words, 4);
  1305. if(unlikely(!dimension || !collected_str || !value_str || !flags_str))
  1306. return PLUGINSD_DISABLE_PLUGIN(user, PLUGINSD_KEYWORD_SET_V2, "missing parameters");
  1307. PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
  1308. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_SET_V2);
  1309. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1310. RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_SET_V2, PLUGINSD_KEYWORD_BEGIN_V2);
  1311. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1312. timing_step(TIMING_STEP_SET2_PREPARE);
  1313. RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_SET_V2);
  1314. if(unlikely(!rd)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1315. if(unlikely(rrddim_flag_check(rd, RRDDIM_FLAG_OBSOLETE | RRDDIM_FLAG_ARCHIVED)))
  1316. rrddim_isnot_obsolete(st, rd);
  1317. timing_step(TIMING_STEP_SET2_LOOKUP_DIMENSION);
  1318. // ------------------------------------------------------------------------
  1319. // parse the parameters
  1320. collected_number collected_value = (collected_number) str2ll_encoded(collected_str);
  1321. NETDATA_DOUBLE value;
  1322. if(*value_str == '#')
  1323. value = (NETDATA_DOUBLE)collected_value;
  1324. else
  1325. value = str2ndd_encoded(value_str, NULL);
  1326. SN_FLAGS flags = pluginsd_parse_storage_number_flags(flags_str);
  1327. timing_step(TIMING_STEP_SET2_PARSE);
  1328. // ------------------------------------------------------------------------
  1329. // check value and ML
  1330. if (unlikely(!netdata_double_isnumber(value) || (flags == SN_EMPTY_SLOT))) {
  1331. value = NAN;
  1332. flags = SN_EMPTY_SLOT;
  1333. if(u->v2.ml_locked)
  1334. ml_dimension_is_anomalous(rd, u->v2.end_time, 0, false);
  1335. }
  1336. else if(u->v2.ml_locked) {
  1337. if (ml_dimension_is_anomalous(rd, u->v2.end_time, value, true)) {
  1338. // clear anomaly bit: 0 -> is anomalous, 1 -> not anomalous
  1339. flags &= ~((storage_number) SN_FLAG_NOT_ANOMALOUS);
  1340. }
  1341. else
  1342. flags |= SN_FLAG_NOT_ANOMALOUS;
  1343. }
  1344. timing_step(TIMING_STEP_SET2_ML);
  1345. // ------------------------------------------------------------------------
  1346. // propagate it forward in v2
  1347. if(u->v2.stream_buffer.v2 && u->v2.stream_buffer.begin_v2_added && u->v2.stream_buffer.wb) {
  1348. // check if receiver and sender have the same number parsing capabilities
  1349. bool can_copy = stream_has_capability(u, STREAM_CAP_IEEE754) == stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754);
  1350. NUMBER_ENCODING integer_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_HEX;
  1351. NUMBER_ENCODING doubles_encoding = stream_has_capability(&u->v2.stream_buffer, STREAM_CAP_IEEE754) ? NUMBER_ENCODING_BASE64 : NUMBER_ENCODING_DECIMAL;
  1352. BUFFER *wb = u->v2.stream_buffer.wb;
  1353. buffer_need_bytes(wb, 1024);
  1354. buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
  1355. buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
  1356. buffer_fast_strcat(wb, "' ", 2);
  1357. if(can_copy)
  1358. buffer_strcat(wb, collected_str);
  1359. else
  1360. buffer_print_int64_encoded(wb, integer_encoding, collected_value); // original v2 had hex
  1361. buffer_fast_strcat(wb, " ", 1);
  1362. if(can_copy)
  1363. buffer_strcat(wb, value_str);
  1364. else
  1365. buffer_print_netdata_double_encoded(wb, doubles_encoding, value); // original v2 had decimal
  1366. buffer_fast_strcat(wb, " ", 1);
  1367. buffer_print_sn_flags(wb, flags, true);
  1368. buffer_fast_strcat(wb, "\n", 1);
  1369. }
  1370. timing_step(TIMING_STEP_SET2_PROPAGATE);
  1371. // ------------------------------------------------------------------------
  1372. // store it
  1373. rrddim_store_metric(rd, u->v2.end_time * USEC_PER_SEC, value, flags);
  1374. rd->last_collected_time.tv_sec = u->v2.end_time;
  1375. rd->last_collected_time.tv_usec = 0;
  1376. rd->last_collected_value = collected_value;
  1377. rd->last_stored_value = value;
  1378. rd->last_calculated_value = value;
  1379. rd->collections_counter++;
  1380. rd->updated = true;
  1381. timing_step(TIMING_STEP_SET2_STORE);
  1382. return PARSER_RC_OK;
  1383. }
  1384. void pluginsd_cleanup_v2(void *user) {
  1385. // this is called when the thread is stopped while processing
  1386. pluginsd_set_chart_from_parent(user, NULL, "THREAD CLEANUP");
  1387. }
  1388. PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_words __maybe_unused, void *user) {
  1389. timing_init();
  1390. RRDHOST *host = pluginsd_require_host_from_parent(user, PLUGINSD_KEYWORD_END_V2);
  1391. if(unlikely(!host)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1392. RRDSET *st = pluginsd_require_chart_from_parent(user, PLUGINSD_KEYWORD_END_V2, PLUGINSD_KEYWORD_BEGIN_V2);
  1393. if(unlikely(!st)) return PLUGINSD_DISABLE_PLUGIN(user, NULL, NULL);
  1394. PARSER_USER_OBJECT *u = (PARSER_USER_OBJECT *) user;
  1395. u->data_collections_count++;
  1396. timing_step(TIMING_STEP_END2_PREPARE);
  1397. // ------------------------------------------------------------------------
  1398. // propagate the whole chart update in v1
  1399. if(unlikely(!u->v2.stream_buffer.v2 && !u->v2.stream_buffer.begin_v2_added && u->v2.stream_buffer.wb))
  1400. rrdset_push_metrics_v1(&u->v2.stream_buffer, st);
  1401. timing_step(TIMING_STEP_END2_PUSH_V1);
  1402. // ------------------------------------------------------------------------
  1403. // unblock data collection
  1404. pluginsd_unlock_previous_chart(user, PLUGINSD_KEYWORD_END_V2, false);
  1405. rrdcontext_collected_rrdset(st);
  1406. store_metric_collection_completed();
  1407. timing_step(TIMING_STEP_END2_RRDSET);
  1408. // ------------------------------------------------------------------------
  1409. // propagate it forward
  1410. rrdset_push_metrics_finished(&u->v2.stream_buffer, st);
  1411. timing_step(TIMING_STEP_END2_PROPAGATE);
  1412. // ------------------------------------------------------------------------
  1413. // cleanup RRDSET / RRDDIM
  1414. RRDDIM *rd;
  1415. rrddim_foreach_read(rd, st) {
  1416. rd->calculated_value = 0;
  1417. rd->collected_value = 0;
  1418. rd->updated = false;
  1419. }
  1420. rrddim_foreach_done(rd);
  1421. // ------------------------------------------------------------------------
  1422. // reset state
  1423. u->v2 = (struct parser_user_object_v2){ 0 };
  1424. timing_step(TIMING_STEP_END2_STORE);
  1425. timing_report();
  1426. return PARSER_RC_OK;
  1427. }
  1428. void pluginsd_process_thread_cleanup(void *ptr) {
  1429. PARSER *parser = (PARSER *)ptr;
  1430. pluginsd_cleanup_v2(parser->user);
  1431. pluginsd_host_define_cleanup(parser->user);
  1432. rrd_collector_finished();
  1433. parser_destroy(parser);
  1434. }
  1435. // New plugins.d parser
  1436. inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
  1437. {
  1438. int enabled = cd->unsafe.enabled;
  1439. if (!fp_plugin_input || !fp_plugin_output || !enabled) {
  1440. cd->unsafe.enabled = 0;
  1441. return 0;
  1442. }
  1443. if (unlikely(fileno(fp_plugin_input) == -1)) {
  1444. error("input file descriptor given is not a valid stream");
  1445. cd->serial_failures++;
  1446. return 0;
  1447. }
  1448. if (unlikely(fileno(fp_plugin_output) == -1)) {
  1449. error("output file descriptor given is not a valid stream");
  1450. cd->serial_failures++;
  1451. return 0;
  1452. }
  1453. clearerr(fp_plugin_input);
  1454. clearerr(fp_plugin_output);
  1455. PARSER_USER_OBJECT user = {
  1456. .enabled = cd->unsafe.enabled,
  1457. .host = host,
  1458. .cd = cd,
  1459. .trust_durations = trust_durations
  1460. };
  1461. // fp_plugin_output = our input; fp_plugin_input = our output
  1462. PARSER *parser = parser_init(&user, fp_plugin_output, fp_plugin_input, -1,
  1463. PARSER_INPUT_SPLIT, NULL);
  1464. pluginsd_keywords_init(parser, PARSER_INIT_PLUGINSD);
  1465. rrd_collector_started();
  1466. // this keeps the parser with its current value
  1467. // so, parser needs to be allocated before pushing it
  1468. netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
  1469. user.parser = parser;
  1470. char buffer[PLUGINSD_LINE_MAX + 1];
  1471. while (likely(!parser_next(parser, buffer, PLUGINSD_LINE_MAX))) {
  1472. if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, buffer)))
  1473. break;
  1474. }
  1475. // free parser with the pop function
  1476. netdata_thread_cleanup_pop(1);
  1477. cd->unsafe.enabled = user.enabled;
  1478. size_t count = user.data_collections_count;
  1479. if (likely(count)) {
  1480. cd->successful_collections += count;
  1481. cd->serial_failures = 0;
  1482. }
  1483. else
  1484. cd->serial_failures++;
  1485. return count;
  1486. }
  1487. PARSER_RC pluginsd_exit(char **words __maybe_unused, size_t num_words __maybe_unused, void *user __maybe_unused)
  1488. {
  1489. info("PLUGINSD: plugin called EXIT.");
  1490. return PARSER_RC_STOP;
  1491. }
  1492. static void pluginsd_keywords_init_internal(PARSER *parser, PLUGINSD_KEYWORDS types, void (*add_func)(PARSER *parser, char *keyword, keyword_function func)) {
  1493. if (types & PARSER_INIT_PLUGINSD) {
  1494. add_func(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush);
  1495. add_func(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable);
  1496. add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE, pluginsd_host_define);
  1497. add_func(parser, PLUGINSD_KEYWORD_HOST_DEFINE_END, pluginsd_host_define_end);
  1498. add_func(parser, PLUGINSD_KEYWORD_HOST_LABEL, pluginsd_host_labels);
  1499. add_func(parser, PLUGINSD_KEYWORD_HOST, pluginsd_host);
  1500. add_func(parser, PLUGINSD_KEYWORD_EXIT, pluginsd_exit);
  1501. }
  1502. if (types & (PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING)) {
  1503. // plugins.d plugins and streaming
  1504. add_func(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart);
  1505. add_func(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension);
  1506. add_func(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable);
  1507. add_func(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label);
  1508. add_func(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite);
  1509. add_func(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, pluginsd_clabel_commit);
  1510. add_func(parser, PLUGINSD_KEYWORD_CLABEL, pluginsd_clabel);
  1511. add_func(parser, PLUGINSD_KEYWORD_FUNCTION, pluginsd_function);
  1512. add_func(parser, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN, pluginsd_function_result_begin);
  1513. add_func(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin);
  1514. add_func(parser, PLUGINSD_KEYWORD_SET, pluginsd_set);
  1515. add_func(parser, PLUGINSD_KEYWORD_END, pluginsd_end);
  1516. inflight_functions_init(parser);
  1517. }
  1518. if (types & PARSER_INIT_STREAMING) {
  1519. add_func(parser, PLUGINSD_KEYWORD_CHART_DEFINITION_END, pluginsd_chart_definition_end);
  1520. // replication
  1521. add_func(parser, PLUGINSD_KEYWORD_REPLAY_BEGIN, pluginsd_replay_begin);
  1522. add_func(parser, PLUGINSD_KEYWORD_REPLAY_SET, pluginsd_replay_set);
  1523. add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE, pluginsd_replay_rrddim_collection_state);
  1524. add_func(parser, PLUGINSD_KEYWORD_REPLAY_RRDSET_STATE, pluginsd_replay_rrdset_collection_state);
  1525. add_func(parser, PLUGINSD_KEYWORD_REPLAY_END, pluginsd_replay_end);
  1526. // streaming metrics v2
  1527. add_func(parser, PLUGINSD_KEYWORD_BEGIN_V2, pluginsd_begin_v2);
  1528. add_func(parser, PLUGINSD_KEYWORD_SET_V2, pluginsd_set_v2);
  1529. add_func(parser, PLUGINSD_KEYWORD_END_V2, pluginsd_end_v2);
  1530. }
  1531. }
  1532. void pluginsd_keywords_init(PARSER *parser, PLUGINSD_KEYWORDS types) {
  1533. pluginsd_keywords_init_internal(parser, types, parser_add_keyword);
  1534. }
  1535. struct pluginsd_user_unittest {
  1536. size_t size;
  1537. const char **hashtable;
  1538. uint32_t (*hash)(const char *s);
  1539. size_t collisions;
  1540. };
  1541. void pluginsd_keyword_collision_check(PARSER *parser, char *keyword, keyword_function func __maybe_unused) {
  1542. struct pluginsd_user_unittest *u = parser->user;
  1543. uint32_t hash = u->hash(keyword);
  1544. uint32_t slot = hash % u->size;
  1545. if(u->hashtable[slot])
  1546. u->collisions++;
  1547. u->hashtable[slot] = keyword;
  1548. }
  1549. static struct {
  1550. const char *name;
  1551. uint32_t (*hash)(const char *s);
  1552. size_t slots_needed;
  1553. } hashers[] = {
  1554. { .name = "djb2_hash32(s)", djb2_hash32, .slots_needed = 0, },
  1555. { .name = "fnv1_hash32(s)", fnv1_hash32, .slots_needed = 0, },
  1556. { .name = "fnv1a_hash32(s)", fnv1a_hash32, .slots_needed = 0, },
  1557. { .name = "larson_hash32(s)", larson_hash32, .slots_needed = 0, },
  1558. { .name = "pluginsd_parser_hash32(s)", pluginsd_parser_hash32, .slots_needed = 0, },
  1559. // terminator
  1560. { .name = NULL, NULL, .slots_needed = 0, },
  1561. };
  1562. int pluginsd_parser_unittest(void) {
  1563. PARSER *p;
  1564. size_t slots_to_check = 1000;
  1565. size_t i, h;
  1566. // check for hashtable collisions
  1567. for(h = 0; hashers[h].name ;h++) {
  1568. hashers[h].slots_needed = slots_to_check * 1000000;
  1569. for (i = 10; i < slots_to_check; i++) {
  1570. struct pluginsd_user_unittest user = {
  1571. .hash = hashers[h].hash,
  1572. .size = i,
  1573. .hashtable = callocz(i, sizeof(const char *)),
  1574. .collisions = 0,
  1575. };
  1576. p = parser_init(&user, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
  1577. pluginsd_keywords_init_internal(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING,
  1578. pluginsd_keyword_collision_check);
  1579. parser_destroy(p);
  1580. freez(user.hashtable);
  1581. if (!user.collisions) {
  1582. hashers[h].slots_needed = i;
  1583. break;
  1584. }
  1585. }
  1586. }
  1587. for(h = 0; hashers[h].name ;h++) {
  1588. if(hashers[h].slots_needed > 1000)
  1589. info("PARSER: hash function '%s' cannot be used without collisions under %zu slots", hashers[h].name, slots_to_check);
  1590. else
  1591. info("PARSER: hash function '%s' needs PARSER_KEYWORDS_HASHTABLE_SIZE (in parser.h) set to %zu", hashers[h].name, hashers[h].slots_needed);
  1592. }
  1593. p = parser_init(NULL, NULL, NULL, -1, PARSER_INPUT_SPLIT, NULL);
  1594. pluginsd_keywords_init(p, PARSER_INIT_PLUGINSD | PARSER_INIT_STREAMING);
  1595. parser_destroy(p);
  1596. return 0;
  1597. }