connection_monitor.c 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. /**
  2. * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
  3. * SPDX-License-Identifier: Apache-2.0.
  4. */
  5. #include <aws/http/private/connection_monitor.h>
  6. #include <aws/http/connection.h>
  7. #include <aws/http/statistics.h>
  8. #include <aws/io/channel.h>
  9. #include <aws/io/logging.h>
  10. #include <aws/io/statistics.h>
  11. #include <aws/common/clock.h>
  12. #include <inttypes.h>
  13. static void s_process_statistics(
  14. struct aws_crt_statistics_handler *handler,
  15. struct aws_crt_statistics_sample_interval *interval,
  16. struct aws_array_list *stats_list,
  17. void *context) {
  18. (void)interval;
  19. struct aws_statistics_handler_http_connection_monitor_impl *impl = handler->impl;
  20. if (!aws_http_connection_monitoring_options_is_valid(&impl->options)) {
  21. return;
  22. }
  23. uint64_t pending_read_interval_ms = 0;
  24. uint64_t pending_write_interval_ms = 0;
  25. uint64_t bytes_read = 0;
  26. uint64_t bytes_written = 0;
  27. uint32_t h1_current_outgoing_stream_id = 0;
  28. uint32_t h1_current_incoming_stream_id = 0;
  29. /*
  30. * Pull out the data needed to perform the throughput calculation
  31. */
  32. size_t stats_count = aws_array_list_length(stats_list);
  33. bool h2 = false;
  34. bool h2_was_inactive = false;
  35. for (size_t i = 0; i < stats_count; ++i) {
  36. struct aws_crt_statistics_base *stats_base = NULL;
  37. if (aws_array_list_get_at(stats_list, &stats_base, i)) {
  38. continue;
  39. }
  40. switch (stats_base->category) {
  41. case AWSCRT_STAT_CAT_SOCKET: {
  42. struct aws_crt_statistics_socket *socket_stats = (struct aws_crt_statistics_socket *)stats_base;
  43. bytes_read = socket_stats->bytes_read;
  44. bytes_written = socket_stats->bytes_written;
  45. break;
  46. }
  47. case AWSCRT_STAT_CAT_HTTP1_CHANNEL: {
  48. AWS_ASSERT(!h2);
  49. struct aws_crt_statistics_http1_channel *http1_stats =
  50. (struct aws_crt_statistics_http1_channel *)stats_base;
  51. pending_read_interval_ms = http1_stats->pending_incoming_stream_ms;
  52. pending_write_interval_ms = http1_stats->pending_outgoing_stream_ms;
  53. h1_current_outgoing_stream_id = http1_stats->current_outgoing_stream_id;
  54. h1_current_incoming_stream_id = http1_stats->current_incoming_stream_id;
  55. break;
  56. }
  57. case AWSCRT_STAT_CAT_HTTP2_CHANNEL: {
  58. struct aws_crt_statistics_http2_channel *h2_stats =
  59. (struct aws_crt_statistics_http2_channel *)stats_base;
  60. pending_read_interval_ms = h2_stats->pending_incoming_stream_ms;
  61. pending_write_interval_ms = h2_stats->pending_outgoing_stream_ms;
  62. h2_was_inactive |= h2_stats->was_inactive;
  63. h2 = true;
  64. break;
  65. }
  66. default:
  67. break;
  68. }
  69. }
  70. if (impl->options.statistics_observer_fn) {
  71. impl->options.statistics_observer_fn(
  72. (size_t)(uintptr_t)(context), stats_list, impl->options.statistics_observer_user_data);
  73. }
  74. struct aws_channel *channel = context;
  75. uint64_t bytes_per_second = 0;
  76. uint64_t max_pending_io_interval_ms = 0;
  77. if (pending_write_interval_ms > 0) {
  78. double fractional_bytes_written_per_second =
  79. (double)bytes_written * (double)AWS_TIMESTAMP_MILLIS / (double)pending_write_interval_ms;
  80. if (fractional_bytes_written_per_second >= (double)UINT64_MAX) {
  81. bytes_per_second = UINT64_MAX;
  82. } else {
  83. bytes_per_second = (uint64_t)fractional_bytes_written_per_second;
  84. }
  85. max_pending_io_interval_ms = pending_write_interval_ms;
  86. }
  87. if (pending_read_interval_ms > 0) {
  88. double fractional_bytes_read_per_second =
  89. (double)bytes_read * (double)AWS_TIMESTAMP_MILLIS / (double)pending_read_interval_ms;
  90. if (fractional_bytes_read_per_second >= (double)UINT64_MAX) {
  91. bytes_per_second = UINT64_MAX;
  92. } else {
  93. bytes_per_second = aws_add_u64_saturating(bytes_per_second, (uint64_t)fractional_bytes_read_per_second);
  94. }
  95. if (pending_read_interval_ms > max_pending_io_interval_ms) {
  96. max_pending_io_interval_ms = pending_read_interval_ms;
  97. }
  98. }
  99. AWS_LOGF_DEBUG(
  100. AWS_LS_IO_CHANNEL,
  101. "id=%p: channel throughput - %" PRIu64 " bytes per second",
  102. (void *)channel,
  103. bytes_per_second);
  104. /*
  105. * Check throughput only if the connection has active stream and no gap between.
  106. */
  107. bool check_throughput = false;
  108. if (h2) {
  109. /* For HTTP/2, check throughput only if there always has any active stream on the connection */
  110. check_throughput = !h2_was_inactive;
  111. } else {
  112. /* For HTTP/1, check throughput only if at least one stream exists and was observed in that role previously */
  113. check_throughput =
  114. (h1_current_incoming_stream_id != 0 && h1_current_incoming_stream_id == impl->last_incoming_stream_id) ||
  115. (h1_current_outgoing_stream_id != 0 && h1_current_outgoing_stream_id == impl->last_outgoing_stream_id);
  116. impl->last_outgoing_stream_id = h1_current_outgoing_stream_id;
  117. impl->last_incoming_stream_id = h1_current_incoming_stream_id;
  118. }
  119. impl->last_measured_throughput = bytes_per_second;
  120. if (!check_throughput) {
  121. AWS_LOGF_TRACE(AWS_LS_IO_CHANNEL, "id=%p: channel throughput does not need to be checked", (void *)channel);
  122. impl->throughput_failure_time_ms = 0;
  123. return;
  124. }
  125. if (bytes_per_second >= impl->options.minimum_throughput_bytes_per_second) {
  126. impl->throughput_failure_time_ms = 0;
  127. return;
  128. }
  129. impl->throughput_failure_time_ms =
  130. aws_add_u64_saturating(impl->throughput_failure_time_ms, max_pending_io_interval_ms);
  131. AWS_LOGF_INFO(
  132. AWS_LS_IO_CHANNEL,
  133. "id=%p: Channel low throughput warning. Currently %" PRIu64 " milliseconds of consecutive failure time",
  134. (void *)channel,
  135. impl->throughput_failure_time_ms);
  136. uint64_t maximum_failure_time_ms = aws_timestamp_convert(
  137. impl->options.allowable_throughput_failure_interval_seconds, AWS_TIMESTAMP_SECS, AWS_TIMESTAMP_MILLIS, NULL);
  138. if (impl->throughput_failure_time_ms <= maximum_failure_time_ms) {
  139. return;
  140. }
  141. AWS_LOGF_INFO(
  142. AWS_LS_IO_CHANNEL,
  143. "id=%p: Channel low throughput threshold exceeded (< %" PRIu64
  144. " bytes per second for more than %u seconds). Shutting down.",
  145. (void *)channel,
  146. impl->options.minimum_throughput_bytes_per_second,
  147. impl->options.allowable_throughput_failure_interval_seconds);
  148. aws_channel_shutdown(channel, AWS_ERROR_HTTP_CHANNEL_THROUGHPUT_FAILURE);
  149. }
  150. static void s_destroy(struct aws_crt_statistics_handler *handler) {
  151. if (handler == NULL) {
  152. return;
  153. }
  154. aws_mem_release(handler->allocator, handler);
  155. }
  156. static uint64_t s_get_report_interval_ms(struct aws_crt_statistics_handler *handler) {
  157. (void)handler;
  158. return 1000;
  159. }
  160. static struct aws_crt_statistics_handler_vtable s_http_connection_monitor_vtable = {
  161. .process_statistics = s_process_statistics,
  162. .destroy = s_destroy,
  163. .get_report_interval_ms = s_get_report_interval_ms,
  164. };
  165. struct aws_crt_statistics_handler *aws_crt_statistics_handler_new_http_connection_monitor(
  166. struct aws_allocator *allocator,
  167. struct aws_http_connection_monitoring_options *options) {
  168. struct aws_crt_statistics_handler *handler = NULL;
  169. struct aws_statistics_handler_http_connection_monitor_impl *impl = NULL;
  170. if (!aws_mem_acquire_many(
  171. allocator,
  172. 2,
  173. &handler,
  174. sizeof(struct aws_crt_statistics_handler),
  175. &impl,
  176. sizeof(struct aws_statistics_handler_http_connection_monitor_impl))) {
  177. return NULL;
  178. }
  179. AWS_ZERO_STRUCT(*handler);
  180. AWS_ZERO_STRUCT(*impl);
  181. impl->options = *options;
  182. handler->vtable = &s_http_connection_monitor_vtable;
  183. handler->allocator = allocator;
  184. handler->impl = impl;
  185. return handler;
  186. }
  187. bool aws_http_connection_monitoring_options_is_valid(const struct aws_http_connection_monitoring_options *options) {
  188. if (options == NULL) {
  189. return false;
  190. }
  191. return options->allowable_throughput_failure_interval_seconds > 0 &&
  192. options->minimum_throughput_bytes_per_second > 0;
  193. }