#include "job_statistics.h" #include "operation.h" #include #include #include #include #include #include #include #include #include #include namespace NYT { //////////////////////////////////////////////////////////////////// template <> i64 ConvertJobStatisticsEntry(i64 value) { return value; } template <> TDuration ConvertJobStatisticsEntry(i64 value) { return TDuration::MilliSeconds(value); } //////////////////////////////////////////////////////////////////// static TTaskName JobTypeToTaskName(EJobType jobType) { switch (jobType) { case EJobType::PartitionMap: return ETaskName::PartitionMap0; case EJobType::Partition: return ETaskName::Partition0; default: return ToString(jobType); } } static TTaskName FixTaskName(TString taskName) { if (taskName == "partition") { return ETaskName::Partition0; } else if (taskName == "partition_map") { return ETaskName::PartitionMap0; } return taskName; } //////////////////////////////////////////////////////////////////// class TJobStatistics::TData : public TThrRefBase { public: using TTaskName2Data = THashMap; using TState2TaskName2Data = THashMap; using TName2State2TaskName2Data = THashMap; public: TName2State2TaskName2Data Name2State2TaskName2Data; public: TData() = default; TData(const TNode& statisticsNode) { ParseNode(statisticsNode, TString(), &Name2State2TaskName2Data); } static void Aggregate(TJobStatistics::TDataEntry* result, const TJobStatistics::TDataEntry& other) { result->Max = Max(result->Max, other.Max); result->Min = Min(result->Min, other.Min); result->Sum += other.Sum; result->Count += other.Count; } static void ParseNode(const TNode& node, TState2TaskName2Data* output) { auto getInt = [] (const TNode& theNode, TStringBuf key) { const auto& nodeAsMap = theNode.AsMap(); auto it = nodeAsMap.find(key); if (it == nodeAsMap.end()) { ythrow yexception() << "Key '" << key << "' is not found"; } const auto& valueNode = it->second; if (!valueNode.IsInt64()) { ythrow yexception() << "Key '" << key << "' is not of int64 type"; } return valueNode.AsInt64(); }; for (const auto& [stateStr, taskName2DataNode] : node.AsMap()) { EJobState state; if (!TryFromString(stateStr, state)) { continue; } for (const auto& [taskName, dataNode] : taskName2DataNode.AsMap()) { auto fixedTaskName = FixTaskName(taskName); auto& data = (*output)[state][fixedTaskName.Get()]; data.Max = getInt(dataNode, "max"); data.Min = getInt(dataNode, "min"); data.Sum = getInt(dataNode, "sum"); data.Count = getInt(dataNode, "count"); } } } static void ParseNode(const TNode& node, const TString& curPath, TName2State2TaskName2Data* output) { Y_ABORT_UNLESS(node.IsMap()); for (const auto& [key, value] : node.AsMap()) { if (key == "$"sv) { ParseNode(value, &(*output)[curPath]); } else { TString childPath = curPath; if (!childPath.empty()) { childPath.push_back('/'); } if (key.find_first_of('/') != key.npos) { TString keyCopy(key); SubstGlobal(keyCopy, "/", "\\/"); childPath += keyCopy; } else { childPath += key; } ParseNode(value, childPath, output); } } } }; //////////////////////////////////////////////////////////////////// struct TJobStatistics::TFilter : public TThrRefBase { TVector TaskNameFilter; TVector JobStateFilter = {EJobState::Completed}; }; //////////////////////////////////////////////////////////////////// const TString TJobStatistics::CustomStatisticsNamePrefix_ = "custom/"; TJobStatistics::TJobStatistics() : Data_(::MakeIntrusive()) , Filter_(::MakeIntrusive()) { } TJobStatistics::TJobStatistics(const NYT::TNode& statisticsNode) : Data_(::MakeIntrusive(statisticsNode)) , Filter_(::MakeIntrusive()) { } TJobStatistics::TJobStatistics(::TIntrusivePtr data, ::TIntrusivePtr filter) : Data_(data) , Filter_(::MakeIntrusive(*filter)) { } TJobStatistics::TJobStatistics(const TJobStatistics& jobStatistics) = default; TJobStatistics::TJobStatistics(TJobStatistics&&) = default; TJobStatistics& TJobStatistics::operator=(const TJobStatistics& jobStatistics) = default; TJobStatistics& TJobStatistics::operator=(TJobStatistics&& jobStatistics) = default; TJobStatistics::~TJobStatistics() = default; TJobStatistics TJobStatistics::TaskName(TVector taskNames) const { auto newFilter = ::MakeIntrusive(*Filter_); newFilter->TaskNameFilter = std::move(taskNames); return TJobStatistics(Data_, std::move(newFilter)); } TJobStatistics TJobStatistics::JobState(TVector jobStates) const { auto newFilter = ::MakeIntrusive(*Filter_); newFilter->JobStateFilter = std::move(jobStates); return TJobStatistics(Data_, std::move(newFilter)); } TJobStatistics TJobStatistics::JobType(TVector jobTypes) const { TVector taskNames; for (auto jobType : jobTypes) { taskNames.push_back(JobTypeToTaskName(jobType)); } return TaskName(std::move(taskNames)); } bool TJobStatistics::HasStatistics(TStringBuf name) const { return Data_->Name2State2TaskName2Data.contains(name); } TJobStatisticsEntry TJobStatistics::GetStatistics(TStringBuf name) const { return GetStatisticsAs(name); } TVector TJobStatistics::GetStatisticsNames() const { TVector result; result.reserve(Data_->Name2State2TaskName2Data.size()); for (const auto& entry : Data_->Name2State2TaskName2Data) { result.push_back(entry.first); } return result; } bool TJobStatistics::HasCustomStatistics(TStringBuf name) const { return HasStatistics(CustomStatisticsNamePrefix_ + name); } TJobStatisticsEntry TJobStatistics::GetCustomStatistics(TStringBuf name) const { return GetCustomStatisticsAs(name); } TVector TJobStatistics::GetCustomStatisticsNames() const { TVector result; for (const auto& entry : Data_->Name2State2TaskName2Data) { if (entry.first.StartsWith(CustomStatisticsNamePrefix_)) { result.push_back(entry.first.substr(CustomStatisticsNamePrefix_.size())); } } return result; } TMaybe TJobStatistics::GetStatisticsImpl(TStringBuf name) const { auto name2State2TaskName2DataIt = Data_->Name2State2TaskName2Data.find(name); Y_ENSURE( name2State2TaskName2DataIt != Data_->Name2State2TaskName2Data.end(), "Statistics '" << name << "' are missing"); const auto& state2TaskName2Data = name2State2TaskName2DataIt->second; TMaybe result; auto aggregate = [&] (const TDataEntry& data) { if (result) { TData::Aggregate(&result.GetRef(), data); } else { result = data; } }; auto aggregateTaskName2Data = [&] (const TData::TTaskName2Data& taskName2Data) { if (Filter_->TaskNameFilter.empty()) { for (const auto& [taskName, data] : taskName2Data) { aggregate(data); } } else { for (const auto& taskName : Filter_->TaskNameFilter) { auto it = taskName2Data.find(taskName.Get()); if (it == taskName2Data.end()) { continue; } const auto& data = it->second; aggregate(data); } } }; if (Filter_->JobStateFilter.empty()) { for (const auto& [state, taskName2Data] : state2TaskName2Data) { aggregateTaskName2Data(taskName2Data); } } else { for (auto state : Filter_->JobStateFilter) { auto it = state2TaskName2Data.find(state); if (it == state2TaskName2Data.end()) { continue; } const auto& taskName2Data = it->second; aggregateTaskName2Data(taskName2Data); } } return result; } //////////////////////////////////////////////////////////////////// namespace { constexpr int USER_STATISTICS_FILE_DESCRIPTOR = 5; constexpr char PATH_DELIMITER = '/'; constexpr char ESCAPE = '\\'; IOutputStream* GetStatisticsStream() { static TFile file = Duplicate(USER_STATISTICS_FILE_DESCRIPTOR); static TFileOutput stream(file); return &stream; } template void WriteCustomStatisticsAny(TStringBuf path, const T& value) { ::NYson::TYsonWriter writer(GetStatisticsStream(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::ListFragment); int depth = 0; size_t begin = 0; size_t end = 0; TVector items; while (end <= path.size()) { if (end + 1 < path.size() && path[end] == ESCAPE && path[end + 1] == PATH_DELIMITER) { end += 2; continue; } if (end == path.size() || path[end] == PATH_DELIMITER) { writer.OnBeginMap(); items.emplace_back(path.data() + begin, end - begin); SubstGlobal(items.back(), "\\/", "/"); writer.OnKeyedItem(TStringBuf(items.back())); ++depth; begin = end + 1; } ++end; } Serialize(value, &writer); while (depth > 0) { writer.OnEndMap(); --depth; } } } //////////////////////////////////////////////////////////////////// void WriteCustomStatistics(const TNode& statistics) { ::NYson::TYsonWriter writer(GetStatisticsStream(), NYson::EYsonFormat::Binary, ::NYson::EYsonType::ListFragment); Serialize(statistics, &writer); } void WriteCustomStatistics(TStringBuf path, i64 value) { WriteCustomStatisticsAny(path, value); } void FlushCustomStatisticsStream() { GetStatisticsStream()->Flush(); } //////////////////////////////////////////////////////////////////// } // namespace NYT