job_statistics.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361
  1. #include "job_statistics.h"
  2. #include "operation.h"
  3. #include <library/cpp/yson/node/node.h>
  4. #include <library/cpp/yson/node/serialize.h>
  5. #include <library/cpp/yson/writer.h>
  6. #include <util/datetime/base.h>
  7. #include <util/generic/hash_set.h>
  8. #include <util/generic/ptr.h>
  9. #include <util/stream/file.h>
  10. #include <util/string/cast.h>
  11. #include <util/string/subst.h>
  12. #include <util/system/file.h>
  13. namespace NYT {
  14. ////////////////////////////////////////////////////////////////////////////////
  15. template <>
  16. i64 ConvertJobStatisticsEntry(i64 value)
  17. {
  18. return value;
  19. }
  20. template <>
  21. TDuration ConvertJobStatisticsEntry(i64 value)
  22. {
  23. return TDuration::MilliSeconds(value);
  24. }
  25. ////////////////////////////////////////////////////////////////////////////////
  26. static TTaskName JobTypeToTaskName(EJobType jobType)
  27. {
  28. switch (jobType) {
  29. case EJobType::PartitionMap:
  30. return ETaskName::PartitionMap0;
  31. case EJobType::Partition:
  32. return ETaskName::Partition0;
  33. default:
  34. return ToString(jobType);
  35. }
  36. }
  37. static TTaskName FixTaskName(TString taskName)
  38. {
  39. if (taskName == "partition") {
  40. return ETaskName::Partition0;
  41. } else if (taskName == "partition_map") {
  42. return ETaskName::PartitionMap0;
  43. }
  44. return taskName;
  45. }
  46. ////////////////////////////////////////////////////////////////////////////////
  47. class TJobStatistics::TData
  48. : public TThrRefBase
  49. {
  50. public:
  51. using TTaskName2Data = THashMap<TString, TJobStatistics::TDataEntry>;
  52. using TState2TaskName2Data = THashMap<EJobState, TTaskName2Data>;
  53. using TName2State2TaskName2Data = THashMap<TString, TState2TaskName2Data>;
  54. public:
  55. TName2State2TaskName2Data Name2State2TaskName2Data;
  56. public:
  57. TData() = default;
  58. TData(const TNode& statisticsNode)
  59. {
  60. ParseNode(statisticsNode, TString(), &Name2State2TaskName2Data);
  61. }
  62. static void Aggregate(TJobStatistics::TDataEntry* result, const TJobStatistics::TDataEntry& other)
  63. {
  64. result->Max = Max(result->Max, other.Max);
  65. result->Min = Min(result->Min, other.Min);
  66. result->Sum += other.Sum;
  67. result->Count += other.Count;
  68. }
  69. static void ParseNode(const TNode& node, TState2TaskName2Data* output)
  70. {
  71. auto getInt = [] (const TNode& theNode, TStringBuf key) {
  72. const auto& nodeAsMap = theNode.AsMap();
  73. auto it = nodeAsMap.find(key);
  74. if (it == nodeAsMap.end()) {
  75. ythrow yexception() << "Key '" << key << "' is not found";
  76. }
  77. const auto& valueNode = it->second;
  78. if (!valueNode.IsInt64()) {
  79. ythrow yexception() << "Key '" << key << "' is not of int64 type";
  80. }
  81. return valueNode.AsInt64();
  82. };
  83. for (const auto& [stateStr, taskName2DataNode] : node.AsMap()) {
  84. EJobState state;
  85. if (!TryFromString(stateStr, state)) {
  86. continue;
  87. }
  88. for (const auto& [taskName, dataNode] : taskName2DataNode.AsMap()) {
  89. auto fixedTaskName = FixTaskName(taskName);
  90. auto& data = (*output)[state][fixedTaskName.Get()];
  91. data.Max = getInt(dataNode, "max");
  92. data.Min = getInt(dataNode, "min");
  93. data.Sum = getInt(dataNode, "sum");
  94. data.Count = getInt(dataNode, "count");
  95. }
  96. }
  97. }
  98. static void ParseNode(const TNode& node, const TString& curPath, TName2State2TaskName2Data* output)
  99. {
  100. Y_ABORT_UNLESS(node.IsMap());
  101. for (const auto& [key, value] : node.AsMap()) {
  102. if (key == "$"sv) {
  103. ParseNode(value, &(*output)[curPath]);
  104. } else {
  105. TString childPath = curPath;
  106. if (!childPath.empty()) {
  107. childPath.push_back('/');
  108. }
  109. if (key.find_first_of('/') != key.npos) {
  110. TString keyCopy(key);
  111. SubstGlobal(keyCopy, "/", "\\/");
  112. childPath += keyCopy;
  113. } else {
  114. childPath += key;
  115. }
  116. ParseNode(value, childPath, output);
  117. }
  118. }
  119. }
  120. };
  121. ////////////////////////////////////////////////////////////////////////////////
  122. struct TJobStatistics::TFilter
  123. : public TThrRefBase
  124. {
  125. TVector<TTaskName> TaskNameFilter;
  126. TVector<EJobState> JobStateFilter = {EJobState::Completed};
  127. };
  128. ////////////////////////////////////////////////////////////////////////////////
  129. const TString TJobStatistics::CustomStatisticsNamePrefix_ = "custom/";
  130. TJobStatistics::TJobStatistics()
  131. : Data_(::MakeIntrusive<TData>())
  132. , Filter_(::MakeIntrusive<TFilter>())
  133. { }
  134. TJobStatistics::TJobStatistics(const NYT::TNode& statisticsNode)
  135. : Data_(::MakeIntrusive<TData>(statisticsNode))
  136. , Filter_(::MakeIntrusive<TFilter>())
  137. { }
  138. TJobStatistics::TJobStatistics(::TIntrusivePtr<TData> data, ::TIntrusivePtr<TFilter> filter)
  139. : Data_(data)
  140. , Filter_(::MakeIntrusive<TFilter>(*filter))
  141. { }
  142. TJobStatistics::TJobStatistics(const TJobStatistics& jobStatistics) = default;
  143. TJobStatistics::TJobStatistics(TJobStatistics&&) = default;
  144. TJobStatistics& TJobStatistics::operator=(const TJobStatistics& jobStatistics) = default;
  145. TJobStatistics& TJobStatistics::operator=(TJobStatistics&& jobStatistics) = default;
  146. TJobStatistics::~TJobStatistics() = default;
  147. TJobStatistics TJobStatistics::TaskName(TVector<TTaskName> taskNames) const
  148. {
  149. auto newFilter = ::MakeIntrusive<TFilter>(*Filter_);
  150. newFilter->TaskNameFilter = std::move(taskNames);
  151. return TJobStatistics(Data_, std::move(newFilter));
  152. }
  153. TJobStatistics TJobStatistics::JobState(TVector<EJobState> jobStates) const
  154. {
  155. auto newFilter = ::MakeIntrusive<TFilter>(*Filter_);
  156. newFilter->JobStateFilter = std::move(jobStates);
  157. return TJobStatistics(Data_, std::move(newFilter));
  158. }
  159. TJobStatistics TJobStatistics::JobType(TVector<EJobType> jobTypes) const
  160. {
  161. TVector<TTaskName> taskNames;
  162. for (auto jobType : jobTypes) {
  163. taskNames.push_back(JobTypeToTaskName(jobType));
  164. }
  165. return TaskName(std::move(taskNames));
  166. }
  167. bool TJobStatistics::HasStatistics(TStringBuf name) const
  168. {
  169. return Data_->Name2State2TaskName2Data.contains(name);
  170. }
  171. TJobStatisticsEntry<i64> TJobStatistics::GetStatistics(TStringBuf name) const
  172. {
  173. return GetStatisticsAs<i64>(name);
  174. }
  175. TVector<TString> TJobStatistics::GetStatisticsNames() const
  176. {
  177. TVector<TString> result;
  178. result.reserve(Data_->Name2State2TaskName2Data.size());
  179. for (const auto& entry : Data_->Name2State2TaskName2Data) {
  180. result.push_back(entry.first);
  181. }
  182. return result;
  183. }
  184. bool TJobStatistics::HasCustomStatistics(TStringBuf name) const
  185. {
  186. return HasStatistics(CustomStatisticsNamePrefix_ + name);
  187. }
  188. TJobStatisticsEntry<i64> TJobStatistics::GetCustomStatistics(TStringBuf name) const
  189. {
  190. return GetCustomStatisticsAs<i64>(name);
  191. }
  192. TVector<TString> TJobStatistics::GetCustomStatisticsNames() const
  193. {
  194. TVector<TString> result;
  195. for (const auto& entry : Data_->Name2State2TaskName2Data) {
  196. if (entry.first.StartsWith(CustomStatisticsNamePrefix_)) {
  197. result.push_back(entry.first.substr(CustomStatisticsNamePrefix_.size()));
  198. }
  199. }
  200. return result;
  201. }
  202. TMaybe<TJobStatistics::TDataEntry> TJobStatistics::GetStatisticsImpl(TStringBuf name) const
  203. {
  204. auto name2State2TaskName2DataIt = Data_->Name2State2TaskName2Data.find(name);
  205. Y_ENSURE(
  206. name2State2TaskName2DataIt != Data_->Name2State2TaskName2Data.end(),
  207. "Statistics '" << name << "' are missing");
  208. const auto& state2TaskName2Data = name2State2TaskName2DataIt->second;
  209. TMaybe<TDataEntry> result;
  210. auto aggregate = [&] (const TDataEntry& data) {
  211. if (result) {
  212. TData::Aggregate(&result.GetRef(), data);
  213. } else {
  214. result = data;
  215. }
  216. };
  217. auto aggregateTaskName2Data = [&] (const TData::TTaskName2Data& taskName2Data) {
  218. if (Filter_->TaskNameFilter.empty()) {
  219. for (const auto& [taskName, data] : taskName2Data) {
  220. aggregate(data);
  221. }
  222. } else {
  223. for (const auto& taskName : Filter_->TaskNameFilter) {
  224. auto it = taskName2Data.find(taskName.Get());
  225. if (it == taskName2Data.end()) {
  226. continue;
  227. }
  228. const auto& data = it->second;
  229. aggregate(data);
  230. }
  231. }
  232. };
  233. if (Filter_->JobStateFilter.empty()) {
  234. for (const auto& [state, taskName2Data] : state2TaskName2Data) {
  235. aggregateTaskName2Data(taskName2Data);
  236. }
  237. } else {
  238. for (auto state : Filter_->JobStateFilter) {
  239. auto it = state2TaskName2Data.find(state);
  240. if (it == state2TaskName2Data.end()) {
  241. continue;
  242. }
  243. const auto& taskName2Data = it->second;
  244. aggregateTaskName2Data(taskName2Data);
  245. }
  246. }
  247. return result;
  248. }
  249. ////////////////////////////////////////////////////////////////////////////////
  250. namespace {
  // Descriptor that GetStatisticsStream() duplicates and writes custom statistics to.
  constexpr int USER_STATISTICS_FILE_DESCRIPTOR{5};
  // Separator between the components of a custom statistics path.
  constexpr char PATH_DELIMITER{'/'};
  // Escape character: ESCAPE followed by PATH_DELIMITER is a literal '/' inside a component.
  constexpr char ESCAPE{'\\'};
  254. IOutputStream* GetStatisticsStream()
  255. {
  256. static TFile file = Duplicate(USER_STATISTICS_FILE_DESCRIPTOR);
  257. static TFileOutput stream(file);
  258. return &stream;
  259. }
  260. template <typename T>
  261. void WriteCustomStatisticsAny(TStringBuf path, const T& value)
  262. {
  263. ::NYson::TYsonWriter writer(GetStatisticsStream(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::ListFragment);
  264. int depth = 0;
  265. size_t begin = 0;
  266. size_t end = 0;
  267. TVector<TString> items;
  268. while (end <= path.size()) {
  269. if (end + 1 < path.size() && path[end] == ESCAPE && path[end + 1] == PATH_DELIMITER) {
  270. end += 2;
  271. continue;
  272. }
  273. if (end == path.size() || path[end] == PATH_DELIMITER) {
  274. writer.OnBeginMap();
  275. items.emplace_back(path.data() + begin, end - begin);
  276. SubstGlobal(items.back(), "\\/", "/");
  277. writer.OnKeyedItem(TStringBuf(items.back()));
  278. ++depth;
  279. begin = end + 1;
  280. }
  281. ++end;
  282. }
  283. Serialize(value, &writer);
  284. while (depth > 0) {
  285. writer.OnEndMap();
  286. --depth;
  287. }
  288. }
  289. }
  290. ////////////////////////////////////////////////////////////////////////////////
  291. void WriteCustomStatistics(const TNode& statistics)
  292. {
  293. ::NYson::TYsonWriter writer(GetStatisticsStream(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::ListFragment);
  294. Serialize(statistics, &writer);
  295. }
  296. void WriteCustomStatistics(TStringBuf path, i64 value)
  297. {
  298. WriteCustomStatisticsAny(path, value);
  299. }
  300. void FlushCustomStatisticsStream() {
  301. GetStatisticsStream()->Flush();
  302. }
  303. ////////////////////////////////////////////////////////////////////////////////
  304. } // namespace NYT