job_profiler.cpp 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. #include "job_profiler.h"
  2. #include <yt/yt/library/ytprof/cpu_profiler.h>
  3. #include <yt/yt/library/ytprof/external_pprof.h>
  4. #include <yt/yt/library/ytprof/heap_profiler.h>
  5. #include <yt/yt/library/ytprof/profile.h>
  6. #include <yt/yt/library/ytprof/symbolize.h>
  7. #include <yt/cpp/mapreduce/interface/logging/logger.h>
  8. #include <yt/cpp/mapreduce/interface/logging/yt_log.h>
  9. #include <library/cpp/yson/node/node_io.h>
  10. #include <contrib/libs/tcmalloc/tcmalloc/malloc_extension.h>
  11. #include <util/system/env.h>
  12. #include <util/system/file.h>
  13. #include <util/system/shellcommand.h>
  14. namespace NYT {
  15. using namespace NYTProf;
  16. ////////////////////////////////////////////////////////////////////////////////
  17. static void RunSubprocess(const std::vector<TString>& cmd)
  18. {
  19. auto command = cmd[0];
  20. auto args = TList<TString>(cmd.begin() + 1, cmd.end());
  21. TShellCommand(command, args)
  22. .Run()
  23. .Wait();
  24. }
  25. ////////////////////////////////////////////////////////////////////////////////
  26. class TJobProfiler
  27. : public IJobProfiler
  28. {
  29. public:
  30. TJobProfiler()
  31. {
  32. try {
  33. InitializeProfiler();
  34. } catch (const std::exception& ex) {
  35. YT_LOG_ERROR("Failed to initialize job profiler: %v",
  36. ex.what());
  37. }
  38. }
  39. void Start() override
  40. {
  41. if (CpuProfiler_) {
  42. CpuProfiler_->Start();
  43. }
  44. }
  45. void Stop() override
  46. {
  47. if (CpuProfiler_) {
  48. CpuProfiler_->Stop();
  49. auto profile = CpuProfiler_->ReadProfile();
  50. SymbolizeAndWriteProfile(&profile);
  51. }
  52. if (MemoryProfilingToken_) {
  53. auto profile = ConvertAllocationProfile(std::move(*MemoryProfilingToken_).Stop());
  54. SymbolizeAndWriteProfile(&profile);
  55. }
  56. if (ProfilePeakMemoryUsage_) {
  57. auto profile = ReadHeapProfile(tcmalloc::ProfileType::kPeakHeap);
  58. SymbolizeAndWriteProfile(&profile);
  59. }
  60. }
  61. private:
  62. std::unique_ptr<TCpuProfiler> CpuProfiler_;
  63. std::optional<tcmalloc::MallocExtension::AllocationProfilingToken> MemoryProfilingToken_;
  64. bool ProfilePeakMemoryUsage_ = false;
  65. bool RunExternalSymbolizer_ = false;
  66. void InitializeProfiler()
  67. {
  68. auto profilerSpecYson = GetEnv("YT_JOB_PROFILER_SPEC");
  69. if (!profilerSpecYson) {
  70. return;
  71. }
  72. auto profilerSpec = NodeFromYsonString(profilerSpecYson);
  73. if (profilerSpec["type"] == "cpu") {
  74. auto samplingFrequency = profilerSpec["sampling_frequency"].AsInt64();
  75. CpuProfiler_ = std::make_unique<TCpuProfiler>(TCpuProfilerOptions{
  76. .SamplingFrequency = static_cast<int>(samplingFrequency),
  77. });
  78. } else if (profilerSpec["type"] == "memory") {
  79. MemoryProfilingToken_ = tcmalloc::MallocExtension::StartAllocationProfiling();
  80. } else if (profilerSpec["type"] == "peak_memory") {
  81. ProfilePeakMemoryUsage_ = true;
  82. }
  83. if (profilerSpec["run_external_symbolizer"] == true) {
  84. RunExternalSymbolizer_ = true;
  85. }
  86. }
  87. void SymbolizeAndWriteProfile(NYTProf::NProto::Profile* profile)
  88. {
  89. Symbolize(profile, /*filesOnly*/ true);
  90. AddBuildInfo(profile, TBuildInfo::GetDefault());
  91. if (RunExternalSymbolizer_) {
  92. SymbolizeByExternalPProf(profile, TSymbolizationOptions{
  93. .RunTool = RunSubprocess,
  94. });
  95. }
  96. auto serializedProfile = SerializeProfile(*profile);
  97. constexpr int ProfileFileDescriptor = 8;
  98. TFile profileFile(ProfileFileDescriptor);
  99. profileFile.Write(serializedProfile.data(), serializedProfile.size());
  100. profileFile.FlushData();
  101. }
  102. };
  103. ////////////////////////////////////////////////////////////////////////////////
  104. std::unique_ptr<IJobProfiler> CreateJobProfiler()
  105. {
  106. return std::make_unique<TJobProfiler>();
  107. }
  108. ////////////////////////////////////////////////////////////////////////////////
  109. } // namespace NYT