yql_data_provider.h 11 KB


  1. #pragma once
  2. #include "yql_graph_transformer.h"
  3. #include <yql/essentials/core/sql_types/yql_callable_names.h>
  4. #include <yql/essentials/public/udf/udf_validate.h>
  5. #include <library/cpp/yson/writer.h>
  6. #include <util/generic/maybe.h>
  7. #include <util/generic/set.h>
  8. #include <util/generic/hash_set.h>
  9. #include <util/generic/string.h>
  10. #include <functional>
  // Forward declarations of types that this header mentions only inside
  // declarations (smart-pointer and pointer parameters of callbacks).
  class IRandomProvider;
  class ITimeProvider;

  namespace NKikimr {
  namespace NMiniKQL {

  class IFunctionRegistry;

  } // namespace NMiniKQL
  } // namespace NKikimr
  18. namespace NYql {
  19. struct TPinInfo {
  20. const TExprNode* DataSource;
  21. const TExprNode* DataSink;
  22. const TExprNode* Key;
  23. TString DisplayName;
  24. bool HideInBasicPlan;
  25. TPinInfo(const TExprNode* dataSource, const TExprNode* dataSink,
  26. const TExprNode* key, const TString& displayName, bool hideInBasicPlan)
  27. : DataSource(dataSource)
  28. , DataSink(dataSink)
  29. , Key(key)
  30. , DisplayName(displayName)
  31. , HideInBasicPlan(hideInBasicPlan)
  32. {}
  33. };
  34. class IPlanFormatter {
  35. public:
  36. virtual ~IPlanFormatter() {}
  37. virtual bool HasCustomPlan(const TExprNode& node) = 0;
  38. virtual void WriteDetails(const TExprNode& node, NYson::TYsonWriter& writer) = 0;
  39. // returns visibility of node
  40. virtual bool GetDependencies(const TExprNode& node, TExprNode::TListType& children, bool compact) = 0;
  41. virtual void GetResultDependencies(const TExprNode::TPtr& node, TExprNode::TListType& children, bool compact) = 0;
  42. // returns full number of inputs
  43. virtual ui32 GetInputs(const TExprNode& node, TVector<TPinInfo>& inputs, bool withLimits) = 0;
  44. // returns full number of outputs
  45. virtual ui32 GetOutputs(const TExprNode& node, TVector<TPinInfo>& outputs, bool withLimits) = 0;
  46. virtual TString GetProviderPath(const TExprNode& node) = 0;
  47. virtual void WritePlanDetails(const TExprNode& node, NYson::TYsonWriter& writer, bool withLimits) = 0;
  48. virtual void WritePullDetails(const TExprNode& node, NYson::TYsonWriter& writer) = 0;
  49. virtual void WritePinDetails(const TExprNode& node, NYson::TYsonWriter& writer) = 0;
  50. virtual TString GetOperationDisplayName(const TExprNode& node) = 0;
  51. // returns false if provider schemas aren't supported
  52. virtual bool WriteSchemaHeader(NYson::TYsonWriter& writer) = 0;
  53. virtual void WriteTypeDetails(NYson::TYsonWriter& writer, const TTypeAnnotationNode& type) = 0;
  54. };
  55. class ITrackableNodeProcessor {
  56. public:
  57. virtual ~ITrackableNodeProcessor() = default;
  58. struct TExprNodeAndId
  59. {
  60. TExprNode::TPtr Node;
  61. TString Id;
  62. };
  63. virtual void GetUsedNodes(const TExprNode& node, TVector<TString>& usedNodeIds) = 0;
  64. virtual void GetCreatedNodes(const TExprNode& node, TVector<TExprNodeAndId>& createdNodes, TExprContext& ctx) = 0;
  65. virtual IGraphTransformer& GetCleanupTransformer() = 0;
  66. };
  // Forward declarations for types that IDataProvider refers to only by
  // pointer or reference.
  class IDqIntegration;
  class IDqOptimization;

  class IYtflowIntegration;
  class IYtflowOptimization;

  class IOptimizationContext;
  72. class IDataProvider : public TThrRefBase {
  73. public:
  74. virtual ~IDataProvider() {}
  75. virtual TStringBuf GetName() const = 0;
  76. enum class EResultFormat {
  77. Yson,
  78. Custom,
  79. Skiff
  80. };
  81. // settings for result data provider
  82. struct TFillSettings {
  83. TMaybe<ui64> AllResultsBytesLimit = 100000;
  84. TMaybe<ui64> RowsLimitPerWrite = 1000; // only if list is written
  85. EResultFormat Format;
  86. TString FormatDetails;
  87. bool Discard = false;
  88. };
  89. virtual bool Initialize(TExprContext& ctx) = 0;
  90. //-- configuration
  91. virtual IGraphTransformer& GetConfigurationTransformer() = 0;
  92. virtual TExprNode::TPtr GetClusterInfo(const TString& cluster, TExprContext& ctx) = 0;
  93. virtual const THashMap<TString, TString>* GetClusterTokens() = 0;
  94. virtual void AddCluster(const TString& name, const THashMap<TString, TString>& properties) = 0;
  95. //-- discovery & rewrite
  96. virtual IGraphTransformer& GetIODiscoveryTransformer() = 0;
  97. //-- assign epochs
  98. virtual IGraphTransformer& GetEpochsTransformer() = 0;
  99. //-- intent determination
  100. virtual IGraphTransformer& GetIntentDeterminationTransformer() = 0;
  101. //-- type check
  102. virtual bool ValidateParameters(TExprNode& node, TExprContext& ctx, TMaybe<TString>& cluster) = 0;
  103. virtual bool CanParse(const TExprNode& node) = 0;
  104. virtual IGraphTransformer& GetTypeAnnotationTransformer(bool instantOnly) = 0;
  105. virtual IGraphTransformer& GetConstraintTransformer(bool instantOnly, bool subGraph) = 0;
  106. // Fill set of callables, which have world as first child and should be trimmed in evaluation
  107. virtual void FillModifyCallables(THashSet<TStringBuf>& callables) = 0;
  108. //-- optimizations
  109. virtual TExprNode::TPtr RewriteIO(const TExprNode::TPtr& node, TExprContext& ctx) = 0;
  110. virtual IGraphTransformer& GetRecaptureOptProposalTransformer() = 0;
  111. virtual IGraphTransformer& GetStatisticsProposalTransformer() = 0;
  112. virtual IGraphTransformer& GetLogicalOptProposalTransformer() = 0;
  113. virtual IGraphTransformer& GetPhysicalOptProposalTransformer() = 0;
  114. virtual IGraphTransformer& GetPhysicalFinalizingTransformer() = 0;
  115. virtual void PostRewriteIO() = 0;
  116. virtual void Reset() = 0;
  117. //-- metadata loading
  118. virtual IGraphTransformer& GetLoadTableMetadataTransformer() = 0;
  119. // This function is used in core optimizers to check either the node can be used as input multiple times or not
  120. virtual bool IsPersistent(const TExprNode& node) = 0;
  121. virtual bool IsRead(const TExprNode& node) = 0;
  122. virtual bool IsWrite(const TExprNode& node) = 0;
  123. // Right! or worlds are written to syncList
  124. virtual bool CanBuildResult(const TExprNode& node, TSyncMap& syncList) = 0;
  125. virtual bool CanPullResult(const TExprNode& node, TSyncMap& syncList, bool& canRef) = 0;
  126. virtual bool GetExecWorld(const TExprNode::TPtr& node, TExprNode::TPtr& root) = 0;
  127. virtual bool CanEvaluate(const TExprNode& node) = 0;
  128. virtual void EnterEvaluation(ui64 id) = 0;
  129. virtual void LeaveEvaluation(ui64 id) = 0;
  130. virtual TExprNode::TPtr CleanupWorld(const TExprNode::TPtr& node, TExprContext& ctx) = 0;
  131. virtual TExprNode::TPtr OptimizePull(const TExprNode::TPtr& source, const TFillSettings& fillSettings, TExprContext& ctx,
  132. IOptimizationContext& optCtx) = 0;
  133. //-- execution
  134. virtual bool CanExecute(const TExprNode& node) = 0;
  135. virtual bool ValidateExecution(const TExprNode& node, TExprContext& ctx) = 0;
  136. virtual void GetRequiredChildren(const TExprNode& node, TExprNode::TListType& children) = 0;
  137. virtual IGraphTransformer& GetCallableExecutionTransformer() = 0;
  138. //-- finalizing
  139. virtual IGraphTransformer& GetFinalizingTransformer() = 0;
  140. virtual bool CollectDiagnostics(NYson::TYsonWriter& writer) = 0;
  141. virtual bool GetTasksInfo(NYson::TYsonWriter& writer) = 0;
  142. virtual bool CollectStatistics(NYson::TYsonWriter& writer, bool totalOnly) = 0;
  143. virtual bool CollectDiscoveredData(NYson::TYsonWriter& writer) = 0;
  144. //-- plan
  145. virtual IGraphTransformer& GetPlanInfoTransformer() = 0;
  146. virtual IPlanFormatter& GetPlanFormatter() = 0;
  147. //-- garbage collection
  148. virtual ITrackableNodeProcessor& GetTrackableNodeProcessor() = 0;
  149. // DQ
  150. virtual IDqIntegration* GetDqIntegration() = 0;
  151. virtual IDqOptimization* GetDqOptimization() = 0;
  152. // ytflow
  153. virtual IYtflowIntegration* GetYtflowIntegration() = 0;
  154. virtual IYtflowOptimization* GetYtflowOptimization() = 0;
  155. };
  // Forward declarations of types used below only inside callback signatures
  // (by reference, by pointer, or as a smart-pointer argument).
  struct IPipelineConfigurator;
  struct TTypeAnnotationContext;
  struct TResultProviderConfig;
  struct TYqlOperationOptions;
  struct TOperationProgress;

  class TGatewaysConfig;

  // Callback that receives a TOperationProgress by const reference.
  using TOperationProgressWriter = std::function<void(const TOperationProgress&)>;
  // Syntax of a query's source text, as passed to the remote
  // validate/optimize/run callbacks of TDataProviderInfo.
  enum class ESourceSyntax {
      Unknown,
      Sql,
      Yql,
  };
  168. struct TDataProviderInfo {
  169. using TFutureStatus = NThreading::TFuture<IGraphTransformer::TStatus>;
  170. THashSet<TString> Names;
  171. TIntrusivePtr<IDataProvider> Source;
  172. TIntrusivePtr<IDataProvider> Sink;
  173. bool SupportFullResultDataSink = false;
  174. bool WaitForActiveProcesses = true;
  175. bool SupportsHidden = false;
  176. std::function<TMaybe<TString>(const TMaybe<TSet<TString>>& usedClusters, const TMaybe<TSet<TString>>& usedProviders,
  177. ESourceSyntax syntax)> RemoteClusterProvider;
  178. std::function<TFutureStatus(const TString& cluster, ESourceSyntax sourceSyntax, const TString& sourceCode,
  179. TExprContext& ctx)> RemoteValidate;
  180. std::function<TFutureStatus(const TString& cluster,
  181. ESourceSyntax sourceSyntax, const TString& sourceCode,
  182. const IPipelineConfigurator* pipelineConf,
  183. TIntrusivePtr<TTypeAnnotationContext> typeCtx,
  184. TExprNode::TPtr& root, TExprContext& ctx,
  185. TMaybe<TString>& externalQueryAst, TMaybe<TString>& externalQueryPlan)> RemoteOptimize;
  186. std::function<TFutureStatus(const TString& cluster,
  187. ESourceSyntax sourceSyntax, const TString& sourceCode,
  188. const NYson::EYsonFormat& outputFormat, const NYson::EYsonFormat& resultFormat,
  189. const IPipelineConfigurator* pipelineConf,
  190. TIntrusivePtr<TTypeAnnotationContext> typeCtx,
  191. TExprNode::TPtr& root, TExprContext& ctx,
  192. TMaybe<TString>& externalQueryAst, TMaybe<TString>& externalQueryPlan, TMaybe<TString>& externalDiagnostics,
  193. TIntrusivePtr<TResultProviderConfig> resultProviderConfig)> RemoteRun;
  194. std::function<NThreading::TFuture<void>(const TString& sessionId, const TString& username,
  195. const TOperationProgressWriter& progressWriter, const TYqlOperationOptions& operationOptions,
  196. TIntrusivePtr<IRandomProvider> randomProvider, TIntrusivePtr<ITimeProvider> timeProvider)> OpenSession;
  197. std::function<bool()> HasActiveProcesses;
  198. // COMPAT(gritukan): Remove it after Arcadia migration.
  199. std::function<void(const TString& sessionId)> CloseSession;
  200. std::function<void(const TString& sessionId)> CleanupSession;
  201. std::function<NThreading::TFuture<void>(const TString& sessionId)> CloseSessionAsync;
  202. std::function<NThreading::TFuture<void>(const TString& sessionId)> CleanupSessionAsync;
  203. std::function<TString(const TString& url, const TString& alias)> TokenResolver;
  204. };
  // Aborts hidden query, which is running within a separate TProgram.
  using THiddenQueryAborter = std::function<void()>;
  206. class TQContext;
  207. using TDataProviderInitializer = std::function<TDataProviderInfo(
  208. const TString& userName,
  209. const TString& sessionId,
  210. const TGatewaysConfig* gatewaysConfig,
  211. const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
  212. TIntrusivePtr<IRandomProvider> randomProvider,
  213. TIntrusivePtr<TTypeAnnotationContext> typeCtx,
  214. const TOperationProgressWriter& progressWriter,
  215. const TYqlOperationOptions& operationOptions,
  216. THiddenQueryAborter hiddenAborter,
  217. const TQContext& qContext)>;
  218. } // namespace NYql