helpers.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359
  1. #include "helpers.h"
  2. #include "config.h"
  3. #include "private.h"
  4. #include "stockpile.h"
  5. #include <yt/yt/core/ytalloc/bindings.h>
  6. #include <yt/yt/core/misc/lazy_ptr.h>
  7. #include <yt/yt/core/misc/ref_counted_tracker.h>
  8. #include <yt/yt/core/misc/ref_counted_tracker_profiler.h>
  9. #include <yt/yt/core/bus/tcp/dispatcher.h>
  10. #include <yt/yt/library/oom/oom.h>
  11. #include <yt/yt/library/tracing/jaeger/tracer.h>
  12. #include <yt/yt/library/profiling/perf/counters.h>
  13. #include <yt/yt/library/profiling/resource_tracker/resource_tracker.h>
  14. #include <yt/yt/core/logging/log_manager.h>
  15. #include <yt/yt/core/concurrency/execution_stack.h>
  16. #include <yt/yt/core/concurrency/fiber_scheduler_thread.h>
  17. #include <yt/yt/core/concurrency/periodic_executor.h>
  18. #include <tcmalloc/malloc_extension.h>
  19. #include <yt/yt/core/net/address.h>
  20. #include <yt/yt/core/yson/protobuf_interop.h>
  21. #include <yt/yt/core/rpc/dispatcher.h>
  22. #include <yt/yt/core/rpc/grpc/dispatcher.h>
  23. #include <yt/yt/core/service_discovery/yp/service_discovery.h>
  24. #include <yt/yt/core/threading/spin_wait_slow_path_logger.h>
  25. #include <library/cpp/yt/memory/atomic_intrusive_ptr.h>
  26. #include <util/string/split.h>
  27. #include <util/system/thread.h>
  28. #include <mutex>
  29. #include <thread>
  30. namespace NYT {
  31. using namespace NConcurrency;
  32. using namespace NThreading;
  33. ////////////////////////////////////////////////////////////////////////////////
  34. static std::once_flag InitAggressiveReleaseThread;
  35. static auto& Logger = ProgramLogger;
  36. ////////////////////////////////////////////////////////////////////////////////
  37. class TCMallocLimitsAdjuster
  38. {
  39. public:
  40. void Adjust(const TTCMallocConfigPtr& config)
  41. {
  42. i64 totalMemory = GetAnonymousMemoryLimit();
  43. AdjustPageHeapLimit(totalMemory, config);
  44. AdjustAggressiveReleaseThreshold(totalMemory, config);
  45. SetupMemoryLimitHandler(config);
  46. }
  47. i64 GetAggressiveReleaseThreshold()
  48. {
  49. return AggressiveReleaseThreshold_;
  50. }
  51. private:
  52. using TAllocatorMemoryLimit = tcmalloc::MallocExtension::MemoryLimit;
  53. TAllocatorMemoryLimit AppliedLimit_;
  54. i64 AggressiveReleaseThreshold_ = 0;
  55. void AdjustPageHeapLimit(i64 totalMemory, const TTCMallocConfigPtr& config)
  56. {
  57. auto proposed = ProposeHeapMemoryLimit(totalMemory, config);
  58. if (proposed.limit == AppliedLimit_.limit && proposed.hard == AppliedLimit_.hard) {
  59. // Already applied
  60. return;
  61. }
  62. YT_LOG_INFO("Changing tcmalloc memory limit (Limit: %v, Hard: %v)",
  63. proposed.limit,
  64. proposed.hard);
  65. tcmalloc::MallocExtension::SetMemoryLimit(proposed);
  66. AppliedLimit_ = proposed;
  67. }
  68. void AdjustAggressiveReleaseThreshold(i64 totalMemory, const TTCMallocConfigPtr& config)
  69. {
  70. if (totalMemory && config->AggressiveReleaseThresholdRatio) {
  71. AggressiveReleaseThreshold_ = *config->AggressiveReleaseThresholdRatio * totalMemory;
  72. } else {
  73. AggressiveReleaseThreshold_ = config->AggressiveReleaseThreshold;
  74. }
  75. }
  76. void SetupMemoryLimitHandler(const TTCMallocConfigPtr& config)
  77. {
  78. TTCMallocLimitHandlerOptions handlerOptions {
  79. .HeapDumpDirectory = config->HeapSizeLimit->DumpMemoryProfilePath,
  80. .Timeout = config->HeapSizeLimit->DumpMemoryProfileTimeout,
  81. };
  82. if (config->HeapSizeLimit->DumpMemoryProfileOnViolation) {
  83. EnableTCMallocLimitHandler(handlerOptions);
  84. } else {
  85. DisableTCMallocLimitHandler();
  86. }
  87. }
  88. i64 GetAnonymousMemoryLimit() const
  89. {
  90. auto resourceTracker = NProfiling::GetResourceTracker();
  91. if (!resourceTracker) {
  92. return 0;
  93. }
  94. return resourceTracker->GetAnonymousMemoryLimit();
  95. }
  96. TAllocatorMemoryLimit ProposeHeapMemoryLimit(i64 totalMemory, const TTCMallocConfigPtr& config) const
  97. {
  98. const auto& heapSizeConfig = config->HeapSizeLimit;
  99. if (totalMemory == 0 || !heapSizeConfig->ContainerMemoryRatio && !heapSizeConfig->ContainerMemoryMargin) {
  100. return {};
  101. }
  102. TAllocatorMemoryLimit proposed;
  103. proposed.hard = heapSizeConfig->Hard;
  104. if (heapSizeConfig->ContainerMemoryMargin) {
  105. proposed.limit = totalMemory - *heapSizeConfig->ContainerMemoryMargin;
  106. } else {
  107. proposed.limit = *heapSizeConfig->ContainerMemoryRatio * totalMemory;
  108. }
  109. return proposed;
  110. }
  111. };
  112. void ConfigureTCMalloc(const TTCMallocConfigPtr& config)
  113. {
  114. tcmalloc::MallocExtension::SetBackgroundReleaseRate(
  115. tcmalloc::MallocExtension::BytesPerSecond{static_cast<size_t>(config->BackgroundReleaseRate)});
  116. tcmalloc::MallocExtension::SetMaxPerCpuCacheSize(config->MaxPerCpuCacheSize);
  117. if (config->GuardedSamplingRate) {
  118. tcmalloc::MallocExtension::SetGuardedSamplingRate(*config->GuardedSamplingRate);
  119. tcmalloc::MallocExtension::ActivateGuardedSampling();
  120. }
  121. struct TConfigSingleton
  122. {
  123. TAtomicIntrusivePtr<TTCMallocConfig> Config;
  124. };
  125. LeakySingleton<TConfigSingleton>()->Config.Store(config);
  126. if (tcmalloc::MallocExtension::NeedsProcessBackgroundActions()) {
  127. std::call_once(InitAggressiveReleaseThread, [] {
  128. std::thread([] {
  129. ::TThread::SetCurrentThreadName("TCAllocYT");
  130. TCMallocLimitsAdjuster limitsAdjuster;
  131. while (true) {
  132. auto config = LeakySingleton<TConfigSingleton>()->Config.Acquire();
  133. limitsAdjuster.Adjust(config);
  134. auto freeBytes = tcmalloc::MallocExtension::GetNumericProperty("tcmalloc.page_heap_free");
  135. YT_VERIFY(freeBytes);
  136. if (static_cast<i64>(*freeBytes) > limitsAdjuster.GetAggressiveReleaseThreshold()) {
  137. YT_LOG_DEBUG("Aggressively releasing memory (FreeBytes: %v, Threshold: %v)",
  138. static_cast<i64>(*freeBytes),
  139. limitsAdjuster.GetAggressiveReleaseThreshold());
  140. tcmalloc::MallocExtension::ReleaseMemoryToSystem(config->AggressiveReleaseSize);
  141. }
  142. Sleep(config->AggressiveReleasePeriod);
  143. }
  144. }).detach();
  145. });
  146. }
  147. }
  148. void ConfigureSingletons(const TSingletonsConfigPtr& config)
  149. {
  150. SetSpinWaitSlowPathLoggingThreshold(config->SpinWaitSlowPathLoggingThreshold);
  151. if (!NYTAlloc::ConfigureFromEnv()) {
  152. NYTAlloc::Configure(config->YTAlloc);
  153. }
  154. for (const auto& [kind, size] : config->FiberStackPoolSizes) {
  155. NConcurrency::SetFiberStackPoolSize(ParseEnum<NConcurrency::EExecutionStackKind>(kind), size);
  156. }
  157. NLogging::TLogManager::Get()->EnableReopenOnSighup();
  158. if (!NLogging::TLogManager::Get()->IsConfiguredFromEnv()) {
  159. NLogging::TLogManager::Get()->Configure(config->Logging);
  160. }
  161. NNet::TAddressResolver::Get()->Configure(config->AddressResolver);
  162. // By default, server components must have a reasonable FQDN.
  163. // Failure to do so may result in issues like YT-4561.
  164. NNet::TAddressResolver::Get()->EnsureLocalHostName();
  165. NBus::TTcpDispatcher::Get()->Configure(config->TcpDispatcher);
  166. NPipes::TIODispatcher::Get()->Configure(config->IODispatcher);
  167. NRpc::TDispatcher::Get()->Configure(config->RpcDispatcher);
  168. NRpc::NGrpc::TDispatcher::Get()->Configure(config->GrpcDispatcher);
  169. NRpc::TDispatcher::Get()->SetServiceDiscovery(
  170. NServiceDiscovery::NYP::CreateServiceDiscovery(config->YPServiceDiscovery));
  171. NTracing::SetGlobalTracer(New<NTracing::TJaegerTracer>(config->Jaeger));
  172. NProfiling::EnablePerfCounters();
  173. if (auto tracingConfig = config->TracingTransport) {
  174. NTracing::SetTracingTransportConfig(tracingConfig);
  175. }
  176. ConfigureTCMalloc(config->TCMalloc);
  177. TStockpileManager::Get()->Reconfigure(*config->Stockpile);
  178. if (config->EnableRefCountedTrackerProfiling) {
  179. EnableRefCountedTrackerProfiling();
  180. }
  181. if (config->EnableResourceTracker) {
  182. NProfiling::EnableResourceTracker();
  183. if (config->ResourceTrackerVCpuFactor.has_value()) {
  184. NProfiling::SetVCpuFactor(config->ResourceTrackerVCpuFactor.value());
  185. }
  186. }
  187. NYson::SetProtobufInteropConfig(config->ProtobufInterop);
  188. }
  189. TTCMallocConfigPtr MergeTCMallocDynamicConfig(const TTCMallocConfigPtr& staticConfig, const TTCMallocConfigPtr& dynamicConfig)
  190. {
  191. auto mergedConfig = CloneYsonStruct(dynamicConfig);
  192. mergedConfig->HeapSizeLimit->DumpMemoryProfilePath = staticConfig->HeapSizeLimit->DumpMemoryProfilePath;
  193. return mergedConfig;
  194. }
  195. void ReconfigureSingletons(const TSingletonsConfigPtr& config, const TSingletonsDynamicConfigPtr& dynamicConfig)
  196. {
  197. SetSpinWaitSlowPathLoggingThreshold(dynamicConfig->SpinWaitSlowPathLoggingThreshold.value_or(config->SpinWaitSlowPathLoggingThreshold));
  198. NConcurrency::UpdateMaxIdleFibers(dynamicConfig->MaxIdleFibers);
  199. if (!NYTAlloc::IsConfiguredFromEnv()) {
  200. NYTAlloc::Configure(dynamicConfig->YTAlloc ? dynamicConfig->YTAlloc : config->YTAlloc);
  201. }
  202. if (!NLogging::TLogManager::Get()->IsConfiguredFromEnv()) {
  203. NLogging::TLogManager::Get()->Configure(
  204. config->Logging->ApplyDynamic(dynamicConfig->Logging),
  205. /*sync*/ false);
  206. }
  207. auto tracer = NTracing::GetGlobalTracer();
  208. if (auto jaeger = DynamicPointerCast<NTracing::TJaegerTracer>(tracer); jaeger) {
  209. jaeger->Configure(config->Jaeger->ApplyDynamic(dynamicConfig->Jaeger));
  210. }
  211. NBus::TTcpDispatcher::Get()->Configure(config->TcpDispatcher->ApplyDynamic(dynamicConfig->TcpDispatcher));
  212. NPipes::TIODispatcher::Get()->Configure(dynamicConfig->IODispatcher ? dynamicConfig->IODispatcher : config->IODispatcher);
  213. NRpc::TDispatcher::Get()->Configure(config->RpcDispatcher->ApplyDynamic(dynamicConfig->RpcDispatcher));
  214. if (dynamicConfig->TracingTransport) {
  215. NTracing::SetTracingTransportConfig(dynamicConfig->TracingTransport);
  216. } else if (config->TracingTransport) {
  217. NTracing::SetTracingTransportConfig(config->TracingTransport);
  218. }
  219. if (dynamicConfig->TCMalloc) {
  220. ConfigureTCMalloc(MergeTCMallocDynamicConfig(config->TCMalloc, dynamicConfig->TCMalloc));
  221. } else if (config->TCMalloc) {
  222. ConfigureTCMalloc(config->TCMalloc);
  223. }
  224. if (dynamicConfig->Stockpile) {
  225. TStockpileManager::Get()->Reconfigure(*config->Stockpile->ApplyDynamic(dynamicConfig->Stockpile));
  226. }
  227. NYson::SetProtobufInteropConfig(config->ProtobufInterop->ApplyDynamic(dynamicConfig->ProtobufInterop));
  228. }
  229. template <class TConfig>
  230. void StartDiagnosticDumpImpl(const TConfig& config)
  231. {
  232. static NLogging::TLogger Logger("DiagDump");
  233. auto logDumpString = [&] (TStringBuf banner, const TString& str) {
  234. for (const auto& line : StringSplitter(str).Split('\n')) {
  235. YT_LOG_DEBUG("%v %v", banner, line.Token());
  236. }
  237. };
  238. if (config->YTAllocDumpPeriod) {
  239. static const TLazyIntrusivePtr<TPeriodicExecutor> Executor(BIND([&] {
  240. return New<TPeriodicExecutor>(
  241. NRpc::TDispatcher::Get()->GetHeavyInvoker(),
  242. BIND([&] {
  243. logDumpString("YTAlloc", NYTAlloc::FormatAllocationCounters());
  244. }));
  245. }));
  246. Executor->SetPeriod(config->YTAllocDumpPeriod);
  247. Executor->Start();
  248. }
  249. if (config->RefCountedTrackerDumpPeriod) {
  250. static const TLazyIntrusivePtr<TPeriodicExecutor> Executor(BIND([&] {
  251. return New<TPeriodicExecutor>(
  252. NRpc::TDispatcher::Get()->GetHeavyInvoker(),
  253. BIND([&] {
  254. logDumpString("RCT", TRefCountedTracker::Get()->GetDebugInfo());
  255. }));
  256. }));
  257. Executor->SetPeriod(config->RefCountedTrackerDumpPeriod);
  258. Executor->Start();
  259. }
  260. }
  261. void StartDiagnosticDump(const TDiagnosticDumpConfigPtr& config)
  262. {
  263. StartDiagnosticDumpImpl(config);
  264. }
  265. ////////////////////////////////////////////////////////////////////////////////
  266. } // namespace NYT