send_data.c 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. // SPDX-License-Identifier: GPL-3.0-or-later
  2. #include "exporting_engine.h"
  3. /**
  4. * Discard response
  5. *
  6. * Discards a response received by an exporting connector instance after logging a sample of it to error.log
  7. *
  8. * @param buffer buffer with response data.
  9. * @param instance an instance data structure.
  10. * @return Always returns 0.
  11. */
  12. int exporting_discard_response(BUFFER *buffer, struct instance *instance) {
  13. char sample[1024];
  14. const char *s = buffer_tostring(buffer);
  15. char *d = sample, *e = &sample[sizeof(sample) - 1];
  16. for(; *s && d < e ;s++) {
  17. char c = *s;
  18. if(unlikely(!isprint(c))) c = ' ';
  19. *d++ = c;
  20. }
  21. *d = '\0';
  22. info(
  23. "EXPORTING: received %zu bytes from %s connector instance. Ignoring them. Sample: '%s'",
  24. buffer_strlen(buffer),
  25. instance->config.name,
  26. sample);
  27. buffer_flush(buffer);
  28. return 0;
  29. }
  30. /**
  31. * Receive response
  32. *
  33. * @param sock communication socket.
  34. * @param instance an instance data structure.
  35. */
  36. void simple_connector_receive_response(int *sock, struct instance *instance)
  37. {
  38. static BUFFER *response = NULL;
  39. if (!response)
  40. response = buffer_create(1);
  41. struct stats *stats = &instance->stats;
  42. errno = 0;
  43. // loop through to collect all data
  44. while (*sock != -1 && errno != EWOULDBLOCK) {
  45. buffer_need_bytes(response, 4096);
  46. ssize_t r;
  47. r = recv(*sock, &response->buffer[response->len], response->size - response->len, MSG_DONTWAIT);
  48. if (likely(r > 0)) {
  49. // we received some data
  50. response->len += r;
  51. stats->received_bytes += r;
  52. stats->receptions++;
  53. } else if (r == 0) {
  54. error("EXPORTING: '%s' closed the socket", instance->config.destination);
  55. close(*sock);
  56. *sock = -1;
  57. } else {
  58. // failed to receive data
  59. if (errno != EAGAIN && errno != EWOULDBLOCK) {
  60. error("EXPORTING: cannot receive data from '%s'.", instance->config.destination);
  61. }
  62. }
  63. #ifdef UNIT_TESTING
  64. break;
  65. #endif
  66. }
  67. // if we received data, process them
  68. if (buffer_strlen(response))
  69. instance->check_response(response, instance);
  70. }
  71. /**
  72. * Send buffer to a server
  73. *
  74. * @param sock communication socket.
  75. * @param failures the number of communication failures.
  76. * @param instance an instance data structure.
  77. */
  78. void simple_connector_send_buffer(int *sock, int *failures, struct instance *instance)
  79. {
  80. BUFFER *buffer = (BUFFER *)instance->buffer;
  81. size_t len = buffer_strlen(buffer);
  82. int flags = 0;
  83. #ifdef MSG_NOSIGNAL
  84. flags += MSG_NOSIGNAL;
  85. #endif
  86. struct stats *stats = &instance->stats;
  87. int ret = 0;
  88. if (instance->send_header)
  89. ret = instance->send_header(sock, instance);
  90. ssize_t written = -1;
  91. if (!ret)
  92. written = send(*sock, buffer_tostring(buffer), len, flags);
  93. if(written != -1 && (size_t)written == len) {
  94. // we sent the data successfully
  95. stats->transmission_successes++;
  96. stats->sent_bytes += written;
  97. stats->sent_metrics = stats->buffered_metrics;
  98. // reset the failures count
  99. *failures = 0;
  100. // empty the buffer
  101. buffer_flush(buffer);
  102. }
  103. else {
  104. // oops! we couldn't send (all or some of the) data
  105. error(
  106. "EXPORTING: failed to write data to '%s'. Willing to write %zu bytes, wrote %zd bytes. Will re-connect.",
  107. instance->config.destination,
  108. len,
  109. written);
  110. stats->transmission_failures++;
  111. if(written != -1)
  112. stats->sent_bytes += written;
  113. // increment the counter we check for data loss
  114. (*failures)++;
  115. // close the socket - we will re-open it next time
  116. close(*sock);
  117. *sock = -1;
  118. }
  119. }
  120. /**
  121. * Simple connector worker
  122. *
  123. * Runs in a separate thread for every instance.
  124. *
  125. * @param instance_p an instance data structure.
  126. */
  127. void simple_connector_worker(void *instance_p)
  128. {
  129. struct instance *instance = (struct instance*)instance_p;
  130. struct simple_connector_config *connector_specific_config = instance->config.connector_specific_config;
  131. struct stats *stats = &instance->stats;
  132. int sock = -1;
  133. struct timeval timeout = {.tv_sec = (instance->config.timeoutms * 1000) / 1000000,
  134. .tv_usec = (instance->config.timeoutms * 1000) % 1000000};
  135. int failures = 0;
  136. while(!netdata_exit) {
  137. // reset the monitoring chart counters
  138. stats->received_bytes =
  139. stats->sent_bytes =
  140. stats->sent_metrics =
  141. stats->lost_metrics =
  142. stats->receptions =
  143. stats->transmission_successes =
  144. stats->transmission_failures =
  145. stats->data_lost_events =
  146. stats->lost_bytes =
  147. stats->reconnects = 0;
  148. // ------------------------------------------------------------------------
  149. // if we are connected, receive a response, without blocking
  150. if(likely(sock != -1))
  151. simple_connector_receive_response(&sock, instance);
  152. // ------------------------------------------------------------------------
  153. // if we are not connected, connect to a data collecting server
  154. if(unlikely(sock == -1)) {
  155. size_t reconnects = 0;
  156. sock = connect_to_one_of(
  157. instance->config.destination,
  158. connector_specific_config->default_port,
  159. &timeout,
  160. &reconnects,
  161. NULL,
  162. 0);
  163. stats->reconnects += reconnects;
  164. }
  165. if(unlikely(netdata_exit)) break;
  166. // ------------------------------------------------------------------------
  167. // if we are connected, send our buffer to the data collecting server
  168. uv_mutex_lock(&instance->mutex);
  169. uv_cond_wait(&instance->cond_var, &instance->mutex);
  170. if (likely(sock != -1)) {
  171. simple_connector_send_buffer(&sock, &failures, instance);
  172. } else {
  173. error("EXPORTING: failed to update '%s'", instance->config.destination);
  174. stats->transmission_failures++;
  175. // increment the counter we check for data loss
  176. failures++;
  177. }
  178. BUFFER *buffer = instance->buffer;
  179. if (failures > instance->config.buffer_on_failures) {
  180. stats->lost_bytes += buffer_strlen(buffer);
  181. error(
  182. "EXPORTING: connector instance %s reached %d exporting failures. "
  183. "Flushing buffers to protect this host - this results in data loss on server '%s'",
  184. instance->config.name, failures, instance->config.destination);
  185. buffer_flush(buffer);
  186. failures = 0;
  187. stats->data_lost_events++;
  188. stats->lost_metrics = stats->buffered_metrics;
  189. }
  190. send_internal_metrics(instance);
  191. if(likely(buffer_strlen(buffer) == 0))
  192. stats->buffered_metrics = 0;
  193. uv_mutex_unlock(&instance->mutex);
  194. #ifdef UNIT_TESTING
  195. break;
  196. #endif
  197. }
  198. }