#pragma once

// NOTE(review): in this copy every #include below has lost its header name, and many
// template arguments throughout the file are missing (e.g. `TIntrusivePtr;`,
// `NThreading::TFuture;`, `TMaybe gatewaysForMerge`, `TVector>&`). Restore them from
// version control before building; the declarations below are otherwise unchanged.
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

// Forward declaration only: this header refers to the function registry via pointer,
// so the full definition is not needed here.
namespace NKikimr {
namespace NMiniKQL {
class IFunctionRegistry;
}
}

namespace NYql {

class TProgram;
using TProgramPtr = TIntrusivePtr;
class TProgramFactory;
using TProgramFactoryPtr = TIntrusivePtr;

///////////////////////////////////////////////////////////////////////////////
// TProgramFactory
//
// Collects the shared configuration (function registry, data providers, credentials,
// UDF resolver/index, file storage, ...) and produces TProgram instances via Create().
// TProgram's constructor is private and TProgramFactory is its friend, so Create() is
// the only way to obtain a TProgram. Reference-counted (TThrRefBase) and move-only.
///////////////////////////////////////////////////////////////////////////////
class TProgramFactory: public TThrRefBase, private TMoveOnly
{
public:
    TProgramFactory(
        bool useRepeatableRandomAndTimeProviders,
        const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
        ui64 nextUniqueId,
        const TVector& dataProvidersInit,
        const TString& runner);

    // Configuration setters. Presumably each one stores its argument in the matching
    // member below, to be handed to programs created afterwards. The definitions are
    // not in this header, so confirm in the .cpp.
    void AddUserDataTable(const TUserDataTable& userDataTable);
    void SetCredentials(TCredentials::TPtr credentials);
    // Stored as a raw, non-owning pointer (GatewaysConfig_); the caller presumably keeps
    // the config alive for the factory's lifetime. TODO confirm.
    void SetGatewaysConfig(const TGatewaysConfig* gatewaysConfig);
    void SetModules(IModuleResolver::TPtr modules);
    void SetUrlListerManager(IUrlListerManagerPtr urlListerManager);
    void SetUdfResolver(IUdfResolver::TPtr udfResolver);
    void SetUdfIndex(TUdfIndex::TPtr udfIndex, TUdfIndexPackageSet::TPtr udfIndexPackageSet);
    void SetFileStorage(TFileStoragePtr fileStorage);
    void SetUrlPreprocessing(IUrlPreprocessing::TPtr urlPreprocessing);
    void EnableRangeComputeFor();
    void SetArrowResolver(IArrowResolver::TPtr arrowResolver);

    // Creates a program whose source is read from `file`.
    TProgramPtr Create(
        const TFile& file,
        const TString& sessionId = TString(),
        const TQContext& qContext = {},
        TMaybe gatewaysForMerge = {});

    // Creates a program from in-memory `sourceCode`; `filename` is the name associated
    // with it (TProgram keeps it in Filename_, which PrintErrorsTo passes along with the source).
    TProgramPtr Create(
        const TString& filename,
        const TString& sourceCode,
        const TString& sessionId = TString(),
        EHiddenMode hiddenMode = EHiddenMode::Disable,
        const TQContext& qContext = {},
        TMaybe gatewaysForMerge = {});

    void UnrepeatableRandom();
private:
    const bool UseRepeatableRandomAndTimeProviders_;
    // NOTE(review): unlike every other member, this one has no trailing underscore.
    // Renaming it also requires touching the .cpp.
    bool UseUnrepeatableRandom = false;
    const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry_;
    const ui64 NextUniqueId_;
    TVector DataProvidersInit_;
    TUserDataTable UserDataTable_;
    TCredentials::TPtr Credentials_;
    const TGatewaysConfig* GatewaysConfig_;
    IModuleResolver::TPtr Modules_;
    IUrlListerManagerPtr UrlListerManager_;
    IUdfResolver::TPtr UdfResolver_;
    TUdfIndex::TPtr UdfIndex_;
    TUdfIndexPackageSet::TPtr UdfIndexPackageSet_;
    TFileStoragePtr FileStorage_;
    IUrlPreprocessing::TPtr UrlPreprocessing_;
    TString Runner_;
    bool EnableRangeComputeFor_ = false;
    IArrowResolver::TPtr ArrowResolver_;
};

///////////////////////////////////////////////////////////////////////////////
// TProgram
//
// One query: parse (ParseYql/ParseSql), Compile, then one of the Discover / Lineage /
// Validate / Optimize / Run stages, each in a synchronous (TStatus) and an
// asynchronous (TFutureStatus) form. Instances are created only by TProgramFactory
// (private constructor + friend). Reference-counted and non-copyable.
///////////////////////////////////////////////////////////////////////////////
class TProgram: public TThrRefBase, private TNonCopyable
{
public:
    friend TProgramFactory;
    using TStatus = IGraphTransformer::TStatus;
    using TFutureStatus = NThreading::TFuture;

public:
    ~TProgram();

    void AddCredentials(const TVector>& credentials);
    void ClearCredentials();

    void AddUserDataTable(const TUserDataTable& userDataTable);

    bool ParseYql();
    bool ParseSql();
    bool ParseSql(const NSQLTranslation::TTranslationSettings& settings);

    bool Compile(const TString& username, bool skipLibraries = false);

    TStatus Discover(const TString& username);

    TFutureStatus DiscoverAsync(const TString& username);

    TStatus Lineage(const TString& username, IOutputStream* traceOut = nullptr, IOutputStream* exprOut = nullptr, bool withTypes = false);

    TFutureStatus LineageAsync(const TString& username, IOutputStream* traceOut = nullptr, IOutputStream* exprOut = nullptr, bool withTypes = false);

    TStatus Validate(const TString& username, IOutputStream* exprOut = nullptr, bool withTypes = false);

    TFutureStatus ValidateAsync(const TString& username, IOutputStream* exprOut = nullptr, bool withTypes = false);

    // Optional output streams default to nullptr, which presumably means
    // "do not emit" (TODO confirm in the .cpp).
    TStatus Optimize(
            const TString& username,
            IOutputStream* traceOut = nullptr,
            IOutputStream* tracePlan = nullptr,
            IOutputStream* exprOut = nullptr,
            bool withTypes = false);

    TFutureStatus OptimizeAsync(
            const TString& username,
            IOutputStream* traceOut = nullptr,
            IOutputStream* tracePlan = nullptr,
            IOutputStream* exprOut = nullptr,
            bool withTypes = false);

    TStatus Run(
            const TString& username,
            IOutputStream* traceOut = nullptr,
            IOutputStream* tracePlan = nullptr,
            IOutputStream* exprOut = nullptr,
            bool withTypes = false);

    TFutureStatus RunAsync(
            const TString& username,
            IOutputStream* traceOut = nullptr,
            IOutputStream* tracePlan = nullptr,
            IOutputStream* exprOut = nullptr,
            bool withTypes = false);

    // Variants of the stages above that take a caller-supplied pipeline configurator
    // instead of the stream/flag parameters.
    TStatus LineageWithConfig(
            const TString& username,
            const IPipelineConfigurator& pipelineConf);

    TFutureStatus LineageAsyncWithConfig(
            const TString& username,
            const IPipelineConfigurator& pipelineConf);

    TStatus OptimizeWithConfig(
            const TString& username,
            const IPipelineConfigurator& pipelineConf);

    TFutureStatus OptimizeAsyncWithConfig(
            const TString& username,
            const IPipelineConfigurator& pipelineConf);

    TStatus RunWithConfig(
            const TString& username,
            const IPipelineConfigurator& pipelineConf);

    TFutureStatus RunAsyncWithConfig(
            const TString& username,
            const IPipelineConfigurator& pipelineConf);

    TFutureStatus ContinueAsync();

    bool HasActiveProcesses();
    bool NeedWaitForActiveProcesses();

    // The returned future must not be discarded ([[nodiscard]]).
    [[nodiscard]] NThreading::TFuture Abort();

    TIssues Issues() const;
    TIssues CompletedIssues() const;
    void FinalizeIssues();

    void Print(IOutputStream* exprOut, IOutputStream* planOut, bool cleanPlan = false);

    // Prints Issues() to `out` together with the program's filename and source text.
    inline void PrintErrorsTo(IOutputStream& out) const {
        Issues().PrintWithProgramTo(out, Filename_, SourceCode_);
    }

    inline const TAstNode* AstRoot() const {
        return AstRoot_;
    }

    inline const TExprNode::TPtr& ExprRoot() const {
        return ExprRoot_;
    }

    // Dereferences ExprCtx_ unconditionally; the context must already exist.
    inline TExprContext& ExprCtx() const {
        return *ExprCtx_;
    }

    // False when no result provider config exists or it has no committed results.
    inline bool HasResults() const {
        return ResultProviderConfig_ &&
                !ResultProviderConfig_->CommittedResults.empty();
    }

    // Precondition: ResultProviderConfig_ is set. Unlike HasResults(), this does not
    // null-check it, so call HasResults() first.
    inline const TVector& Results() const {
        return ResultProviderConfig_->CommittedResults;
    }

    TMaybe GetQueryAst(TMaybe memoryLimit = {});
    TMaybe GetQueryPlan(const TPlanSettings& settings = {});

    void SetDiagnosticFormat(NYson::EYsonFormat format) {
        DiagnosticFormat_ = format;
    }

    void SetResultType(IDataProvider::EResultFormat type) {
        ResultType_ = type;
    }

    TMaybe GetDiagnostics();
    IGraphTransformer::TStatistics GetRawDiagnostics();

    TMaybe GetTasksInfo();

    TMaybe GetStatistics(bool totalOnly = false, THashMap extraYsons = {});

    TMaybe GetDiscoveredData();

    TMaybe GetLineage();

    TString ResultsAsString() const;
    void ConfigureYsonResultFormat(NYson::EYsonFormat format);

    inline IOutputStream* ExprStream() const { return ExprStream_; }
    inline IOutputStream* PlanStream() const { return PlanStream_; }

    NYson::EYsonFormat GetResultFormat() const { return ResultFormat_; }
    NYson::EYsonFormat GetOutputFormat() const { return OutputFormat_; }

    void SetValidateOptions(NUdf::EValidateMode validateMode);
    void SetDisableNativeUdfSupport(bool disable);
    void SetUseTableMetaFromGraph(bool use);

    // The setters below enforce (via Y_ENSURE) that TypeCtx_ has not been created yet,
    // so they must be called before the stage that builds the type annotation context.

    // Stores `writer` wrapped by ThreadSafeProgressWriter.
    void SetProgressWriter(TOperationProgressWriter writer) {
        Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
        ProgressWriter_ = ThreadSafeProgressWriter(writer);
    }

    void SetAuthenticatedUser(const TString& user) {
        Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
        OperationOptions_.AuthenticatedUser = user;
    }

    void SetOperationId(const TString& id) {
        Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
        OperationOptions_.Id = id;
    }

    void SetSharedOperationId(const TString& id) {
        Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
        OperationOptions_.SharedId = id;
    }

    // Throws yexception unless `title` contains the substring "YQL".
    void SetOperationTitle(const TString& title) {
        Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
        if (!title.Contains("YQL")) {
            ythrow yexception() << "Please mention YQL in the title '" << title << "'";
        }

        OperationOptions_.Title = title;
    }

    void SetOperationUrl(const TString& url) {
        Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
        OperationOptions_.Url = url;
    }

    void SetQueryName(const TString& name) {
        Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
        OperationOptions_.QueryName = name;
    }

    void SetOperationAttrsYson(const TString& attrs) {
        Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
        OperationOptions_.AttrsYson = attrs;
    }

    void SetParametersYson(const TString& parameters);
    // Should be called after Compile(); the result is read back via
    // GetExtractedQueryParametersMetadataYson().
    bool ExtractQueryParametersMetadata();
    const TString& GetExtractedQueryParametersMetadataYson() const {
        return ExtractedQueryParametersMetadataYson_;
    }

    void EnableResultPosition() {
        SupportsResultPosition_ = true;
    }

    IPlanBuilder& GetPlanBuilder();

    void SetAbortHidden(std::function&& func) {
        AbortHidden_ = std::move(func);
    }

    // Refreshes UsedClusters_ via CollectUsedClusters() and returns it. The bool that
    // CollectUsedClusters() returns is ignored here; presumably the TMaybe stays empty
    // when collection fails (TODO confirm).
    TMaybe> GetUsedClusters() {
        CollectUsedClusters();
        return UsedClusters_;
    }

private:
    // Only TProgramFactory (friend) constructs programs.
    // NOTE(review): randomProvider/timeProvider are taken as `const` by value, which costs
    // a ref-count copy per call; `const&` would match the other pointer parameters.
    TProgram(
        const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
        const TIntrusivePtr randomProvider,
        const TIntrusivePtr timeProvider,
        ui64 nextUniqueId,
        const TVector& dataProvidersInit,
        const TUserDataTable& userDataTable,
        const TCredentials::TPtr& credentials,
        const IModuleResolver::TPtr& modules,
        const IUrlListerManagerPtr& urlListerManager,
        const IUdfResolver::TPtr& udfResolver,
        const TUdfIndex::TPtr& udfIndex,
        const TUdfIndexPackageSet::TPtr& udfIndexPackageSet,
        const TFileStoragePtr& fileStorage,
        const IUrlPreprocessing::TPtr& urlPreprocessing,
        const TGatewaysConfig* gatewaysConfig,
        const TString& filename,
        const TString& sourceCode,
        const TString& sessionId,
        const TString& runner,
        bool enableRangeComputeFor,
        const IArrowResolver::TPtr& arrowResolver,
        EHiddenMode hiddenMode,
        const TQContext& qContext,
        TMaybe gatewaysForMerge);

    TTypeAnnotationContextPtr BuildTypeAnnotationContext(const TString& username);
    TTypeAnnotationContextPtr GetAnnotationContext() const;
    TTypeAnnotationContextPtr ProvideAnnotationContext(const TString& username);
    bool CollectUsedClusters();

    NThreading::TFuture OpenSession(const TString& username);

    [[nodiscard]] NThreading::TFuture CleanupLastSession();
    [[nodiscard]] NThreading::TFuture CloseLastSession();

    TFutureStatus RemoteKikimrValidate(const TString& cluster);
    TFutureStatus RemoteKikimrOptimize(const TString& cluster, const IPipelineConfigurator* pipelineConf);
    TFutureStatus RemoteKikimrRun(const TString& cluster, const IPipelineConfigurator* pipelineConf);

    bool FillParseResult(NYql::TAstParseResult&& astRes, NYql::TWarningRules* warningRules = nullptr);
    TString GetSessionId() const;

    NThreading::TFuture AsyncTransformWithFallback(bool applyAsyncChanges);
    void SaveExprRoot();

private:
    std::optional CheckFallbackIssues(const TIssues& issues);
    void HandleSourceCode(TString& sourceCode);
    void HandleTranslationSettings(NSQLTranslation::TTranslationSettings& loadedSettings,
        NSQLTranslation::TTranslationSettings*& currentSettings);

    // NOTE(review): AstRoot_, SourceSyntax_, SyntaxVersion_, ResultType_, ResultFormat_
    // and OutputFormat_ have no in-class initializer; presumably the constructor sets
    // them (confirm in the .cpp).
    const NKikimr::NMiniKQL::IFunctionRegistry* FunctionRegistry_;
    const TIntrusivePtr RandomProvider_;
    const TIntrusivePtr TimeProvider_;
    const ui64 NextUniqueId_;
    TAstNode* AstRoot_;
    std::unique_ptr AstPool_;
    TAutoPtr ExprCtx_;
    // Once set, the Y_ENSURE-guarded setters above refuse further changes.
    TTypeAnnotationContextPtr TypeCtx_;
    const IModuleResolver::TPtr Modules_;
    TVector DataProvidersInit_;
    // Presumably guards DataProviders_ (TODO confirm).
    TAdaptiveLock DataProvidersLock_;
    TVector DataProviders_;
    TYqlOperationOptions OperationOptions_;
    TCredentials::TPtr Credentials_;
    const IUrlListerManagerPtr UrlListerManager_;
    IUdfResolver::TPtr UdfResolver_;
    const TUdfIndex::TPtr UdfIndex_;
    const TUdfIndexPackageSet::TPtr UdfIndexPackageSet_;
    const TFileStoragePtr FileStorage_;
    TUserDataTable SavedUserDataTable_;
    TUserDataStorage::TPtr UserDataStorage_;
    const TGatewaysConfig* GatewaysConfig_;
    TGatewaysConfig LoadedGatewaysConfig_;
    TString Filename_;
    TString SourceCode_;
    ESourceSyntax SourceSyntax_;
    ui16 SyntaxVersion_;

    TExprNode::TPtr ExprRoot_;
    TExprNode::TPtr SavedExprRoot_;
    // `mutable` so it can be locked from const members; presumably guards SessionId_
    // for GetSessionId() const (TODO confirm).
    mutable TAdaptiveLock SessionIdLock_;
    TString SessionId_;
    NThreading::TFuture CloseLastSessionFuture_;
    TAutoPtr PlanBuilder_;
    TAutoPtr Transformer_;
    TIntrusivePtr ResultProviderConfig_;
    bool SupportsResultPosition_ = false;
    IDataProvider::EResultFormat ResultType_;
    NYson::EYsonFormat ResultFormat_;
    NYson::EYsonFormat OutputFormat_;
    TMaybe DiagnosticFormat_;
    NUdf::EValidateMode ValidateMode_ = NUdf::EValidateMode::None;
    bool DisableNativeUdfSupport_ = false;
    bool UseTableMetaFromGraph_ = false;
    TMaybe> UsedClusters_;
    TMaybe> UsedProviders_;
    TMaybe ExternalQueryAst_;
    TMaybe ExternalQueryPlan_;
    TMaybe ExternalDiagnostics_;
    IOutputStream* ExprStream_ = nullptr;
    IOutputStream* PlanStream_ = nullptr;
    // Default is a no-op writer.
    TOperationProgressWriter ProgressWriter_ = [](const TOperationProgress&) {};
    TString ExtractedQueryParametersMetadataYson_;
    const bool EnableRangeComputeFor_;
    const IArrowResolver::TPtr ArrowResolver_;
    i64 FallbackCounter_ = 0;
    const EHiddenMode HiddenMode_ = EHiddenMode::Disable;
    // Default is a no-op; replaced via SetAbortHidden().
    THiddenQueryAborter AbortHidden_ = [](){};
    TMaybe LineageStr_;
    TQContext QContext_;
    TMaybe GatewaysForMerge_;
    TIssues FinalIssues_;
};

// Presumably updates the SQL translation `flags` set from data recorded in `qContext`
// (definition not in this header; TODO confirm semantics).
void UpdateSqlFlagsFromQContext(const TQContext& qContext, THashSet& flags);

} // namespace NYql