user_job_statistics.cpp 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. #include "user_job_statistics.h"
  2. #include <yt/cpp/mapreduce/common/helpers.h>
  3. #include <util/stream/null.h>
  4. #include <util/string/builder.h>
  5. #include <util/system/mutex.h>
  6. #include <util/system/env.h>
  7. using namespace NYtTools;
  8. static TMutex GlobalStatsWritingMutex;
  9. #if defined(_unix_)
  10. const FHANDLE TUserJobStatsProxy::JobStatisticsHandle = 5;
  11. #elif defined(_win_)
  12. const FHANDLE TUserJobStatsProxy::JobStatisticsHandle = nullptr;
  13. #endif
  14. static IOutputStream* CorrectHandle(const FHANDLE h) {
  15. #if defined(_unix_)
  16. if (fcntl(h, F_GETFD) == -1) {
  17. return &Cerr;
  18. }
  19. return nullptr;
  20. #elif defined(_win_)
  21. return &Cerr;
  22. #endif
  23. }
  24. static TString PrintNodeSimple(const NYT::TNode& n) {
  25. return NYT::NodeToYsonString(n, NYson::EYsonFormat::Text);
  26. }
  27. void TUserJobStatsProxy::Init(IOutputStream * usingStream) {
  28. if (usingStream == nullptr) {
  29. usingStream = CorrectHandle(JobStatisticsHandle);
  30. }
  31. if (usingStream == nullptr && GetEnv("YT_JOB_ID").empty()) {
  32. usingStream = &Cerr;
  33. }
  34. if (usingStream == nullptr) {
  35. TFileHandle fixedDesrc(JobStatisticsHandle);
  36. FetchedOut = MakeHolder<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate()));
  37. UsingStream = FetchedOut.Get();
  38. fixedDesrc.Release();
  39. } else {
  40. UsingStream = usingStream;
  41. }
  42. }
  43. void TUserJobStatsProxy::InitChecked(IOutputStream* def) {
  44. IOutputStream* usingStream = CorrectHandle(JobStatisticsHandle);
  45. if (usingStream == nullptr && !GetEnv("YT_JOB_ID").empty()) {
  46. TFileHandle fixedDesrc(JobStatisticsHandle);
  47. FetchedOut = MakeHolder<TFixedBufferFileOutput>(TFile(fixedDesrc.Duplicate()));
  48. UsingStream = FetchedOut.Get();
  49. fixedDesrc.Release();
  50. } else {
  51. UsingStream = def;
  52. }
  53. }
  54. void TUserJobStatsProxy::InitIfNotInited(IOutputStream * usingStream) {
  55. if (UsingStream == nullptr) {
  56. Init(usingStream);
  57. }
  58. }
  59. void TUserJobStatsProxy::CommitStats() {
  60. if (Stats.empty()) {
  61. return;
  62. }
  63. auto res = NYT::TNode::CreateMap();
  64. for (auto& p : Stats) {
  65. res[p.first] = p.second;
  66. }
  67. for (auto& p : TimeStats) {
  68. res[p.first] = p.second.MilliSeconds();
  69. }
  70. with_lock(GlobalStatsWritingMutex) {
  71. *UsingStream << PrintNodeSimple(res) << ";" << Endl;
  72. }
  73. Stats.clear();
  74. }
  75. TTimeStatHolder TUserJobStatsProxy::TimerStart(TString name, bool commitOnFinish) {
  76. return THolder(new TTimeStat(this, name, commitOnFinish));
  77. }
  78. void TUserJobStatsProxy::WriteStat(TString name, i64 val) {
  79. auto res = NYT::TNode {} (name, val);
  80. with_lock(GlobalStatsWritingMutex) {
  81. *UsingStream << PrintNodeSimple(res) << ";" << Endl;
  82. }
  83. }
  84. void TUserJobStatsProxy::WriteStatNoFlush(TString name, i64 val) {
  85. auto res = NYT::TNode {} (name, val);
  86. with_lock(GlobalStatsWritingMutex) {
  87. *UsingStream << (TStringBuilder{} << PrintNodeSimple(res) << ";\n");
  88. }
  89. }
  90. TTimeStat::TTimeStat(TUserJobStatsProxy* parent, TString name, bool commit)
  91. : Parent(parent)
  92. , Name(name)
  93. , Commit(commit) {}
  94. TTimeStat::~TTimeStat() {
  95. Finish();
  96. }
  97. void TTimeStat::Cancel() {
  98. Parent = nullptr;
  99. }
  100. void TTimeStat::Finish() {
  101. if (!Parent) {
  102. return;
  103. }
  104. if (Commit) {
  105. Parent->WriteStatNoFlush(Name, Timer.Get().MilliSeconds());
  106. } else {
  107. Parent->TimeStats[Name] += Timer.Get();
  108. }
  109. Cancel();
  110. }