yql_data_provider.h 10 KB


  1. #pragma once
  2. #include "yql_graph_transformer.h"
  3. #include <yql/essentials/core/sql_types/yql_callable_names.h>
  4. #include <yql/essentials/public/udf/udf_validate.h>
  5. #include <library/cpp/yson/writer.h>
  6. #include <util/generic/maybe.h>
  7. #include <util/generic/set.h>
  8. #include <util/generic/hash_set.h>
  9. #include <util/generic/string.h>
  10. #include <functional>
  11. class IRandomProvider;
  12. class ITimeProvider;
  13. namespace NKikimr {
  14. namespace NMiniKQL {
  15. class IFunctionRegistry;
  16. }
  17. }
  18. namespace NYql {
  19. struct TPinInfo {
  20. const TExprNode* DataSource;
  21. const TExprNode* DataSink;
  22. const TExprNode* Key;
  23. TString DisplayName;
  24. bool HideInBasicPlan;
  25. TPinInfo(const TExprNode* dataSource, const TExprNode* dataSink,
  26. const TExprNode* key, const TString& displayName, bool hideInBasicPlan)
  27. : DataSource(dataSource)
  28. , DataSink(dataSink)
  29. , Key(key)
  30. , DisplayName(displayName)
  31. , HideInBasicPlan(hideInBasicPlan)
  32. {}
  33. };
  34. class IPlanFormatter {
  35. public:
  36. virtual ~IPlanFormatter() {}
  37. virtual void WriteDetails(const TExprNode& node, NYson::TYsonWriter& writer) = 0;
  38. // returns visibility of node
  39. virtual bool GetDependencies(const TExprNode& node, TExprNode::TListType& children, bool compact) = 0;
  40. virtual void GetResultDependencies(const TExprNode::TPtr& node, TExprNode::TListType& children, bool compact) = 0;
  41. // returns full number of inputs
  42. virtual ui32 GetInputs(const TExprNode& node, TVector<TPinInfo>& inputs, bool withLimits) = 0;
  43. // returns full number of outputs
  44. virtual ui32 GetOutputs(const TExprNode& node, TVector<TPinInfo>& outputs, bool withLimits) = 0;
  45. virtual TString GetProviderPath(const TExprNode& node) = 0;
  46. virtual void WritePlanDetails(const TExprNode& node, NYson::TYsonWriter& writer, bool withLimits) = 0;
  47. virtual void WritePullDetails(const TExprNode& node, NYson::TYsonWriter& writer) = 0;
  48. virtual void WritePinDetails(const TExprNode& node, NYson::TYsonWriter& writer) = 0;
  49. virtual TString GetOperationDisplayName(const TExprNode& node) = 0;
  50. // returns false if provider schemas aren't supported
  51. virtual bool WriteSchemaHeader(NYson::TYsonWriter& writer) = 0;
  52. virtual void WriteTypeDetails(NYson::TYsonWriter& writer, const TTypeAnnotationNode& type) = 0;
  53. };
  54. class ITrackableNodeProcessor {
  55. public:
  56. virtual ~ITrackableNodeProcessor() = default;
  57. struct TExprNodeAndId
  58. {
  59. TExprNode::TPtr Node;
  60. TString Id;
  61. };
  62. virtual void GetUsedNodes(const TExprNode& node, TVector<TString>& usedNodeIds) = 0;
  63. virtual void GetCreatedNodes(const TExprNode& node, TVector<TExprNodeAndId>& createdNodes, TExprContext& ctx) = 0;
  64. virtual IGraphTransformer& GetCleanupTransformer() = 0;
  65. };
  66. class IDqIntegration;
  67. class IDqOptimization;
  68. class IOptimizationContext;
  69. class IDataProvider : public TThrRefBase {
  70. public:
  71. virtual ~IDataProvider() {}
  72. virtual TStringBuf GetName() const = 0;
  73. enum class EResultFormat {
  74. Yson,
  75. Custom,
  76. Skiff
  77. };
  78. // settings for result data provider
  79. struct TFillSettings {
  80. TMaybe<ui64> AllResultsBytesLimit = 100000;
  81. TMaybe<ui64> RowsLimitPerWrite = 1000; // only if list is written
  82. EResultFormat Format;
  83. TString FormatDetails;
  84. bool Discard = false;
  85. };
  86. virtual bool Initialize(TExprContext& ctx) = 0;
  87. //-- configuration
  88. virtual IGraphTransformer& GetConfigurationTransformer() = 0;
  89. virtual TExprNode::TPtr GetClusterInfo(const TString& cluster, TExprContext& ctx) = 0;
  90. virtual const THashMap<TString, TString>* GetClusterTokens() = 0;
  91. virtual void AddCluster(const TString& name, const THashMap<TString, TString>& properties) = 0;
  92. //-- discovery & rewrite
  93. virtual IGraphTransformer& GetIODiscoveryTransformer() = 0;
  94. //-- assign epochs
  95. virtual IGraphTransformer& GetEpochsTransformer() = 0;
  96. //-- intent determination
  97. virtual IGraphTransformer& GetIntentDeterminationTransformer() = 0;
  98. //-- type check
  99. virtual bool ValidateParameters(TExprNode& node, TExprContext& ctx, TMaybe<TString>& cluster) = 0;
  100. virtual bool CanParse(const TExprNode& node) = 0;
  101. virtual IGraphTransformer& GetTypeAnnotationTransformer(bool instantOnly) = 0;
  102. virtual IGraphTransformer& GetConstraintTransformer(bool instantOnly, bool subGraph) = 0;
  103. // Fill set of callables, which have world as first child and should be trimmed in evaluation
  104. virtual void FillModifyCallables(THashSet<TStringBuf>& callables) = 0;
  105. //-- optimizations
  106. virtual TExprNode::TPtr RewriteIO(const TExprNode::TPtr& node, TExprContext& ctx) = 0;
  107. virtual IGraphTransformer& GetRecaptureOptProposalTransformer() = 0;
  108. virtual IGraphTransformer& GetStatisticsProposalTransformer() = 0;
  109. virtual IGraphTransformer& GetLogicalOptProposalTransformer() = 0;
  110. virtual IGraphTransformer& GetPhysicalOptProposalTransformer() = 0;
  111. virtual IGraphTransformer& GetPhysicalFinalizingTransformer() = 0;
  112. virtual void PostRewriteIO() = 0;
  113. virtual void Reset() = 0;
  114. //-- metadata loading
  115. virtual IGraphTransformer& GetLoadTableMetadataTransformer() = 0;
  116. // This function is used in core optimizers to check either the node can be used as input multiple times or not
  117. virtual bool IsPersistent(const TExprNode& node) = 0;
  118. virtual bool IsRead(const TExprNode& node) = 0;
  119. virtual bool IsWrite(const TExprNode& node) = 0;
  120. // Right! or worlds are written to syncList
  121. virtual bool CanBuildResult(const TExprNode& node, TSyncMap& syncList) = 0;
  122. virtual bool CanPullResult(const TExprNode& node, TSyncMap& syncList, bool& canRef) = 0;
  123. virtual bool GetExecWorld(const TExprNode::TPtr& node, TExprNode::TPtr& root) = 0;
  124. virtual bool CanEvaluate(const TExprNode& node) = 0;
  125. virtual void EnterEvaluation(ui64 id) = 0;
  126. virtual void LeaveEvaluation(ui64 id) = 0;
  127. virtual TExprNode::TPtr CleanupWorld(const TExprNode::TPtr& node, TExprContext& ctx) = 0;
  128. virtual TExprNode::TPtr OptimizePull(const TExprNode::TPtr& source, const TFillSettings& fillSettings, TExprContext& ctx,
  129. IOptimizationContext& optCtx) = 0;
  130. //-- execution
  131. virtual bool CanExecute(const TExprNode& node) = 0;
  132. virtual bool ValidateExecution(const TExprNode& node, TExprContext& ctx) = 0;
  133. virtual void GetRequiredChildren(const TExprNode& node, TExprNode::TListType& children) = 0;
  134. virtual IGraphTransformer& GetCallableExecutionTransformer() = 0;
  135. //-- finalizing
  136. virtual IGraphTransformer& GetFinalizingTransformer() = 0;
  137. virtual bool CollectDiagnostics(NYson::TYsonWriter& writer) = 0;
  138. virtual bool GetTasksInfo(NYson::TYsonWriter& writer) = 0;
  139. virtual bool CollectStatistics(NYson::TYsonWriter& writer, bool totalOnly) = 0;
  140. virtual bool CollectDiscoveredData(NYson::TYsonWriter& writer) = 0;
  141. //-- plan
  142. virtual IGraphTransformer& GetPlanInfoTransformer() = 0;
  143. virtual IPlanFormatter& GetPlanFormatter() = 0;
  144. //-- garbage collection
  145. virtual ITrackableNodeProcessor& GetTrackableNodeProcessor() = 0;
  146. // DQ
  147. virtual IDqIntegration* GetDqIntegration() = 0;
  148. virtual IDqOptimization* GetDqOptimization() = 0;
  149. };
  150. struct IPipelineConfigurator;
  151. struct TTypeAnnotationContext;
  152. struct TResultProviderConfig;
  153. struct TYqlOperationOptions;
  154. struct TOperationProgress;
  155. class TGatewaysConfig;
  156. using TOperationProgressWriter = std::function<void(const TOperationProgress&)>;
  157. enum class ESourceSyntax {
  158. Unknown,
  159. Sql,
  160. Yql
  161. };
  162. struct TDataProviderInfo {
  163. using TFutureStatus = NThreading::TFuture<IGraphTransformer::TStatus>;
  164. THashSet<TString> Names;
  165. TIntrusivePtr<IDataProvider> Source;
  166. TIntrusivePtr<IDataProvider> Sink;
  167. bool SupportFullResultDataSink = false;
  168. bool WaitForActiveProcesses = true;
  169. bool SupportsHidden = false;
  170. std::function<TMaybe<TString>(const TMaybe<TSet<TString>>& usedClusters, const TMaybe<TSet<TString>>& usedProviders,
  171. ESourceSyntax syntax)> RemoteClusterProvider;
  172. std::function<TFutureStatus(const TString& cluster, ESourceSyntax sourceSyntax, const TString& sourceCode,
  173. TExprContext& ctx)> RemoteValidate;
  174. std::function<TFutureStatus(const TString& cluster,
  175. ESourceSyntax sourceSyntax, const TString& sourceCode,
  176. const IPipelineConfigurator* pipelineConf,
  177. TIntrusivePtr<TTypeAnnotationContext> typeCtx,
  178. TExprNode::TPtr& root, TExprContext& ctx,
  179. TMaybe<TString>& externalQueryAst, TMaybe<TString>& externalQueryPlan)> RemoteOptimize;
  180. std::function<TFutureStatus(const TString& cluster,
  181. ESourceSyntax sourceSyntax, const TString& sourceCode,
  182. const NYson::EYsonFormat& outputFormat, const NYson::EYsonFormat& resultFormat,
  183. const IPipelineConfigurator* pipelineConf,
  184. TIntrusivePtr<TTypeAnnotationContext> typeCtx,
  185. TExprNode::TPtr& root, TExprContext& ctx,
  186. TMaybe<TString>& externalQueryAst, TMaybe<TString>& externalQueryPlan, TMaybe<TString>& externalDiagnostics,
  187. TIntrusivePtr<TResultProviderConfig> resultProviderConfig)> RemoteRun;
  188. std::function<NThreading::TFuture<void>(const TString& sessionId, const TString& username,
  189. const TOperationProgressWriter& progressWriter, const TYqlOperationOptions& operationOptions,
  190. TIntrusivePtr<IRandomProvider> randomProvider, TIntrusivePtr<ITimeProvider> timeProvider)> OpenSession;
  191. std::function<bool()> HasActiveProcesses;
  192. // COMPAT(gritukan): Remove it after Arcadia migration.
  193. std::function<void(const TString& sessionId)> CloseSession;
  194. std::function<void(const TString& sessionId)> CleanupSession;
  195. std::function<NThreading::TFuture<void>(const TString& sessionId)> CloseSessionAsync;
  196. std::function<NThreading::TFuture<void>(const TString& sessionId)> CleanupSessionAsync;
  197. std::function<TString(const TString& url, const TString& alias)> TokenResolver;
  198. };
  199. using THiddenQueryAborter = std::function<void()>; // aborts hidden query, which is running within a separate TProgram
  200. class TQContext;
  201. using TDataProviderInitializer = std::function<TDataProviderInfo(
  202. const TString& userName,
  203. const TString& sessionId,
  204. const TGatewaysConfig* gatewaysConfig,
  205. const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
  206. TIntrusivePtr<IRandomProvider> randomProvider,
  207. TIntrusivePtr<TTypeAnnotationContext> typeCtx,
  208. const TOperationProgressWriter& progressWriter,
  209. const TYqlOperationOptions& operationOptions,
  210. THiddenQueryAborter hiddenAborter,
  211. const TQContext& qContext)>;
  212. } // namespace NYql