buffered_reader.h 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "../libnetdata.h"
  3. #ifndef NETDATA_BUFFERED_READER_H
  4. #define NETDATA_BUFFERED_READER_H
  5. struct buffered_reader {
  6. ssize_t read_len;
  7. ssize_t pos;
  8. char read_buffer[PLUGINSD_LINE_MAX + 1];
  9. };
  10. static inline void buffered_reader_init(struct buffered_reader *reader) {
  11. reader->read_buffer[0] = '\0';
  12. reader->read_len = 0;
  13. reader->pos = 0;
  14. }
  15. typedef enum {
  16. BUFFERED_READER_READ_OK = 0,
  17. BUFFERED_READER_READ_FAILED = -1,
  18. BUFFERED_READER_READ_BUFFER_FULL = -2,
  19. BUFFERED_READER_READ_POLLERR = -3,
  20. BUFFERED_READER_READ_POLLHUP = -4,
  21. BUFFERED_READER_READ_POLLNVAL = -5,
  22. BUFFERED_READER_READ_POLL_UNKNOWN = -6,
  23. BUFFERED_READER_READ_POLL_TIMEOUT = -7,
  24. BUFFERED_READER_READ_POLL_FAILED = -8,
  25. } buffered_reader_ret_t;
  26. static inline buffered_reader_ret_t buffered_reader_read(struct buffered_reader *reader, int fd) {
  27. #ifdef NETDATA_INTERNAL_CHECKS
  28. if(reader->read_buffer[reader->read_len] != '\0')
  29. fatal("read_buffer does not start with zero");
  30. #endif
  31. char *read_at = reader->read_buffer + reader->read_len;
  32. ssize_t remaining = sizeof(reader->read_buffer) - reader->read_len - 1;
  33. if(unlikely(remaining <= 0))
  34. return BUFFERED_READER_READ_BUFFER_FULL;
  35. ssize_t bytes_read = read(fd, read_at, remaining);
  36. if(unlikely(bytes_read <= 0))
  37. return BUFFERED_READER_READ_FAILED;
  38. reader->read_len += bytes_read;
  39. reader->read_buffer[reader->read_len] = '\0';
  40. return BUFFERED_READER_READ_OK;
  41. }
  42. static inline buffered_reader_ret_t buffered_reader_read_timeout(struct buffered_reader *reader, int fd, int timeout_ms, bool log_error) {
  43. errno = 0;
  44. struct pollfd fds[1];
  45. fds[0].fd = fd;
  46. fds[0].events = POLLIN;
  47. int ret = poll(fds, 1, timeout_ms);
  48. if (ret > 0) {
  49. /* There is data to read */
  50. if (fds[0].revents & POLLIN)
  51. return buffered_reader_read(reader, fd);
  52. else if(fds[0].revents & POLLERR) {
  53. if(log_error)
  54. netdata_log_error("PARSER: read failed: POLLERR.");
  55. return BUFFERED_READER_READ_POLLERR;
  56. }
  57. else if(fds[0].revents & POLLHUP) {
  58. if(log_error)
  59. netdata_log_error("PARSER: read failed: POLLHUP.");
  60. return BUFFERED_READER_READ_POLLHUP;
  61. }
  62. else if(fds[0].revents & POLLNVAL) {
  63. if(log_error)
  64. netdata_log_error("PARSER: read failed: POLLNVAL.");
  65. return BUFFERED_READER_READ_POLLNVAL;
  66. }
  67. if(log_error)
  68. netdata_log_error("PARSER: poll() returned positive number, but POLLIN|POLLERR|POLLHUP|POLLNVAL are not set.");
  69. return BUFFERED_READER_READ_POLL_UNKNOWN;
  70. }
  71. else if (ret == 0) {
  72. if(log_error)
  73. netdata_log_error("PARSER: timeout while waiting for data.");
  74. return BUFFERED_READER_READ_POLL_TIMEOUT;
  75. }
  76. if(log_error)
  77. netdata_log_error("PARSER: poll() failed with code %d.", ret);
  78. return BUFFERED_READER_READ_POLL_FAILED;
  79. }
  80. /* Produce a full line if one exists, statefully return where we start next time.
  81. * When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
  82. */
  83. static inline bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst) {
  84. buffer_need_bytes(dst, reader->read_len - reader->pos + 2);
  85. size_t start = reader->pos;
  86. char *ss = &reader->read_buffer[start];
  87. char *se = &reader->read_buffer[reader->read_len];
  88. char *ds = &dst->buffer[dst->len];
  89. char *de = &ds[dst->size - dst->len - 2];
  90. if(ss >= se) {
  91. *ds = '\0';
  92. reader->pos = 0;
  93. reader->read_len = 0;
  94. reader->read_buffer[reader->read_len] = '\0';
  95. return false;
  96. }
  97. // copy all bytes to buffer
  98. while(ss < se && ds < de && *ss != '\n') {
  99. *ds++ = *ss++;
  100. dst->len++;
  101. }
  102. // if we have a newline, return the buffer
  103. if(ss < se && ds < de && *ss == '\n') {
  104. // newline found in the r->read_buffer
  105. *ds++ = *ss++; // copy the newline too
  106. dst->len++;
  107. *ds = '\0';
  108. reader->pos = ss - reader->read_buffer;
  109. return true;
  110. }
  111. reader->pos = 0;
  112. reader->read_len = 0;
  113. reader->read_buffer[reader->read_len] = '\0';
  114. return false;
  115. }
  116. #endif //NETDATA_BUFFERED_READER_H