#pragma once #include "yql_data_provider.h" #include "yql_type_annotation.h" #include #include #ifndef YQL_OPERATION_STATISTICS_CUSTOM_FIELDS #define YQL_OPERATION_STATISTICS_CUSTOM_FIELDS #endif namespace NYql { struct TOperationProgress { #define YQL_OPERATION_PROGRESS_STATE_MAP(xx) \ xx(Started, 0) \ xx(InProgress, 1) \ xx(Finished, 2) \ xx(Failed, 3) \ xx(Aborted, 4) enum class EState { YQL_OPERATION_PROGRESS_STATE_MAP(ENUM_VALUE_GEN) }; #define YQL_OPERATION_BLOCK_STATUS_MAP(xx) \ xx(None, 0) \ xx(Partial, 1) \ xx(Full, 2) enum class EOpBlockStatus { YQL_OPERATION_BLOCK_STATUS_MAP(ENUM_VALUE_GEN) }; TString Category; ui32 Id; EState State; TMaybe BlockStatus; using TStage = std::pair; TStage Stage; TString RemoteId; THashMap RemoteData; struct TCounters { ui64 Completed = 0ULL; ui64 Running = 0ULL; ui64 Total = 0ULL; ui64 Aborted = 0ULL; ui64 Failed = 0ULL; ui64 Lost = 0ULL; ui64 Pending = 0ULL; THashMap Custom = {}; bool operator==(const TCounters& rhs) const noexcept { return Completed == rhs.Completed && Running == rhs.Running && Total == rhs.Total && Aborted == rhs.Aborted && Failed == rhs.Failed && Lost == rhs.Lost && Pending == rhs.Pending && Custom == rhs.Custom; } bool operator!=(const TCounters& rhs) const noexcept { return !operator==(rhs); } }; TMaybe Counters; TOperationProgress(const TString& category, ui32 id, EState state, const TString& stage = "") : Category(category) , Id(id) , State(state) , Stage(stage, TInstant::Now()) { } }; struct TOperationStatistics { struct TEntry { TString Name; TMaybe Sum; TMaybe Max; TMaybe Min; TMaybe Avg; TMaybe Count; TMaybe Value; TEntry(TString name, TMaybe sum, TMaybe max, TMaybe min, TMaybe avg, TMaybe count) : Name(std::move(name)) , Sum(std::move(sum)) , Max(std::move(max)) , Min(std::move(min)) , Avg(std::move(avg)) , Count(std::move(count)) { } TEntry(TString name, TString value) : Name(std::move(name)) , Value(std::move(value)) { } }; TVector Entries; }; using TStatWriter = std::function&)>; using TOperationProgressWriter = std::function; inline TStatWriter ThreadSafeStatWriter(TStatWriter base) { struct TState : public TThrRefBase { TStatWriter Base; TMutex Mutex; }; auto state = MakeIntrusive(); state->Base = base; return [state](ui32 id, const TVector& stat) { with_lock(state->Mutex) { state->Base(id, stat); } }; } inline void NullProgressWriter(const TOperationProgress& progress) { Y_UNUSED(progress); } inline TOperationProgressWriter ChainProgressWriters(TOperationProgressWriter left, TOperationProgressWriter right) { return [=](const TOperationProgress& progress) { left(progress); right(progress); }; } inline TOperationProgressWriter ThreadSafeProgressWriter(TOperationProgressWriter base) { struct TState : public TThrRefBase { TOperationProgressWriter Base; TMutex Mutex; }; auto state = MakeIntrusive(); state->Base = base; return [state](const TOperationProgress& progress) { with_lock(state->Mutex) { state->Base(progress); } }; } TAutoPtr CreateCheckExecutionTransformer(const TTypeAnnotationContext& types, bool checkWorld = true); TAutoPtr CreateExecutionTransformer(TTypeAnnotationContext& types, TOperationProgressWriter writer, bool withFinalize = true); IGraphTransformer::TStatus RequireChild(const TExprNode& node, ui32 index); }