pluginsd_parser.h 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #ifndef NETDATA_PLUGINSD_PARSER_H
  3. #define NETDATA_PLUGINSD_PARSER_H
  4. #include "daemon/common.h"
  // NOTE(review): presumably the first worker job id that parser keywords may
  // use; confirm against the code that registers the worker jobs.
  #define WORKER_PARSER_FIRST_JOB 3

  // This has to stay in sync with the definition of the same name in receiver.c.
  // With the current WORKER_PARSER_FIRST_JOB it evaluates to 0.
  #define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)

  // Maximum size, in bytes, of the response of a function (100 MiB).
  // parser_action() stops parsing once a deferred response grows past it.
  #define PLUGINSD_MAX_DEFERRED_SIZE (100 * 1024 * 1024)

  // NOTE(review): not used in this header; presumably the minimum size of the
  // RRDSET pointers cache kept by the parser -- confirm at the point of use.
  #define PLUGINSD_MIN_RRDSET_POINTERS_CACHE 1024

  // NOTE(review): presumably the key of the host label that marks a host as
  // ephemeral -- confirm at the point of use.
  #define HOST_LABEL_IS_EPHEMERAL "_is_ephemeral"
  // Return codes of the keyword callbacks run by the parser.
  // parser_action() reports both PARSER_RC_STOP and PARSER_RC_ERROR to its
  // caller as a non-zero result; only PARSER_RC_ERROR is additionally logged.
  typedef enum __attribute__ ((__packed__)) parser_rc {
      PARSER_RC_OK,       // the callback succeeded, keep going
      PARSER_RC_STOP,     // the callback asked the parser to stop
      PARSER_RC_ERROR     // the callback failed (the remaining callbacks are aborted)
  } PARSER_RC;
  // Bit flags describing how the parser consumes its input.
  // They are passed to parser_init() and tested against PARSER.flags.
  typedef enum __attribute__ ((__packed__)) parser_input_type {
      // NOTE(review): not referenced in this header; meaning to be confirmed.
      PARSER_INPUT_SPLIT          = (1 << 1),

      // While set, parser_action() does not dispatch keywords: it accumulates
      // lines until the first word of a line equals PARSER.defer.end_keyword.
      PARSER_DEFER_UNTIL_KEYWORD  = (1 << 2),
  } PARSER_INPUT_TYPE;
  // Bit mask of keyword families. Each PARSER_KEYWORD carries one, and
  // parser_find_keyword() accepts a keyword only when its repertoire shares
  // at least one bit with the repertoire of the parser.
  typedef enum __attribute__ ((__packed__)) {
      PARSER_INIT_PLUGINSD        = (1 << 1),
      PARSER_INIT_STREAMING       = (1 << 2),
      PARSER_REP_METADATA         = (1 << 3),
  } PARSER_REPERTOIRE;
  27. struct parser;
  28. typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, struct parser *parser);
  29. typedef struct parser_keyword {
  30. char *keyword;
  31. size_t id;
  32. PARSER_REPERTOIRE repertoire;
  33. size_t worker_job_id;
  34. } PARSER_KEYWORD;
  35. typedef struct parser_user_object {
  36. bool cleanup_slots;
  37. RRDSET *st;
  38. RRDHOST *host;
  39. void *opaque;
  40. struct plugind *cd;
  41. int trust_durations;
  42. RRDLABELS *new_host_labels;
  43. RRDLABELS *chart_rrdlabels_linked_temporarily;
  44. size_t data_collections_count;
  45. int enabled;
  46. #ifdef NETDATA_LOG_STREAM_RECEIVE
  47. FILE *stream_log_fp;
  48. PARSER_REPERTOIRE stream_log_repertoire;
  49. #endif
  50. STREAM_CAPABILITIES capabilities; // receiver capabilities
  51. struct {
  52. bool parsing_host;
  53. uuid_t machine_guid;
  54. char machine_guid_str[UUID_STR_LEN];
  55. STRING *hostname;
  56. RRDLABELS *rrdlabels;
  57. } host_define;
  58. struct parser_user_object_replay {
  59. time_t start_time;
  60. time_t end_time;
  61. usec_t start_time_ut;
  62. usec_t end_time_ut;
  63. time_t wall_clock_time;
  64. bool rset_enabled;
  65. } replay;
  66. struct parser_user_object_v2 {
  67. bool locked_data_collection;
  68. RRDSET_STREAM_BUFFER stream_buffer; // sender capabilities in this
  69. time_t update_every;
  70. time_t end_time;
  71. time_t wall_clock_time;
  72. bool ml_locked;
  73. } v2;
  74. } PARSER_USER_OBJECT;
  75. typedef struct parser {
  76. uint8_t version; // Parser version
  77. PARSER_REPERTOIRE repertoire;
  78. uint32_t flags;
  79. int fd; // Socket
  80. FILE *fp_input; // Input source e.g. stream
  81. FILE *fp_output; // Stream to send commands to plugin
  82. #ifdef ENABLE_HTTPS
  83. NETDATA_SSL *ssl_output;
  84. #endif
  85. #ifdef ENABLE_H2O
  86. void *h2o_ctx; // if set we use h2o_stream functions to send data
  87. #endif
  88. PARSER_USER_OBJECT user; // User defined structure to hold extra state between calls
  89. struct buffered_reader reader;
  90. struct line_splitter line;
  91. const PARSER_KEYWORD *keyword;
  92. struct {
  93. const char *end_keyword;
  94. BUFFER *response;
  95. void (*action)(struct parser *parser, void *action_data);
  96. void *action_data;
  97. } defer;
  98. struct {
  99. DICTIONARY *functions;
  100. usec_t smaller_monotonic_timeout_ut;
  101. } inflight;
  102. struct {
  103. SPINLOCK spinlock;
  104. } writer;
  105. } PARSER;
  106. PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl);
  107. void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire);
  108. void parser_destroy(PARSER *working_parser);
  109. void pluginsd_cleanup_v2(PARSER *parser);
  110. void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire);
  111. PARSER_RC parser_execute(PARSER *parser, const PARSER_KEYWORD *keyword, char **words, size_t num_words);
  /**
   * Copy the first word of a line into a caller supplied buffer.
   *
   * Leading bytes flagged in isspace_map are skipped. The bytes that follow are
   * copied to dst until the end of src, a flagged byte, or until dst only has
   * room left for the terminator. A word longer than dst_size - 1 bytes is
   * therefore truncated, and the truncated length is what gets returned.
   *
   * @param src          NUL-terminated input line.
   * @param dst          Destination buffer; NUL-terminated on return whenever
   *                     dst_size is positive.
   * @param dst_size     Capacity of dst in bytes, terminator included. When it
   *                     is zero or negative dst is left untouched.
   * @param isspace_map  Lookup table indexed by byte value (0-255); a true entry
   *                     marks a separator. The entry for '\0' must be false.
   * @return Number of bytes copied into dst, terminator excluded; 0 when the
   *         line has no word or dst has no capacity.
   */
  static inline int find_first_keyword(const char *src, char *dst, int dst_size, bool *isspace_map) {
      // Without room for even the terminator, writing to dst would overflow it.
      if (unlikely(dst_size <= 0))
          return 0;

      const char *s = src;
      while (unlikely(isspace_map[(uint8_t)*s]))
          s++;

      const char *keyword_start = s;

      // "dst_size > 1" always keeps one byte in reserve for the terminator.
      while (likely(*s && !isspace_map[(uint8_t)*s]) && dst_size > 1) {
          *dst++ = *s++;
          dst_size--;
      }
      *dst = '\0';

      return (int)(s - keyword_start);
  }
  123. const PARSER_KEYWORD *gperf_lookup_keyword(register const char *str, register size_t len);
  124. static inline const PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *command) {
  125. const PARSER_KEYWORD *t = gperf_lookup_keyword(command, strlen(command));
  126. if(t && (t->repertoire & parser->repertoire))
  127. return t;
  128. return NULL;
  129. }
  130. bool parser_reconstruct_node(BUFFER *wb, void *ptr);
  131. bool parser_reconstruct_instance(BUFFER *wb, void *ptr);
  132. bool parser_reconstruct_context(BUFFER *wb, void *ptr);
  133. static inline int parser_action(PARSER *parser, char *input) {
  134. #ifdef NETDATA_LOG_STREAM_RECEIVE
  135. static __thread char line[PLUGINSD_LINE_MAX + 1];
  136. strncpyz(line, input, sizeof(line) - 1);
  137. #endif
  138. parser->line.count++;
  139. if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) {
  140. char command[100 + 1];
  141. bool has_keyword = find_first_keyword(input, command, 100, isspace_map_pluginsd);
  142. if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) {
  143. if(parser->defer.response) {
  144. buffer_strcat(parser->defer.response, input);
  145. if(buffer_strlen(parser->defer.response) > PLUGINSD_MAX_DEFERRED_SIZE) {
  146. // more than PLUGINSD_MAX_DEFERRED_SIZE of data,
  147. // or a bad plugin that did not send the end_keyword
  148. nd_log(NDLS_DAEMON, NDLP_ERR, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response));
  149. return 1;
  150. }
  151. }
  152. return 0;
  153. }
  154. else {
  155. // call the action
  156. parser->defer.action(parser, parser->defer.action_data);
  157. // empty everything
  158. parser->defer.action = NULL;
  159. parser->defer.action_data = NULL;
  160. parser->defer.end_keyword = NULL;
  161. parser->defer.response = NULL;
  162. parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD;
  163. }
  164. return 0;
  165. }
  166. parser->line.num_words = quoted_strings_splitter_pluginsd(input, parser->line.words, PLUGINSD_MAX_WORDS);
  167. const char *command = get_word(parser->line.words, parser->line.num_words, 0);
  168. if(unlikely(!command)) {
  169. line_splitter_reset(&parser->line);
  170. return 0;
  171. }
  172. PARSER_RC rc;
  173. parser->keyword = parser_find_keyword(parser, command);
  174. if(likely(parser->keyword)) {
  175. worker_is_busy(parser->keyword->worker_job_id);
  176. #ifdef NETDATA_LOG_STREAM_RECEIVE
  177. if(parser->user.stream_log_fp && parser->keyword->repertoire & parser->user.stream_log_repertoire)
  178. fprintf(parser->user.stream_log_fp, "%s", line);
  179. #endif
  180. rc = parser_execute(parser, parser->keyword, parser->line.words, parser->line.num_words);
  181. // rc = (*t->func)(words, num_words, parser);
  182. worker_is_idle();
  183. }
  184. else
  185. rc = PARSER_RC_ERROR;
  186. if(rc == PARSER_RC_ERROR) {
  187. CLEAN_BUFFER *wb = buffer_create(1024, NULL);
  188. line_splitter_reconstruct_line(wb, &parser->line);
  189. netdata_log_error("PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)",
  190. command, parser->line.count, buffer_tostring(wb));
  191. }
  192. line_splitter_reset(&parser->line);
  193. return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP);
  194. }
  195. #endif //NETDATA_PLUGINSD_PARSER_H