// stockpile_linux.cpp
  #include "stockpile.h"

  #include "library/cpp/yt/logging/logger.h"

  #include <util/system/thread.h>

  #include <mutex>
  #include <thread>

  #include <errno.h>
  #include <string.h>
  #include <sys/mman.h>
  8. namespace NYT {
  9. ////////////////////////////////////////////////////////////////////////////////
  10. static const auto Logger = NLogging::TLogger("Stockpile");
  11. constexpr int MADV_STOCKPILE = 0x59410004;
  12. ////////////////////////////////////////////////////////////////////////////////
  13. namespace {
  14. void RunWithFixedBreaks(i64 bufferSize, TDuration period)
  15. {
  16. auto returnCode = -::madvise(nullptr, bufferSize, MADV_STOCKPILE);
  17. YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" failed: %v", strerror(returnCode));
  18. Sleep(period);
  19. }
  20. void RunWithCappedLoad(i64 bufferSize, TDuration period)
  21. {
  22. auto started = GetApproximateCpuInstant();
  23. auto returnCode = -::madvise(nullptr, bufferSize, MADV_STOCKPILE);
  24. YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" failed: %v", strerror(returnCode));
  25. auto duration = CpuDurationToDuration(GetApproximateCpuInstant() - started);
  26. if (duration < period) {
  27. Sleep(period - duration);
  28. }
  29. }
  30. std::pair<i64, TDuration> RunWithBackoffs(
  31. i64 adjustedBufferSize,
  32. TDuration adjustedPeriod,
  33. const TStockpileOptions& options,
  34. i64 pageSize)
  35. {
  36. int returnCode = -::madvise(nullptr, adjustedBufferSize, MADV_STOCKPILE);
  37. YT_LOG_DEBUG_IF(returnCode, "System call \"madvise\" failed: %v", strerror(returnCode));
  38. switch(returnCode) {
  39. case 0:
  40. Sleep(options.Period);
  41. return {options.BufferSize, options.Period};
  42. case ENOMEM:
  43. if (adjustedBufferSize / 2 >= pageSize) {
  44. // Immediately make an attempt to reclaim half as much.
  45. adjustedBufferSize = adjustedBufferSize / 2;
  46. } else {
  47. // Unless there is not even a single reclaimable page.
  48. Sleep(options.Period);
  49. }
  50. return {adjustedBufferSize, options.Period};
  51. case EAGAIN:
  52. case EINTR:
  53. Sleep(adjustedPeriod);
  54. return {options.BufferSize, adjustedPeriod + options.Period};
  55. default:
  56. Sleep(options.Period);
  57. return {options.BufferSize, options.Period};
  58. }
  59. }
  60. } // namespace
  61. void RunStockpileThread(TStockpileOptions options, std::atomic<bool>* shouldProceed)
  62. {
  63. TThread::SetCurrentThreadName("Stockpile");
  64. const i64 pageSize = sysconf(_SC_PAGESIZE);
  65. auto bufferSize = options.BufferSize;
  66. auto period = options.Period;
  67. while (!shouldProceed || shouldProceed->load()) {
  68. switch (options.Strategy) {
  69. case EStockpileStrategy::FixedBreaks:
  70. RunWithFixedBreaks(options.BufferSize, options.Period);
  71. break;
  72. case EStockpileStrategy::FlooredLoad:
  73. RunWithCappedLoad(options.BufferSize, options.Period);
  74. break;
  75. case EStockpileStrategy::ProgressiveBackoff:
  76. std::tie(bufferSize, period) = RunWithBackoffs(bufferSize, period, options, pageSize);
  77. break;
  78. default:
  79. YT_ABORT();
  80. }
  81. }
  82. }
  83. void RunDetachedStockpileThreads(TStockpileOptions options)
  84. {
  85. static std::once_flag OnceFlag;
  86. std::call_once(OnceFlag, [options = std::move(options)] {
  87. for (int i = 0; i < options.ThreadCount; ++i) {
  88. std::thread(RunStockpileThread, options, nullptr).detach();
  89. }
  90. });
  91. }
  92. ////////////////////////////////////////////////////////////////////////////////
  93. } // namespace NYT