yql_execution.h 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. #pragma once
  2. #include "yql_data_provider.h"
  3. #include "yql_type_annotation.h"
  4. #include <yql/essentials/ast/yql_gc_nodes.h>
  5. #include <util/system/mutex.h>
  6. #ifndef YQL_OPERATION_STATISTICS_CUSTOM_FIELDS
  7. #define YQL_OPERATION_STATISTICS_CUSTOM_FIELDS
  8. #endif
  9. namespace NYql {
  10. struct TOperationProgress {
  11. #define YQL_OPERATION_PROGRESS_STATE_MAP(xx) \
  12. xx(Started, 0) \
  13. xx(InProgress, 1) \
  14. xx(Finished, 2) \
  15. xx(Failed, 3) \
  16. xx(Aborted, 4)
  17. enum class EState {
  18. YQL_OPERATION_PROGRESS_STATE_MAP(ENUM_VALUE_GEN)
  19. };
  20. #define YQL_OPERATION_BLOCK_STATUS_MAP(xx) \
  21. xx(None, 0) \
  22. xx(Partial, 1) \
  23. xx(Full, 2)
  24. enum class EOpBlockStatus {
  25. YQL_OPERATION_BLOCK_STATUS_MAP(ENUM_VALUE_GEN)
  26. };
  27. TString Category;
  28. ui32 Id;
  29. EState State;
  30. TMaybe<EOpBlockStatus> BlockStatus;
  31. using TStage = std::pair<TString, TInstant>;
  32. TStage Stage;
  33. TString RemoteId;
  34. THashMap<TString, TString> RemoteData;
  35. struct TCounters {
  36. ui64 Completed = 0ULL;
  37. ui64 Running = 0ULL;
  38. ui64 Total = 0ULL;
  39. ui64 Aborted = 0ULL;
  40. ui64 Failed = 0ULL;
  41. ui64 Lost = 0ULL;
  42. ui64 Pending = 0ULL;
  43. THashMap<TString, i64> Custom = {};
  44. bool operator==(const TCounters& rhs) const noexcept {
  45. return Completed == rhs.Completed &&
  46. Running == rhs.Running &&
  47. Total == rhs.Total &&
  48. Aborted == rhs.Aborted &&
  49. Failed == rhs.Failed &&
  50. Lost == rhs.Lost &&
  51. Pending == rhs.Pending &&
  52. Custom == rhs.Custom;
  53. }
  54. bool operator!=(const TCounters& rhs) const noexcept {
  55. return !operator==(rhs);
  56. }
  57. };
  58. TMaybe<TCounters> Counters;
  59. TOperationProgress(const TString& category, ui32 id,
  60. EState state, const TString& stage = "")
  61. : Category(category)
  62. , Id(id)
  63. , State(state)
  64. , Stage(stage, TInstant::Now())
  65. {
  66. }
  67. };
  68. struct TOperationStatistics {
  69. struct TEntry {
  70. TString Name;
  71. TMaybe<i64> Sum;
  72. TMaybe<i64> Max;
  73. TMaybe<i64> Min;
  74. TMaybe<i64> Avg;
  75. TMaybe<i64> Count;
  76. TMaybe<TString> Value;
  77. TEntry(TString name, TMaybe<i64> sum, TMaybe<i64> max, TMaybe<i64> min, TMaybe<i64> avg, TMaybe<i64> count)
  78. : Name(std::move(name))
  79. , Sum(std::move(sum))
  80. , Max(std::move(max))
  81. , Min(std::move(min))
  82. , Avg(std::move(avg))
  83. , Count(std::move(count))
  84. {
  85. }
  86. TEntry(TString name, TString value)
  87. : Name(std::move(name))
  88. , Value(std::move(value))
  89. {
  90. }
  91. };
  92. TVector<TEntry> Entries;
  93. };
  94. using TStatWriter = std::function<void(ui32, const TVector<TOperationStatistics::TEntry>&)>;
  95. using TOperationProgressWriter = std::function<void(const TOperationProgress&)>;
  96. inline TStatWriter ThreadSafeStatWriter(TStatWriter base) {
  97. struct TState : public TThrRefBase {
  98. TStatWriter Base;
  99. TMutex Mutex;
  100. };
  101. auto state = MakeIntrusive<TState>();
  102. state->Base = base;
  103. return [state](ui32 id, const TVector<TOperationStatistics::TEntry>& stat) {
  104. with_lock(state->Mutex) {
  105. state->Base(id, stat);
  106. }
  107. };
  108. }
  109. inline void NullProgressWriter(const TOperationProgress& progress) {
  110. Y_UNUSED(progress);
  111. }
  112. inline TOperationProgressWriter ChainProgressWriters(TOperationProgressWriter left, TOperationProgressWriter right) {
  113. return [=](const TOperationProgress& progress) {
  114. left(progress);
  115. right(progress);
  116. };
  117. }
  118. inline TOperationProgressWriter ThreadSafeProgressWriter(TOperationProgressWriter base) {
  119. struct TState : public TThrRefBase {
  120. TOperationProgressWriter Base;
  121. TMutex Mutex;
  122. };
  123. auto state = MakeIntrusive<TState>();
  124. state->Base = base;
  125. return [state](const TOperationProgress& progress) {
  126. with_lock(state->Mutex) {
  127. state->Base(progress);
  128. }
  129. };
  130. }
  131. TAutoPtr<IGraphTransformer> CreateCheckExecutionTransformer(const TTypeAnnotationContext& types, bool checkWorld = true);
  132. TAutoPtr<IGraphTransformer> CreateExecutionTransformer(TTypeAnnotationContext& types, TOperationProgressWriter writer, bool withFinalize = true);
  133. IGraphTransformer::TStatus RequireChild(const TExprNode& node, ui32 index);
  134. }