123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- #pragma once
- #include "yql_data_provider.h"
- #include "yql_type_annotation.h"
- #include <yql/essentials/ast/yql_gc_nodes.h>
- #include <util/system/mutex.h>
- #ifndef YQL_OPERATION_STATISTICS_CUSTOM_FIELDS
- #define YQL_OPERATION_STATISTICS_CUSTOM_FIELDS
- #endif
- namespace NYql {
- struct TOperationProgress {
- #define YQL_OPERATION_PROGRESS_STATE_MAP(xx) \
- xx(Started, 0) \
- xx(InProgress, 1) \
- xx(Finished, 2) \
- xx(Failed, 3) \
- xx(Aborted, 4)
- enum class EState {
- YQL_OPERATION_PROGRESS_STATE_MAP(ENUM_VALUE_GEN)
- };
- #define YQL_OPERATION_BLOCK_STATUS_MAP(xx) \
- xx(None, 0) \
- xx(Partial, 1) \
- xx(Full, 2)
- enum class EOpBlockStatus {
- YQL_OPERATION_BLOCK_STATUS_MAP(ENUM_VALUE_GEN)
- };
- TString Category;
- ui32 Id;
- EState State;
- TMaybe<EOpBlockStatus> BlockStatus;
- using TStage = std::pair<TString, TInstant>;
- TStage Stage;
- TString RemoteId;
- THashMap<TString, TString> RemoteData;
- struct TCounters {
- ui64 Completed = 0ULL;
- ui64 Running = 0ULL;
- ui64 Total = 0ULL;
- ui64 Aborted = 0ULL;
- ui64 Failed = 0ULL;
- ui64 Lost = 0ULL;
- ui64 Pending = 0ULL;
- THashMap<TString, i64> Custom = {};
- bool operator==(const TCounters& rhs) const noexcept {
- return Completed == rhs.Completed &&
- Running == rhs.Running &&
- Total == rhs.Total &&
- Aborted == rhs.Aborted &&
- Failed == rhs.Failed &&
- Lost == rhs.Lost &&
- Pending == rhs.Pending &&
- Custom == rhs.Custom;
- }
- bool operator!=(const TCounters& rhs) const noexcept {
- return !operator==(rhs);
- }
- };
- TMaybe<TCounters> Counters;
- TOperationProgress(const TString& category, ui32 id,
- EState state, const TString& stage = "")
- : Category(category)
- , Id(id)
- , State(state)
- , Stage(stage, TInstant::Now())
- {
- }
- };
- struct TOperationStatistics {
- struct TEntry {
- TString Name;
- TMaybe<i64> Sum;
- TMaybe<i64> Max;
- TMaybe<i64> Min;
- TMaybe<i64> Avg;
- TMaybe<i64> Count;
- TMaybe<TString> Value;
- TEntry(TString name, TMaybe<i64> sum, TMaybe<i64> max, TMaybe<i64> min, TMaybe<i64> avg, TMaybe<i64> count)
- : Name(std::move(name))
- , Sum(std::move(sum))
- , Max(std::move(max))
- , Min(std::move(min))
- , Avg(std::move(avg))
- , Count(std::move(count))
- {
- }
- TEntry(TString name, TString value)
- : Name(std::move(name))
- , Value(std::move(value))
- {
- }
- };
- TVector<TEntry> Entries;
- };
- using TStatWriter = std::function<void(ui32, const TVector<TOperationStatistics::TEntry>&)>;
- using TOperationProgressWriter = std::function<void(const TOperationProgress&)>;
- inline TStatWriter ThreadSafeStatWriter(TStatWriter base) {
- struct TState : public TThrRefBase {
- TStatWriter Base;
- TMutex Mutex;
- };
- auto state = MakeIntrusive<TState>();
- state->Base = base;
- return [state](ui32 id, const TVector<TOperationStatistics::TEntry>& stat) {
- with_lock(state->Mutex) {
- state->Base(id, stat);
- }
- };
- }
- inline void NullProgressWriter(const TOperationProgress& progress) {
- Y_UNUSED(progress);
- }
- inline TOperationProgressWriter ChainProgressWriters(TOperationProgressWriter left, TOperationProgressWriter right) {
- return [=](const TOperationProgress& progress) {
- left(progress);
- right(progress);
- };
- }
- inline TOperationProgressWriter ThreadSafeProgressWriter(TOperationProgressWriter base) {
- struct TState : public TThrRefBase {
- TOperationProgressWriter Base;
- TMutex Mutex;
- };
- auto state = MakeIntrusive<TState>();
- state->Base = base;
- return [state](const TOperationProgress& progress) {
- with_lock(state->Mutex) {
- state->Base(progress);
- }
- };
- }
- TAutoPtr<IGraphTransformer> CreateCheckExecutionTransformer(const TTypeAnnotationContext& types, bool checkWorld = true);
- TAutoPtr<IGraphTransformer> CreateExecutionTransformer(TTypeAnnotationContext& types, TOperationProgressWriter writer, bool withFinalize = true);
- IGraphTransformer::TStatus RequireChild(const TExprNode& node, ui32 index);
- }
|