pluginsd_parser.h 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #ifndef NETDATA_PLUGINSD_PARSER_H
  3. #define NETDATA_PLUGINSD_PARSER_H
  4. #include "daemon/common.h"
  // NOTE(review): presumably the first worker job id assigned to parser keywords — confirm.
  #define WORKER_PARSER_FIRST_JOB 3

  // This value must be kept in sync with the same definition in receiver.c.
  // With WORKER_PARSER_FIRST_JOB == 3 it expands to job id 0.
  #define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)
  // Result codes handed back by parser keyword callbacks.
  typedef enum __attribute__ ((__packed__)) parser_rc {
      PARSER_RC_OK    = 0,    // callback succeeded, keep processing
      PARSER_RC_STOP  = 1,    // callback asked the parser to stop
      PARSER_RC_ERROR = 2     // callback failed (abort the rest of the callbacks)
  } PARSER_RC;
  // Bit flags describing how the parser consumes its input.
  // PARSER_DEFER_UNTIL_KEYWORD is the bit parser_action() tests on parser->flags
  // to decide whether incoming lines are being collected until an end keyword.
  typedef enum __attribute__ ((__packed__)) parser_input_type {
      PARSER_INPUT_SPLIT         = 0x02,  // bit 1
      PARSER_DEFER_UNTIL_KEYWORD = 0x04,  // bit 2
  } PARSER_INPUT_TYPE;
  // Bit flags naming the keyword sets a parser may accept.
  // A keyword is only returned by parser_find_keyword() when its repertoire
  // shares at least one bit with the repertoire of the parser.
  typedef enum __attribute__ ((__packed__)) {
      PARSER_INIT_PLUGINSD  = 0x02,  // bit 1
      PARSER_INIT_STREAMING = 0x04,  // bit 2
  } PARSER_REPERTOIRE;
  22. struct parser;
  23. typedef PARSER_RC (*keyword_function)(char **words, size_t num_words, struct parser *parser);
  24. typedef struct parser_keyword {
  25. char *keyword;
  26. size_t id;
  27. PARSER_REPERTOIRE repertoire;
  28. size_t worker_job_id;
  29. } PARSER_KEYWORD;
  30. typedef struct parser_user_object {
  31. RRDSET *st;
  32. RRDHOST *host;
  33. void *opaque;
  34. struct plugind *cd;
  35. int trust_durations;
  36. DICTIONARY *new_host_labels;
  37. DICTIONARY *chart_rrdlabels_linked_temporarily;
  38. size_t data_collections_count;
  39. int enabled;
  40. STREAM_CAPABILITIES capabilities; // receiver capabilities
  41. struct {
  42. bool parsing_host;
  43. uuid_t machine_guid;
  44. char machine_guid_str[UUID_STR_LEN];
  45. STRING *hostname;
  46. DICTIONARY *rrdlabels;
  47. } host_define;
  48. struct parser_user_object_replay {
  49. time_t start_time;
  50. time_t end_time;
  51. usec_t start_time_ut;
  52. usec_t end_time_ut;
  53. time_t wall_clock_time;
  54. bool rset_enabled;
  55. } replay;
  56. struct parser_user_object_v2 {
  57. bool locked_data_collection;
  58. RRDSET_STREAM_BUFFER stream_buffer; // sender capabilities in this
  59. time_t update_every;
  60. time_t end_time;
  61. time_t wall_clock_time;
  62. bool ml_locked;
  63. } v2;
  64. } PARSER_USER_OBJECT;
  65. typedef struct parser {
  66. uint8_t version; // Parser version
  67. PARSER_REPERTOIRE repertoire;
  68. uint32_t flags;
  69. int fd; // Socket
  70. size_t line;
  71. FILE *fp_input; // Input source e.g. stream
  72. FILE *fp_output; // Stream to send commands to plugin
  73. #ifdef ENABLE_HTTPS
  74. NETDATA_SSL *ssl_output;
  75. #endif
  76. PARSER_USER_OBJECT user; // User defined structure to hold extra state between calls
  77. struct buffered_reader reader;
  78. struct {
  79. const char *end_keyword;
  80. BUFFER *response;
  81. void (*action)(struct parser *parser, void *action_data);
  82. void *action_data;
  83. } defer;
  84. struct {
  85. DICTIONARY *functions;
  86. usec_t smaller_timeout;
  87. } inflight;
  88. struct {
  89. SPINLOCK spinlock;
  90. } writer;
  91. } PARSER;
  92. PARSER *parser_init(struct parser_user_object *user, FILE *fp_input, FILE *fp_output, int fd, PARSER_INPUT_TYPE flags, void *ssl);
  93. void parser_init_repertoire(PARSER *parser, PARSER_REPERTOIRE repertoire);
  94. void parser_destroy(PARSER *working_parser);
  95. void pluginsd_cleanup_v2(PARSER *parser);
  96. void inflight_functions_init(PARSER *parser);
  97. void pluginsd_keywords_init(PARSER *parser, PARSER_REPERTOIRE repertoire);
  98. PARSER_RC parser_execute(PARSER *parser, PARSER_KEYWORD *keyword, char **words, size_t num_words);
  /**
   * Copy the first separator-delimited word of `src` into `dst`.
   *
   * Leading bytes flagged in `isspace_map` are skipped, then bytes are copied
   * until a NUL, a flagged byte, or the end of the room in `dst`.
   * `dst` is always NUL-terminated when `dst_size` is positive.
   *
   * @param src          NUL-terminated input line.
   * @param dst          Output buffer for the keyword.
   * @param dst_size     Capacity of `dst` in bytes, terminator included.
   * @param isspace_map  Table indexed by unsigned byte value (256 entries);
   *                     true marks a separator byte.
   * @return The length of the keyword, or 0 when there is no keyword, when
   *         the keyword did not fit entirely in `dst` (which then holds the
   *         truncated prefix), or when `dst_size` is not positive (in which
   *         case `dst` is not touched).
   */
  static inline int find_first_keyword(const char *src, char *dst, int dst_size, bool *isspace_map) {
      // Without room for even the terminator, nothing may be written to dst.
      if (dst_size <= 0)
          return 0;

      const char *s = src;

      // Skip leading separators; the NUL test keeps the scan inside the
      // string even if the map happens to flag byte 0.
      while (*s && isspace_map[(uint8_t)*s])
          s++;

      const char *keyword_start = s;

      // Copy while there is input and room, keeping one byte for the terminator.
      while (*s && !isspace_map[(uint8_t)*s] && dst_size > 1) {
          *dst++ = *s++;
          dst_size--;
      }
      *dst = '\0';

      // Stopping in the middle of a word means the keyword was truncated:
      // report it as "not found" instead of returning a partial match.
      if (*s && !isspace_map[(uint8_t)*s])
          return 0;

      return (int) (s - keyword_start);
  }
  110. PARSER_KEYWORD *gperf_lookup_keyword(register const char *str, register size_t len);
  111. static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *command) {
  112. PARSER_KEYWORD *t = gperf_lookup_keyword(command, strlen(command));
  113. if(t && (t->repertoire & parser->repertoire))
  114. return t;
  115. return NULL;
  116. }
  117. static inline int parser_action(PARSER *parser, char *input) {
  118. parser->line++;
  119. if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) {
  120. char command[PLUGINSD_LINE_MAX + 1];
  121. bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, isspace_map_pluginsd);
  122. if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) {
  123. if(parser->defer.response) {
  124. buffer_strcat(parser->defer.response, input);
  125. if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) {
  126. // more than 10MB of data
  127. // a bad plugin that did not send the end_keyword
  128. internal_error(true, "PLUGINSD: deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response));
  129. return 1;
  130. }
  131. }
  132. return 0;
  133. }
  134. else {
  135. // call the action
  136. parser->defer.action(parser, parser->defer.action_data);
  137. // empty everything
  138. parser->defer.action = NULL;
  139. parser->defer.action_data = NULL;
  140. parser->defer.end_keyword = NULL;
  141. parser->defer.response = NULL;
  142. parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD;
  143. }
  144. return 0;
  145. }
  146. char *words[PLUGINSD_MAX_WORDS];
  147. size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
  148. const char *command = get_word(words, num_words, 0);
  149. if(unlikely(!command))
  150. return 0;
  151. PARSER_RC rc;
  152. PARSER_KEYWORD *t = parser_find_keyword(parser, command);
  153. if(likely(t)) {
  154. worker_is_busy(t->worker_job_id);
  155. rc = parser_execute(parser, t, words, num_words);
  156. // rc = (*t->func)(words, num_words, parser);
  157. worker_is_idle();
  158. }
  159. else
  160. rc = PARSER_RC_ERROR;
  161. if(rc == PARSER_RC_ERROR) {
  162. BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL);
  163. for(size_t i = 0; i < num_words ;i++) {
  164. if(i) buffer_fast_strcat(wb, " ", 1);
  165. buffer_fast_strcat(wb, "\"", 1);
  166. const char *s = get_word(words, num_words, i);
  167. buffer_strcat(wb, s?s:"");
  168. buffer_fast_strcat(wb, "\"", 1);
  169. }
  170. netdata_log_error("PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)",
  171. command, parser->line, buffer_tostring(wb));
  172. buffer_free(wb);
  173. }
  174. return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP);
  175. }
  176. #endif //NETDATA_PLUGINSD_PARSER_H