yql_facade.cpp 71 KB


  1. #include "yql_facade.h"
  2. #include <yql/essentials/core/yql_execution.h>
  3. #include <yql/essentials/core/yql_expr_csee.h>
  4. #include <yql/essentials/core/yql_expr_optimize.h>
  5. #include <yql/essentials/core/yql_expr_type_annotation.h>
  6. #include <yql/essentials/core/yql_opt_rewrite_io.h>
  7. #include <yql/essentials/core/yql_opt_proposed_by_data.h>
  8. #include <yql/essentials/core/yql_gc_transformer.h>
  9. #include <yql/essentials/core/type_ann/type_ann_expr.h>
  10. #include <yql/essentials/core/services/yql_plan.h>
  11. #include <yql/essentials/core/services/yql_eval_params.h>
  12. #include <yql/essentials/sql/sql.h>
  13. #include <yql/essentials/sql/v1/sql.h>
  14. //FIXME {
  15. #include <yql/essentials/sql/v1/lexer/antlr3/lexer.h>
  16. #include <yql/essentials/sql/v1/lexer/antlr3_ansi/lexer.h>
  17. #include <yql/essentials/sql/v1/proto_parser/antlr3/proto_parser.h>
  18. #include <yql/essentials/sql/v1/proto_parser/antlr3_ansi/proto_parser.h>
  19. //}
  20. #include <yql/essentials/sql/v1/lexer/antlr4/lexer.h>
  21. #include <yql/essentials/sql/v1/lexer/antlr4_ansi/lexer.h>
  22. #include <yql/essentials/sql/v1/proto_parser/antlr4/proto_parser.h>
  23. #include <yql/essentials/sql/v1/proto_parser/antlr4_ansi/proto_parser.h>
  24. #include <yql/essentials/parser/pg_wrapper/interface/parser.h>
  25. #include <yql/essentials/utils/log/context.h>
  26. #include <yql/essentials/utils/log/profile.h>
  27. #include <yql/essentials/utils/limiting_allocator.h>
  28. #include <yql/essentials/core/services/yql_out_transformers.h>
  29. #include <yql/essentials/core/extract_predicate/extract_predicate_dbg.h>
  30. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  31. #include <yql/essentials/providers/common/provider/yql_provider.h>
  32. #include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
  33. #include <yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.h>
  34. #include <yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.h>
  35. #include <yql/essentials/providers/common/arrow_resolve/yql_simple_arrow_resolver.h>
  36. #include <yql/essentials/providers/common/config/yql_setting.h>
  37. #include <yql/essentials/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.h>
  38. #include <library/cpp/yson/node/node_io.h>
  39. #include <library/cpp/deprecated/split/split_iterator.h>
  40. #include <library/cpp/yson/writer.h>
  41. #include <library/cpp/string_utils/base64/base64.h>
  42. #include <util/stream/file.h>
  43. #include <util/stream/null.h>
  44. #include <util/string/join.h>
  45. #include <util/string/split.h>
  46. #include <util/generic/guid.h>
  47. #include <util/system/rusage.h>
  48. #include <util/generic/yexception.h>
  49. using namespace NThreading;
  50. namespace NYql {
  51. namespace {
// Default buffer sizes for AST / plan output.
// NOTE(review): units and usage sites are outside this view — presumably bytes; confirm.
const size_t DEFAULT_AST_BUF_SIZE = 1024;
const size_t DEFAULT_PLAN_BUF_SIZE = 1024;
// First element of every QContext key the facade reads or writes.
const TString FacadeComponent = "Facade";
// Second element of the QContext key: one label per recorded artifact.
const TString SourceCodeLabel = "SourceCode";
const TString GatewaysLabel = "Gateways";
const TString ParametersLabel = "Parameters";
const TString TranslationLabel = "Translation";
// "Static" items are recorded by the TProgram constructor, "dynamic" ones by
// the later AddUserDataTable / AddCredentials calls.
const TString StaticUserFilesLabel = "UserFiles";
const TString DynamicUserFilesLabel = "DynamicUserFiles";
const TString StaticCredentialsLabel = "Credentials";
const TString DynamicCredentialsLabel = "DynamicCredentials";
  63. class TUrlLoader : public IUrlLoader {
  64. public:
  65. TUrlLoader(TFileStoragePtr storage)
  66. : Storage_(storage)
  67. {}
  68. TString Load(const TString& url, const TString& token) override {
  69. auto file = Storage_->PutUrl(url, token);
  70. return TFileInput(file->GetPath()).ReadAll();
  71. }
  72. private:
  73. TFileStoragePtr Storage_;
  74. };
  75. template <typename... Params1, typename... Params2>
  76. TProgram::TStatus SyncExecution(
  77. TProgram* program,
  78. TProgram::TFutureStatus (TProgram::*method)(Params1...),
  79. Params2&&... params) {
  80. TProgram::TFutureStatus future =
  81. (program->*method)(std::forward<Params2>(params)...);
  82. YQL_ENSURE(future.Initialized());
  83. future.Wait();
  84. HandleFutureException(future);
  85. TProgram::TStatus status = future.GetValue();
  86. while (status == TProgram::TStatus::Async) {
  87. auto continueFuture = program->ContinueAsync();
  88. continueFuture.Wait();
  89. HandleFutureException(continueFuture);
  90. status = continueFuture.GetValue();
  91. }
  92. if (status == TProgram::TStatus::Error) {
  93. program->Print(program->ExprStream(), program->PlanStream());
  94. }
  95. return status;
  96. }
  97. std::function<TString(const TString&, const TString&)> BuildDefaultTokenResolver(TCredentials::TPtr credentials) {
  98. return [credentials](const TString& /*url*/, const TString& alias) -> TString {
  99. if (alias) {
  100. if (auto cred = credentials->FindCredential(TString("default_").append(alias))) {
  101. return cred->Content;
  102. }
  103. }
  104. return {};
  105. };
  106. }
  107. std::function<TString(const TString&, const TString&)> BuildCompositeTokenResolver(TVector<std::function<TString(const TString&, const TString&)>>&& children) {
  108. if (children.empty()) {
  109. return {};
  110. }
  111. if (children.size() == 1) {
  112. return std::move(children[0]);
  113. }
  114. return [children = std::move(children)](const TString& url, const TString& alias) -> TString {
  115. for (auto& c : children) {
  116. if (auto r = c(url, alias)) {
  117. return r;
  118. }
  119. }
  120. return {};
  121. };
  122. }
} // namespace
  124. ///////////////////////////////////////////////////////////////////////////////
  125. // TProgramFactory
  126. ///////////////////////////////////////////////////////////////////////////////
// Creates a factory with empty credentials, no gateways config and a simple
// arrow resolver built from the function registry.
// Note: functionRegistry is dereferenced here, so it must not be null.
TProgramFactory::TProgramFactory(
    bool useRepeatableRandomAndTimeProviders,
    const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
    ui64 nextUniqueId,
    const TVector<TDataProviderInitializer>& dataProvidersInit,
    const TString& runner)
    : UseRepeatableRandomAndTimeProviders_(useRepeatableRandomAndTimeProviders)
    , FunctionRegistry_(functionRegistry)
    , NextUniqueId_(nextUniqueId)
    , DataProvidersInit_(dataProvidersInit)
    , Credentials_(MakeIntrusive<TCredentials>())
    , GatewaysConfig_(nullptr)
    , Runner_(runner)
    , ArrowResolver_(MakeSimpleArrowResolver(*functionRegistry))
{
}
// Forces Create() to use the default (non-deterministic) random provider even
// when repeatable providers were requested; the time provider is not affected.
void TProgramFactory::UnrepeatableRandom() {
    UseUnrepeatableRandom = true;
}
// Sets the flag that Create() passes to every new TProgram as enableRangeComputeFor.
void TProgramFactory::EnableRangeComputeFor() {
    EnableRangeComputeFor_ = true;
}
  149. void TProgramFactory::AddUserDataTable(const TUserDataTable& userDataTable) {
  150. for (const auto& p : userDataTable) {
  151. if (!UserDataTable_.emplace(p).second) {
  152. ythrow yexception() << "UserDataTable already has user data block with key " << p.first;
  153. }
  154. }
  155. }
// Replaces the credentials handed to subsequently created programs.
// Note: TProgram's constructor dereferences this pointer, so it should not be null.
void TProgramFactory::SetCredentials(TCredentials::TPtr credentials) {
    Credentials_ = std::move(credentials);
}
// Stores a non-owning pointer to the gateways config; the caller keeps ownership.
void TProgramFactory::SetGatewaysConfig(const TGatewaysConfig* gatewaysConfig) {
    GatewaysConfig_ = gatewaysConfig;
}
  162. void TProgramFactory::SetModules(IModuleResolver::TPtr modules) {
  163. Modules_ = modules;
  164. }
  165. void TProgramFactory::SetUdfResolver(IUdfResolver::TPtr udfResolver) {
  166. UdfResolver_ = udfResolver;
  167. }
// Sets the UDF index and its package set; Create() clones them per program.
void TProgramFactory::SetUdfIndex(TUdfIndex::TPtr udfIndex, TUdfIndexPackageSet::TPtr udfIndexPackageSet) {
    UdfIndex_ = std::move(udfIndex);
    UdfIndexPackageSet_ = std::move(udfIndexPackageSet);
}
// Sets the file storage shared by all programs created afterwards.
void TProgramFactory::SetFileStorage(TFileStoragePtr fileStorage) {
    FileStorage_ = std::move(fileStorage);
}
// Sets the URL preprocessing object passed to each created program.
void TProgramFactory::SetUrlPreprocessing(IUrlPreprocessing::TPtr urlPreprocessing) {
    UrlPreprocessing_ = std::move(urlPreprocessing);
}
  178. void TProgramFactory::SetArrowResolver(IArrowResolver::TPtr arrowResolver) {
  179. ArrowResolver_ = arrowResolver;
  180. }
// Sets the URL lister manager; Create() clones it for every program.
void TProgramFactory::SetUrlListerManager(IUrlListerManagerPtr urlListerManager) {
    UrlListerManager_ = std::move(urlListerManager);
}
  184. TProgramPtr TProgramFactory::Create(
  185. const TFile& file,
  186. const TString& sessionId,
  187. const TQContext& qContext,
  188. TMaybe<TString> gatewaysForMerge)
  189. {
  190. TString sourceCode = TFileInput(file).ReadAll();
  191. return Create(file.GetName(), sourceCode, sessionId, EHiddenMode::Disable, qContext, gatewaysForMerge);
  192. }
  193. TProgramPtr TProgramFactory::Create(
  194. const TString& filename,
  195. const TString& sourceCode,
  196. const TString& sessionId,
  197. EHiddenMode hiddenMode,
  198. const TQContext& qContext,
  199. TMaybe<TString> gatewaysForMerge)
  200. {
  201. auto randomProvider = UseRepeatableRandomAndTimeProviders_ && !UseUnrepeatableRandom && hiddenMode == EHiddenMode::Disable ?
  202. CreateDeterministicRandomProvider(1) : CreateDefaultRandomProvider();
  203. auto timeProvider = UseRepeatableRandomAndTimeProviders_ ?
  204. CreateDeterministicTimeProvider(10000000) : CreateDefaultTimeProvider();
  205. TUdfIndex::TPtr udfIndex = UdfIndex_ ? UdfIndex_->Clone() : nullptr;
  206. TUdfIndexPackageSet::TPtr udfIndexPackageSet = (UdfIndexPackageSet_ && hiddenMode == EHiddenMode::Disable) ? UdfIndexPackageSet_->Clone() : nullptr;
  207. IModuleResolver::TPtr moduleResolver = Modules_ ? Modules_->CreateMutableChild() : nullptr;
  208. IUrlListerManagerPtr urlListerManager = UrlListerManager_ ? UrlListerManager_->Clone() : nullptr;
  209. auto udfResolver = udfIndex ? NCommon::CreateUdfResolverWithIndex(udfIndex, UdfResolver_, FileStorage_) : UdfResolver_;
  210. // make UserDataTable_ copy here
  211. return new TProgram(FunctionRegistry_, randomProvider, timeProvider, NextUniqueId_, DataProvidersInit_,
  212. UserDataTable_, Credentials_, moduleResolver, urlListerManager,
  213. udfResolver, udfIndex, udfIndexPackageSet, FileStorage_, UrlPreprocessing_,
  214. GatewaysConfig_, filename, sourceCode, sessionId, Runner_, EnableRangeComputeFor_, ArrowResolver_, hiddenMode,
  215. qContext, gatewaysForMerge);
  216. }
  217. ///////////////////////////////////////////////////////////////////////////////
  218. // TProgram
  219. ///////////////////////////////////////////////////////////////////////////////
// Constructs a program and wires it to the query context (QContext).
//
// Besides storing its arguments, the constructor:
//  - copies the credentials object (the program never mutates the caller's one);
//  - generates a session id when none is given;
//  - in QContext write mode records credential names/categories, user file
//    aliases and a reduced gateways config;
//  - in QContext read (replay) mode replaces credentials, user files, gateways
//    config and parameters with the recorded ones;
//  - attaches user data, URL loader and credentials to the module resolver and
//    the URL lister manager.
// Note: `credentials` is dereferenced unconditionally and must not be null.
TProgram::TProgram(
    const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
    const TIntrusivePtr<IRandomProvider> randomProvider,
    const TIntrusivePtr<ITimeProvider> timeProvider,
    ui64 nextUniqueId,
    const TVector<TDataProviderInitializer>& dataProvidersInit,
    const TUserDataTable& userDataTable,
    const TCredentials::TPtr& credentials,
    const IModuleResolver::TPtr& modules,
    const IUrlListerManagerPtr& urlListerManager,
    const IUdfResolver::TPtr& udfResolver,
    const TUdfIndex::TPtr& udfIndex,
    const TUdfIndexPackageSet::TPtr& udfIndexPackageSet,
    const TFileStoragePtr& fileStorage,
    const IUrlPreprocessing::TPtr& urlPreprocessing,
    const TGatewaysConfig* gatewaysConfig,
    const TString& filename,
    const TString& sourceCode,
    const TString& sessionId,
    const TString& runner,
    bool enableRangeComputeFor,
    const IArrowResolver::TPtr& arrowResolver,
    EHiddenMode hiddenMode,
    const TQContext& qContext,
    TMaybe<TString> gatewaysForMerge
)
    : FunctionRegistry_(functionRegistry)
    , RandomProvider_(randomProvider)
    , TimeProvider_(timeProvider)
    , NextUniqueId_(nextUniqueId)
    , AstRoot_(nullptr)
    , Modules_(modules)
    , DataProvidersInit_(dataProvidersInit)
    , Credentials_(MakeIntrusive<NYql::TCredentials>(*credentials))
    , UrlListerManager_(urlListerManager)
    , UdfResolver_(udfResolver)
    , UdfIndex_(udfIndex)
    , UdfIndexPackageSet_(udfIndexPackageSet)
    , FileStorage_(fileStorage)
    , SavedUserDataTable_(userDataTable)
    , GatewaysConfig_(gatewaysConfig)
    , Filename_(filename)
    , SourceCode_(sourceCode)
    , SourceSyntax_(ESourceSyntax::Unknown)
    , SyntaxVersion_(0)
    , ExprRoot_(nullptr)
    , SessionId_(sessionId)
    , ResultType_(IDataProvider::EResultFormat::Yson)
    , ResultFormat_(NYson::EYsonFormat::Binary)
    , OutputFormat_(NYson::EYsonFormat::Pretty)
    , EnableRangeComputeFor_(enableRangeComputeFor)
    , ArrowResolver_(arrowResolver)
    , HiddenMode_(hiddenMode)
    , QContext_(qContext)
    , GatewaysForMerge_(gatewaysForMerge)
{
    if (SessionId_.empty()) {
        SessionId_ = CreateGuidAsString();
    }
    // Credentials: record only name/category/subcategory (never the content)
    // when writing; when reading, rebuild placeholder credentials from the record.
    if (QContext_.CanWrite()) {
        NYT::TNode credListNode = NYT::TNode::CreateList();
        Credentials_->ForEach([&](const TString name, const TCredential& cred) {
            credListNode.Add(NYT::TNode()
                ("Name", name)
                ("Category", cred.Category)
                ("Subcategory", cred.Subcategory));
        });
        auto credList = NYT::NodeToYsonString(credListNode, NYT::NYson::EYsonFormat::Binary);
        QContext_.GetWriter()->Put({FacadeComponent, StaticCredentialsLabel}, credList).GetValueSync();
    } else if (QContext_.CanRead()) {
        Credentials_ = MakeIntrusive<TCredentials>();
        Credentials_->SetUserCredentials({
            .OauthToken = "REPLAY_OAUTH",
            .BlackboxSessionIdCookie = "REPLAY_SESSIONID"
        });
        // Both the constructor-time and the later-added credentials are restored.
        for (const auto& label : {StaticCredentialsLabel, DynamicCredentialsLabel}) {
            auto item = QContext_.GetReader()->Get({FacadeComponent, label}).GetValueSync();
            if (item) {
                auto node = NYT::NodeFromYsonString(item->Value);
                for (const auto& c : node.AsList()) {
                    Credentials_->AddCredential(c["Name"].AsString(), TCredential(
                        c["Category"].AsString(),c["Subcategory"].AsString(),"REPLAY"));
                }
            }
        }
    }
    // User files: record the aliases when writing (only if there are any);
    // when reading, replace the table with empty inline blocks per recorded alias.
    if (QContext_.CanWrite() && !SavedUserDataTable_.empty()) {
        NYT::TNode userFilesNode = NYT::TNode::CreateList();
        for (const auto& p : SavedUserDataTable_) {
            userFilesNode.Add(p.first.Alias());
        }
        auto userFiles = NYT::NodeToYsonString(userFilesNode, NYT::NYson::EYsonFormat::Binary);
        QContext_.GetWriter()->Put({FacadeComponent, StaticUserFilesLabel}, userFiles).GetValueSync();
    } else if (QContext_.CanRead()) {
        SavedUserDataTable_.clear();
        for (const auto& label : {StaticUserFilesLabel, DynamicUserFilesLabel}) {
            auto item = QContext_.GetReader()->Get({FacadeComponent, label}).GetValueSync();
            if (item) {
                auto node = NYT::NodeFromYsonString(item->Value);
                for (const auto& alias : node.AsList()) {
                    TUserDataBlock block;
                    block.Type = EUserDataType::RAW_INLINE_DATA;
                    YQL_ENSURE(SavedUserDataTable_.emplace(TUserDataKey::File(alias.AsString()), block).second);
                }
            }
        }
    }
    UserDataStorage_ = MakeIntrusive<TUserDataStorage>(fileStorage, SavedUserDataTable_, udfResolver, udfIndex);
    // Only the concrete TModuleResolver supports the attachments below.
    if (auto modules = dynamic_cast<TModuleResolver*>(Modules_.get())) {
        modules->AttachUserData(UserDataStorage_);
        modules->SetUrlLoader(new TUrlLoader(FileStorage_));
        modules->SetCredentials(Credentials_);
        if (QContext_) {
            modules->SetQContext(QContext_);
        }
    }
    if (UrlListerManager_) {
        UrlListerManager_->SetCredentials(Credentials_);
        UrlListerManager_->SetUrlPreprocessing(urlPreprocessing);
    }
    OperationOptions_.Runner = runner;
    UserDataStorage_->SetUrlPreprocessor(urlPreprocessing);
    if (QContext_) {
        UdfResolver_ = NCommon::WrapUdfResolverWithQContext(UdfResolver_, QContext_);
        if (QContext_.CanRead()) {
            // Replay: use the recorded gateways config, optionally merged with
            // the caller-supplied serialized overrides.
            auto item = QContext_.GetReader()->Get({FacadeComponent, GatewaysLabel}).GetValueSync();
            if (item) {
                YQL_ENSURE(LoadedGatewaysConfig_.ParseFromString(item->Value));
                if (GatewaysForMerge_) {
                    YQL_ENSURE(LoadedGatewaysConfig_.MergeFromString(*GatewaysForMerge_));
                }
                GatewaysConfig_ = &LoadedGatewaysConfig_;
            }
        } else if (QContext_.CanWrite() && GatewaysConfig_) {
            // Capture: store only the Yt, Fs, YqlCore, SqlCore and Dq sections.
            TGatewaysConfig cleaned;
            if (GatewaysConfig_->HasYt()) {
                cleaned.MutableYt()->CopyFrom(GatewaysConfig_->GetYt());
            }
            if (GatewaysConfig_->HasFs()) {
                cleaned.MutableFs()->CopyFrom(GatewaysConfig_->GetFs());
            }
            if (GatewaysConfig_->HasYqlCore()) {
                cleaned.MutableYqlCore()->CopyFrom(GatewaysConfig_->GetYqlCore());
            }
            if (GatewaysConfig_->HasSqlCore()) {
                cleaned.MutableSqlCore()->CopyFrom(GatewaysConfig_->GetSqlCore());
            }
            if (GatewaysConfig_->HasDq()) {
                cleaned.MutableDq()->CopyFrom(GatewaysConfig_->GetDq());
            }
            auto data = cleaned.SerializeAsString();
            QContext_.GetWriter()->Put({FacadeComponent, GatewaysLabel}, data).GetValueSync();
        }
        if (QContext_.CanRead()) {
            // Replay: restore recorded query parameters, if any.
            auto item = QContext_.GetReader()->Get({FacadeComponent, ParametersLabel}).GetValueSync();
            if (item) {
                SetParametersYson(item->Value);
            }
        }
    }
}
// Closes the last session synchronously and drops all data providers.
// Never throws: any exception is reported to Cerr and swallowed.
TProgram::~TProgram() {
    try {
        CloseLastSession().GetValueSync();
        // Stop all incomplete executions before the TExprContext is destroyed.
        with_lock (DataProvidersLock_) {
            DataProviders_.clear();
        }
    } catch (...) {
        Cerr << CurrentExceptionMessage() << Endl;
    }
}
// Uses the same YSON format for both the result format and the output format.
void TProgram::ConfigureYsonResultFormat(NYson::EYsonFormat format) {
    ResultFormat_ = format;
    OutputFormat_ = format;
}
// Sets the UDF validate mode. Must be called before the type annotation
// context is created (the value is copied into it on creation); throws otherwise.
void TProgram::SetValidateOptions(NUdf::EValidateMode validateMode) {
    Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
    ValidateMode_ = validateMode;
}
// Sets the DisableNativeUdfSupport flag. Must be called before the type
// annotation context is created; throws otherwise.
void TProgram::SetDisableNativeUdfSupport(bool disable) {
    Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
    DisableNativeUdfSupport_ = disable;
}
// Sets the UseTableMetaFromGraph flag. Must be called before the type
// annotation context is created; throws otherwise.
void TProgram::SetUseTableMetaFromGraph(bool use) {
    Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
    UseTableMetaFromGraph_ = use;
}
// Returns the existing type annotation context; throws if it has not been
// created yet (see ProvideAnnotationContext).
TTypeAnnotationContextPtr TProgram::GetAnnotationContext() const {
    Y_ENSURE(TypeCtx_, "TypeCtx_ is not created");
    return TypeCtx_;
}
  412. TTypeAnnotationContextPtr TProgram::ProvideAnnotationContext(const TString& username) {
  413. if (!TypeCtx_) {
  414. TypeCtx_ = BuildTypeAnnotationContext(username);
  415. TypeCtx_->OperationOptions = OperationOptions_;
  416. TypeCtx_->ValidateMode = ValidateMode_;
  417. TypeCtx_->DisableNativeUdfSupport = DisableNativeUdfSupport_;
  418. TypeCtx_->UseTableMetaFromGraph = UseTableMetaFromGraph_;
  419. }
  420. return TypeCtx_;
  421. }
  422. IPlanBuilder& TProgram::GetPlanBuilder() {
  423. if (!PlanBuilder_) {
  424. PlanBuilder_ = CreatePlanBuilder(*GetAnnotationContext());
  425. }
  426. return *PlanBuilder_;
  427. }
  428. void TProgram::SetParametersYson(const TString& parameters) {
  429. Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
  430. auto node = NYT::NodeFromYsonString(parameters);
  431. YQL_ENSURE(node.IsMap());
  432. for (const auto& x : node.AsMap()) {
  433. YQL_ENSURE(x.second.IsMap());
  434. YQL_ENSURE(x.second.HasKey("Data"));
  435. YQL_ENSURE(x.second.Size() == 1);
  436. }
  437. OperationOptions_.ParametersYson = node;
  438. if (auto modules = dynamic_cast<TModuleResolver*>(Modules_.get())) {
  439. modules->SetParameters(node);
  440. }
  441. if (UrlListerManager_) {
  442. UrlListerManager_->SetParameters(node);
  443. }
  444. if (QContext_.CanWrite()) {
  445. QContext_.GetWriter()->Put({FacadeComponent, ParametersLabel}, parameters).GetValueSync();
  446. }
  447. }
  448. bool TProgram::ExtractQueryParametersMetadata() {
  449. auto& types = *GetAnnotationContext();
  450. NYT::TNode params = NYT::TNode::CreateMap();
  451. Y_ENSURE(ExprCtx_);
  452. if (!ExtractParametersMetaAsYson(ExprRoot_, types, *ExprCtx_, params)) {
  453. return false;
  454. }
  455. ExtractedQueryParametersMetadataYson_ = NYT::NodeToYsonString(params, ResultFormat_);
  456. return true;
  457. }
  458. bool TProgram::FillParseResult(NYql::TAstParseResult&& astRes, NYql::TWarningRules* warningRules) {
  459. if (!astRes.Issues.Empty()) {
  460. if (!ExprCtx_) {
  461. ExprCtx_.Reset(new TExprContext(NextUniqueId_));
  462. }
  463. auto& iManager = ExprCtx_->IssueManager;
  464. if (warningRules) {
  465. for (auto warningRule: *warningRules) {
  466. iManager.AddWarningRule(warningRule);
  467. }
  468. }
  469. iManager.AddScope([this]() {
  470. TIssuePtr issueHolder = new TIssue();
  471. issueHolder->SetMessage(TStringBuilder() << "Parse " << SourceSyntax_);
  472. issueHolder->Severity = TSeverityIds::S_INFO;
  473. return issueHolder;
  474. });
  475. for (auto issue: astRes.Issues) {
  476. iManager.RaiseWarning(issue);
  477. }
  478. iManager.LeaveScope();
  479. }
  480. if (!astRes.IsOk()) {
  481. return false;
  482. }
  483. AstRoot_ = astRes.Root;
  484. AstPool_ = std::move(astRes.Pool);
  485. return true;
  486. }
  487. TString TProgram::GetSessionId() const {
  488. with_lock(SessionIdLock_) {
  489. return SessionId_;
  490. }
  491. }
  492. void TProgram::AddCredentials(const TVector<std::pair<TString, TCredential>>& credentials) {
  493. Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
  494. if (QContext_.CanWrite()) {
  495. NYT::TNode credListNode = NYT::TNode::CreateList();
  496. for (const auto& c : credentials) {
  497. credListNode.Add(NYT::TNode()
  498. ("Name", c.first)
  499. ("Category", c.second.Category)
  500. ("Subcategory", c.second.Subcategory));
  501. }
  502. auto credList = NYT::NodeToYsonString(credListNode, NYT::NYson::EYsonFormat::Binary);
  503. QContext_.GetWriter()->Put({FacadeComponent, DynamicCredentialsLabel}, credList).GetValueSync();
  504. }
  505. for (const auto& credential : credentials) {
  506. Credentials_->AddCredential(credential.first, credential.second);
  507. }
  508. if (auto modules = dynamic_cast<TModuleResolver*>(Modules_.get())) {
  509. modules->SetCredentials(Credentials_);
  510. }
  511. if (UrlListerManager_) {
  512. UrlListerManager_->SetCredentials(Credentials_);
  513. }
  514. }
// Replaces the credentials with a fresh empty object and re-publishes it to
// the module resolver and the URL lister manager.
// Must be called before the type annotation context is created.
void TProgram::ClearCredentials() {
    Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
    Credentials_ = MakeIntrusive<TCredentials>();
    if (auto modules = dynamic_cast<TModuleResolver*>(Modules_.get())) {
        modules->SetCredentials(Credentials_);
    }
    if (UrlListerManager_) {
        UrlListerManager_->SetCredentials(Credentials_);
    }
}
  525. void TProgram::AddUserDataTable(const TUserDataTable& userDataTable) {
  526. for (const auto& p : userDataTable) {
  527. if (!SavedUserDataTable_.emplace(p).second) {
  528. ythrow yexception() << "UserDataTable already has user data block with key " << p.first;
  529. }
  530. UserDataStorage_->AddUserDataBlock(p.first, p.second);
  531. }
  532. if (QContext_.CanWrite()) {
  533. NYT::TNode userFilesNode;
  534. for (const auto& p : userDataTable) {
  535. userFilesNode.Add(p.first.Alias());
  536. }
  537. auto userFiles = NYT::NodeToYsonString(userFilesNode, NYT::NYson::EYsonFormat::Binary);
  538. QContext_.GetWriter()->Put({FacadeComponent, DynamicUserFilesLabel}, userFiles).GetValueSync();
  539. }
  540. }
  541. void TProgram::HandleSourceCode(TString& sourceCode) {
  542. if (QContext_.CanWrite()) {
  543. QContext_.GetWriter()->Put({FacadeComponent, SourceCodeLabel}, sourceCode).GetValueSync();
  544. } else if (QContext_.CanRead()) {
  545. auto loaded = QContext_.GetReader()->Get({FacadeComponent, SourceCodeLabel}).GetValueSync();
  546. Y_ENSURE(loaded.Defined(), "No source code");
  547. sourceCode = loaded->Value;
  548. }
  549. }
  550. namespace {
  551. THashSet<TString> ExtractSqlFlags(const NYT::TNode& dataNode) {
  552. THashSet<TString> result;
  553. for (const auto& f : dataNode["SqlFlags"].AsList()) {
  554. result.insert(f.AsString());
  555. }
  556. return result;
  557. }
  558. } // namespace
  559. void UpdateSqlFlagsFromQContext(const TQContext& qContext, THashSet<TString>& flags) {
  560. if (qContext.CanRead()) {
  561. auto loaded = qContext.GetReader()->Get({FacadeComponent, TranslationLabel}).GetValueSync();
  562. if (!loaded) {
  563. return;
  564. }
  565. auto dataNode = NYT::NodeFromYsonString(loaded->Value);
  566. flags = ExtractSqlFlags(dataNode);
  567. }
  568. }
  569. void TProgram::HandleTranslationSettings(NSQLTranslation::TTranslationSettings& loadedSettings,
  570. NSQLTranslation::TTranslationSettings*& currentSettings)
  571. {
  572. if (QContext_.CanWrite()) {
  573. auto clusterMappingsNode = NYT::TNode::CreateMap();
  574. for (const auto& c : currentSettings->ClusterMapping) {
  575. clusterMappingsNode(c.first, c.second);
  576. }
  577. auto sqlFlagsNode = NYT::TNode::CreateList();
  578. for (const auto& f : currentSettings->Flags) {
  579. sqlFlagsNode.Add(f);
  580. }
  581. auto dataNode = NYT::TNode()
  582. ("ClusterMapping", clusterMappingsNode)
  583. ("V0Behavior", ui64(currentSettings->V0Behavior))
  584. ("V0WarnAsError", currentSettings->V0WarnAsError->Allow())
  585. ("DqDefaultAuto", currentSettings->DqDefaultAuto->Allow())
  586. ("BlockDefaultAuto", currentSettings->BlockDefaultAuto->Allow())
  587. ("SqlFlags", sqlFlagsNode);
  588. auto data = NYT::NodeToYsonString(dataNode, NYT::NYson::EYsonFormat::Binary);
  589. QContext_.GetWriter()->Put({FacadeComponent, TranslationLabel}, data).GetValueSync();
  590. } else if (QContext_.CanRead()) {
  591. auto loaded = QContext_.GetReader()->Get({FacadeComponent, TranslationLabel}).GetValueSync();
  592. if (!loaded) {
  593. return;
  594. }
  595. auto dataNode = NYT::NodeFromYsonString(loaded->Value);
  596. loadedSettings.ClusterMapping.clear();
  597. for (const auto& c : dataNode["ClusterMapping"].AsMap()) {
  598. loadedSettings.ClusterMapping[c.first] = c.second.AsString();
  599. }
  600. loadedSettings.Flags = ExtractSqlFlags(dataNode);
  601. loadedSettings.V0Behavior = (NSQLTranslation::EV0Behavior)dataNode["V0Behavior"].AsUint64();
  602. loadedSettings.V0WarnAsError = NSQLTranslation::ISqlFeaturePolicy::Make(dataNode["V0WarnAsError"].AsBool());
  603. loadedSettings.DqDefaultAuto = NSQLTranslation::ISqlFeaturePolicy::Make(dataNode["DqDefaultAuto"].AsBool());
  604. loadedSettings.BlockDefaultAuto = NSQLTranslation::ISqlFeaturePolicy::Make(dataNode["BlockDefaultAuto"].AsBool());
  605. currentSettings = &loadedSettings;
  606. }
  607. }
  608. bool TProgram::ParseYql() {
  609. YQL_PROFILE_FUNC(TRACE);
  610. YQL_ENSURE(SourceSyntax_ == ESourceSyntax::Unknown);
  611. SourceSyntax_ = ESourceSyntax::Yql;
  612. SyntaxVersion_ = 1;
  613. auto sourceCode = SourceCode_;
  614. HandleSourceCode(sourceCode);
  615. return FillParseResult(ParseAst(sourceCode));
  616. }
  617. bool TProgram::ParseSql() {
  618. YQL_PROFILE_FUNC(TRACE);
  619. static const THashMap<TString, TString> clusters = {
  620. { "plato", TString(YtProviderName) }
  621. };
  622. NSQLTranslation::TTranslationSettings settings;
  623. settings.ClusterMapping = clusters;
  624. return ParseSql(settings);
  625. }
  626. bool TProgram::ParseSql(const NSQLTranslation::TTranslationSettings& settings)
  627. {
  628. YQL_PROFILE_FUNC(TRACE);
  629. YQL_ENSURE(SourceSyntax_ == ESourceSyntax::Unknown);
  630. SourceSyntax_ = ESourceSyntax::Sql;
  631. SyntaxVersion_ = settings.SyntaxVersion;
  632. NYql::TWarningRules warningRules;
  633. auto sourceCode = SourceCode_;
  634. HandleSourceCode(sourceCode);
  635. NSQLTranslation::TTranslationSettings outerSettings = settings;
  636. NSQLTranslation::TTranslationSettings* currentSettings = &outerSettings;
  637. NSQLTranslation::TTranslationSettings loadedSettings;
  638. loadedSettings.PgParser = settings.PgParser;
  639. if (QContext_) {
  640. HandleTranslationSettings(loadedSettings, currentSettings);
  641. }
  642. currentSettings->EmitReadsForExists = true;
  643. NSQLTranslationV1::TLexers lexers;
  644. lexers.Antlr3 = NSQLTranslationV1::MakeAntlr3LexerFactory();
  645. lexers.Antlr3Ansi = NSQLTranslationV1::MakeAntlr3AnsiLexerFactory();
  646. lexers.Antlr4 = NSQLTranslationV1::MakeAntlr4LexerFactory();
  647. lexers.Antlr4Ansi = NSQLTranslationV1::MakeAntlr4AnsiLexerFactory();
  648. NSQLTranslationV1::TParsers parsers;
  649. parsers.Antlr3 = NSQLTranslationV1::MakeAntlr3ParserFactory();
  650. parsers.Antlr3Ansi = NSQLTranslationV1::MakeAntlr3AnsiParserFactory();
  651. parsers.Antlr4 = NSQLTranslationV1::MakeAntlr4ParserFactory();
  652. parsers.Antlr4Ansi = NSQLTranslationV1::MakeAntlr4AnsiParserFactory();
  653. NSQLTranslation::TTranslators translators(
  654. nullptr,
  655. NSQLTranslationV1::MakeTranslator(lexers, parsers),
  656. NSQLTranslationPG::MakeTranslator()
  657. );
  658. return FillParseResult(SqlToYql(translators, sourceCode, *currentSettings, &warningRules), &warningRules);
  659. }
  660. bool TProgram::Compile(const TString& username, bool skipLibraries) {
  661. YQL_PROFILE_FUNC(TRACE);
  662. Y_ENSURE(AstRoot_, "Program not parsed yet");
  663. if (!ExprCtx_) {
  664. ExprCtx_.Reset(new TExprContext(NextUniqueId_));
  665. }
  666. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_)) {
  667. return false;
  668. }
  669. TypeCtx_->IsReadOnly = true;
  670. if (!skipLibraries && Modules_.get()) {
  671. auto libs = UserDataStorage_->GetLibraries();
  672. for (auto lib : libs) {
  673. if (!Modules_->AddFromFile(lib, *ExprCtx_, SyntaxVersion_, 0)) {
  674. return false;
  675. }
  676. }
  677. }
  678. if (!CompileExpr(
  679. *AstRoot_, ExprRoot_, *ExprCtx_,
  680. skipLibraries ? nullptr : Modules_.get(),
  681. skipLibraries ? nullptr : UrlListerManager_.Get(), 0, SyntaxVersion_
  682. )) {
  683. return false;
  684. }
  685. return true;
  686. }
  687. bool TProgram::CollectUsedClusters() {
  688. using namespace NNodes;
  689. if (!UsedClusters_) {
  690. UsedClusters_.ConstructInPlace();
  691. UsedProviders_.ConstructInPlace();
  692. auto& typesCtx = *GetAnnotationContext();
  693. auto& ctx = *ExprCtx_;
  694. auto& usedClusters = *UsedClusters_;
  695. auto& usedProviders = *UsedProviders_;
  696. bool hasErrors = false;
  697. VisitExpr(ExprRoot_, [&typesCtx, &ctx, &usedClusters, &usedProviders, &hasErrors](const TExprNode::TPtr& node) {
  698. if (auto dsNode = TMaybeNode<TCoDataSource>(node)) {
  699. auto datasource = typesCtx.DataSourceMap.FindPtr(dsNode.Cast().Category());
  700. YQL_ENSURE(datasource, "Unknown DataSource: " << dsNode.Cast().Category().Value());
  701. TMaybe<TString> cluster;
  702. if (!(*datasource)->ValidateParameters(*node, ctx, cluster)) {
  703. hasErrors = true;
  704. return false;
  705. }
  706. usedProviders.insert(TString(dsNode.Cast().Category().Value()));
  707. if (cluster && *cluster != NCommon::ALL_CLUSTERS) {
  708. usedClusters.insert(*cluster);
  709. }
  710. }
  711. if (auto dsNode = TMaybeNode<TCoDataSink>(node)) {
  712. auto datasink = typesCtx.DataSinkMap.FindPtr(dsNode.Cast().Category());
  713. YQL_ENSURE(datasink, "Unknown DataSink: " << dsNode.Cast().Category().Value());
  714. TMaybe<TString> cluster;
  715. if (!(*datasink)->ValidateParameters(*node, ctx, cluster)) {
  716. hasErrors = true;
  717. return false;
  718. }
  719. usedProviders.insert(TString(dsNode.Cast().Category().Value()));
  720. if (cluster) {
  721. usedClusters.insert(*cluster);
  722. }
  723. }
  724. return true;
  725. });
  726. if (hasErrors) {
  727. UsedClusters_ = Nothing();
  728. UsedProviders_ = Nothing();
  729. return false;
  730. }
  731. }
  732. return true;
  733. }
  734. TProgram::TStatus TProgram::Discover(const TString& username) {
  735. YQL_PROFILE_FUNC(TRACE);
  736. auto m = &TProgram::DiscoverAsync;
  737. return SyncExecution(this, m, username);
  738. }
  739. TProgram::TFutureStatus TProgram::DiscoverAsync(const TString& username) {
  740. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  741. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  742. }
  743. TypeCtx_->DiscoveryMode = true;
  744. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  745. Transformer_ = TTransformationPipeline(TypeCtx_)
  746. .AddServiceTransformers()
  747. .AddParametersEvaluation(*FunctionRegistry_)
  748. .AddPreTypeAnnotation()
  749. .AddExpressionEvaluation(*FunctionRegistry_)
  750. .AddPreIOAnnotation()
  751. .Build();
  752. TFuture<void> openSession = OpenSession(username);
  753. if (!openSession.Initialized()) {
  754. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  755. }
  756. return openSession.Apply([this](const TFuture<void>& f) {
  757. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  758. try {
  759. f.GetValue();
  760. } catch (const std::exception& e) {
  761. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  762. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  763. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  764. }
  765. return AsyncTransform(*Transformer_, ExprRoot_, *ExprCtx_, false);
  766. });
  767. }
  768. TProgram::TStatus TProgram::Lineage(const TString& username, IOutputStream* traceOut, IOutputStream* exprOut, bool withTypes) {
  769. YQL_PROFILE_FUNC(TRACE);
  770. auto m = &TProgram::LineageAsync;
  771. return SyncExecution(this, m, username, traceOut, exprOut, withTypes);
  772. }
  773. TProgram::TFutureStatus TProgram::LineageAsync(const TString& username, IOutputStream* traceOut, IOutputStream* exprOut, bool withTypes) {
  774. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  775. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  776. }
  777. TypeCtx_->IsReadOnly = true;
  778. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  779. ExprStream_ = exprOut;
  780. Transformer_ = TTransformationPipeline(TypeCtx_)
  781. .AddServiceTransformers()
  782. .AddParametersEvaluation(*FunctionRegistry_)
  783. .AddPreTypeAnnotation()
  784. .AddExpressionEvaluation(*FunctionRegistry_)
  785. .AddIOAnnotation()
  786. .AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true)
  787. .AddPostTypeAnnotation()
  788. .Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput")
  789. .AddLineageOptimization(LineageStr_)
  790. .Add(TExprOutputTransformer::Sync(ExprRoot_, exprOut, withTypes), "AstOutput")
  791. .Build();
  792. TFuture<void> openSession = OpenSession(username);
  793. if (!openSession.Initialized())
  794. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  795. SaveExprRoot();
  796. return openSession.Apply([this](const TFuture<void>& f) {
  797. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  798. try {
  799. f.GetValue();
  800. } catch (const std::exception& e) {
  801. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  802. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  803. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  804. }
  805. return AsyncTransformWithFallback(false);
  806. });
  807. }
  808. TProgram::TStatus TProgram::Validate(const TString& username, IOutputStream* exprOut, bool withTypes) {
  809. YQL_PROFILE_FUNC(TRACE);
  810. auto m = &TProgram::ValidateAsync;
  811. return SyncExecution(this, m, username, exprOut, withTypes);
  812. }
  813. TProgram::TFutureStatus TProgram::ValidateAsync(const TString& username, IOutputStream* exprOut, bool withTypes) {
  814. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  815. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  816. }
  817. TypeCtx_->IsReadOnly = true;
  818. TVector<TDataProviderInfo> dataProviders;
  819. with_lock (DataProvidersLock_) {
  820. dataProviders = DataProviders_;
  821. }
  822. for (const auto& dp : dataProviders) {
  823. if (!dp.RemoteClusterProvider || !dp.RemoteValidate) {
  824. continue;
  825. }
  826. if (auto cluster = dp.RemoteClusterProvider(UsedClusters_, UsedProviders_, SourceSyntax_)) {
  827. return dp.RemoteValidate(*cluster, SourceSyntax_, SourceCode_, *ExprCtx_);
  828. }
  829. }
  830. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  831. ExprStream_ = exprOut;
  832. Transformer_ = TTransformationPipeline(TypeCtx_)
  833. .AddServiceTransformers()
  834. .AddParametersEvaluation(*FunctionRegistry_)
  835. .AddPreTypeAnnotation()
  836. .AddExpressionEvaluation(*FunctionRegistry_)
  837. .AddIOAnnotation()
  838. .AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true)
  839. .Add(TExprOutputTransformer::Sync(ExprRoot_, exprOut, withTypes), "AstOutput")
  840. .Build();
  841. TFuture<void> openSession = OpenSession(username);
  842. if (!openSession.Initialized()) {
  843. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  844. }
  845. SaveExprRoot();
  846. return openSession.Apply([this](const TFuture<void>& f) {
  847. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  848. try {
  849. f.GetValue();
  850. } catch (const std::exception& e) {
  851. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  852. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  853. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  854. }
  855. return AsyncTransformWithFallback(false);
  856. });
  857. }
  858. TProgram::TStatus TProgram::Optimize(
  859. const TString& username,
  860. IOutputStream* traceOut,
  861. IOutputStream* tracePlan,
  862. IOutputStream* exprOut,
  863. bool withTypes)
  864. {
  865. YQL_PROFILE_FUNC(TRACE);
  866. auto m = &TProgram::OptimizeAsync;
  867. return SyncExecution(this, m, username, traceOut, tracePlan, exprOut, withTypes);
  868. }
  869. TProgram::TFutureStatus TProgram::OptimizeAsync(
  870. const TString& username,
  871. IOutputStream* traceOut,
  872. IOutputStream* tracePlan,
  873. IOutputStream* exprOut,
  874. bool withTypes)
  875. {
  876. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  877. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  878. }
  879. TypeCtx_->IsReadOnly = true;
  880. TVector<TDataProviderInfo> dataProviders;
  881. with_lock (DataProvidersLock_) {
  882. dataProviders = DataProviders_;
  883. }
  884. for (const auto& dp : dataProviders) {
  885. if (!dp.RemoteClusterProvider || !dp.RemoteOptimize) {
  886. continue;
  887. }
  888. if (auto cluster = dp.RemoteClusterProvider(UsedClusters_, UsedProviders_, SourceSyntax_)) {
  889. return dp.RemoteOptimize(*cluster,
  890. SourceSyntax_, SourceCode_, nullptr,
  891. TypeCtx_, ExprRoot_, *ExprCtx_, ExternalQueryAst_, ExternalQueryPlan_);
  892. }
  893. }
  894. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  895. ExprStream_ = exprOut;
  896. PlanStream_ = tracePlan;
  897. Transformer_ = TTransformationPipeline(TypeCtx_)
  898. .AddServiceTransformers()
  899. .AddParametersEvaluation(*FunctionRegistry_)
  900. .AddPreTypeAnnotation()
  901. .AddExpressionEvaluation(*FunctionRegistry_)
  902. .AddIOAnnotation()
  903. .AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true)
  904. .AddPostTypeAnnotation()
  905. .Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput")
  906. .AddOptimization()
  907. .Add(TExprOutputTransformer::Sync(ExprRoot_, exprOut, withTypes), "AstOutput")
  908. .Build();
  909. TFuture<void> openSession = OpenSession(username);
  910. if (!openSession.Initialized())
  911. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  912. SaveExprRoot();
  913. return openSession.Apply([this](const TFuture<void>& f) {
  914. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  915. try {
  916. f.GetValue();
  917. } catch (const std::exception& e) {
  918. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  919. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  920. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  921. }
  922. return AsyncTransformWithFallback(false);
  923. });
  924. }
  925. TProgram::TStatus TProgram::OptimizeWithConfig(
  926. const TString& username, const IPipelineConfigurator& pipelineConf)
  927. {
  928. YQL_PROFILE_FUNC(TRACE);
  929. auto m = &TProgram::OptimizeAsyncWithConfig;
  930. return SyncExecution(this, m, username, pipelineConf);
  931. }
  932. TProgram::TFutureStatus TProgram::OptimizeAsyncWithConfig(
  933. const TString& username, const IPipelineConfigurator& pipelineConf)
  934. {
  935. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  936. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  937. }
  938. TypeCtx_->IsReadOnly = true;
  939. TVector<TDataProviderInfo> dataProviders;
  940. with_lock (DataProvidersLock_) {
  941. dataProviders = DataProviders_;
  942. }
  943. for (const auto& dp : DataProviders_) {
  944. if (!dp.RemoteClusterProvider || !dp.RemoteOptimize) {
  945. continue;
  946. }
  947. if (auto cluster = dp.RemoteClusterProvider(UsedClusters_, UsedProviders_, SourceSyntax_)) {
  948. return dp.RemoteOptimize(*cluster,
  949. SourceSyntax_, SourceCode_, &pipelineConf,
  950. TypeCtx_, ExprRoot_, *ExprCtx_, ExternalQueryAst_, ExternalQueryPlan_);
  951. }
  952. }
  953. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  954. TTransformationPipeline pipeline(TypeCtx_);
  955. pipelineConf.AfterCreate(&pipeline);
  956. pipeline.AddServiceTransformers();
  957. pipeline.AddParametersEvaluation(*FunctionRegistry_);
  958. pipeline.AddPreTypeAnnotation();
  959. pipeline.AddExpressionEvaluation(*FunctionRegistry_);
  960. pipeline.AddIOAnnotation();
  961. pipeline.AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true);
  962. pipeline.AddPostTypeAnnotation();
  963. pipelineConf.AfterTypeAnnotation(&pipeline);
  964. pipeline.AddOptimization();
  965. if (EnableRangeComputeFor_) {
  966. pipeline.Add(MakeExpandRangeComputeForTransformer(pipeline.GetTypeAnnotationContext()),
  967. "ExpandRangeComputeFor", TIssuesIds::CORE_EXEC);
  968. }
  969. pipeline.Add(CreatePlanInfoTransformer(*TypeCtx_), "PlanInfo");
  970. pipelineConf.AfterOptimize(&pipeline);
  971. Transformer_ = pipeline.Build();
  972. TFuture<void> openSession = OpenSession(username);
  973. if (!openSession.Initialized())
  974. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  975. SaveExprRoot();
  976. return openSession.Apply([this](const TFuture<void>& f) {
  977. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  978. try {
  979. f.GetValue();
  980. } catch (const std::exception& e) {
  981. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  982. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  983. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  984. }
  985. return AsyncTransformWithFallback(false);
  986. });
  987. }
  988. TProgram::TStatus TProgram::LineageWithConfig(
  989. const TString& username, const IPipelineConfigurator& pipelineConf)
  990. {
  991. YQL_PROFILE_FUNC(TRACE);
  992. auto m = &TProgram::LineageAsyncWithConfig;
  993. return SyncExecution(this, m, username, pipelineConf);
  994. }
  995. TProgram::TFutureStatus TProgram::LineageAsyncWithConfig(
  996. const TString& username, const IPipelineConfigurator& pipelineConf)
  997. {
  998. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  999. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1000. }
  1001. TypeCtx_->IsReadOnly = true;
  1002. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  1003. TTransformationPipeline pipeline(TypeCtx_);
  1004. pipelineConf.AfterCreate(&pipeline);
  1005. pipeline.AddServiceTransformers();
  1006. pipeline.AddParametersEvaluation(*FunctionRegistry_);
  1007. pipeline.AddPreTypeAnnotation();
  1008. pipeline.AddExpressionEvaluation(*FunctionRegistry_);
  1009. pipeline.AddIOAnnotation();
  1010. pipeline.AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true);
  1011. pipeline.AddPostTypeAnnotation();
  1012. pipelineConf.AfterTypeAnnotation(&pipeline);
  1013. pipeline.AddLineageOptimization(LineageStr_);
  1014. Transformer_ = pipeline.Build();
  1015. TFuture<void> openSession = OpenSession(username);
  1016. if (!openSession.Initialized())
  1017. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1018. SaveExprRoot();
  1019. return openSession.Apply([this](const TFuture<void>& f) {
  1020. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  1021. try {
  1022. f.GetValue();
  1023. } catch (const std::exception& e) {
  1024. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  1025. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  1026. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1027. }
  1028. return AsyncTransformWithFallback(false);
  1029. });
  1030. }
  1031. TProgram::TStatus TProgram::Run(
  1032. const TString& username,
  1033. IOutputStream* traceOut,
  1034. IOutputStream* tracePlan,
  1035. IOutputStream* exprOut,
  1036. bool withTypes)
  1037. {
  1038. YQL_PROFILE_FUNC(TRACE);
  1039. auto m = &TProgram::RunAsync;
  1040. return SyncExecution(this, m, username, traceOut, tracePlan, exprOut, withTypes);
  1041. }
  1042. TProgram::TFutureStatus TProgram::RunAsync(
  1043. const TString& username,
  1044. IOutputStream* traceOut,
  1045. IOutputStream* tracePlan,
  1046. IOutputStream* exprOut,
  1047. bool withTypes)
  1048. {
  1049. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  1050. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1051. }
  1052. TypeCtx_->IsReadOnly = (HiddenMode_ != EHiddenMode::Disable);
  1053. TVector<TDataProviderInfo> dataProviders;
  1054. with_lock (DataProvidersLock_) {
  1055. dataProviders = DataProviders_;
  1056. }
  1057. for (const auto& dp : DataProviders_) {
  1058. if (!dp.RemoteClusterProvider || !dp.RemoteRun) {
  1059. continue;
  1060. }
  1061. if (auto cluster = dp.RemoteClusterProvider(UsedClusters_, UsedProviders_, SourceSyntax_)) {
  1062. return dp.RemoteRun(*cluster, SourceSyntax_, SourceCode_,
  1063. OutputFormat_, ResultFormat_, nullptr,
  1064. TypeCtx_, ExprRoot_, *ExprCtx_, ExternalQueryAst_, ExternalQueryPlan_, ExternalDiagnostics_,
  1065. ResultProviderConfig_);
  1066. }
  1067. }
  1068. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  1069. ExprStream_ = exprOut;
  1070. PlanStream_ = tracePlan;
  1071. TTransformationPipeline pipeline(TypeCtx_);
  1072. pipeline.AddServiceTransformers();
  1073. pipeline.AddParametersEvaluation(*FunctionRegistry_);
  1074. pipeline.AddPreTypeAnnotation();
  1075. pipeline.AddExpressionEvaluation(*FunctionRegistry_);
  1076. pipeline.AddIOAnnotation();
  1077. pipeline.AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true);
  1078. pipeline.AddPostTypeAnnotation();
  1079. pipeline.Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput");
  1080. pipeline.AddOptimization();
  1081. if (EnableRangeComputeFor_) {
  1082. pipeline.Add(MakeExpandRangeComputeForTransformer(pipeline.GetTypeAnnotationContext()),
  1083. "ExpandRangeComputeFor", TIssuesIds::CORE_EXEC);
  1084. }
  1085. pipeline.Add(TExprOutputTransformer::Sync(ExprRoot_, exprOut, withTypes), "AstOutput");
  1086. pipeline.Add(TPlanOutputTransformer::Sync(tracePlan, GetPlanBuilder(), OutputFormat_), "PlanOutput");
  1087. pipeline.AddRun(ProgressWriter_);
  1088. Transformer_ = pipeline.Build();
  1089. TFuture<void> openSession = OpenSession(username);
  1090. if (!openSession.Initialized()) {
  1091. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1092. }
  1093. SaveExprRoot();
  1094. return openSession.Apply([this](const TFuture<void>& f) {
  1095. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  1096. try {
  1097. f.GetValue();
  1098. } catch (const std::exception& e) {
  1099. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  1100. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  1101. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1102. }
  1103. return AsyncTransformWithFallback(false);
  1104. });
  1105. }
  1106. TProgram::TStatus TProgram::RunWithConfig(
  1107. const TString& username, const IPipelineConfigurator& pipelineConf)
  1108. {
  1109. YQL_PROFILE_FUNC(TRACE);
  1110. auto m = &TProgram::RunAsyncWithConfig;
  1111. return SyncExecution(this, m, username, pipelineConf);
  1112. }
  1113. TProgram::TFutureStatus TProgram::RunAsyncWithConfig(
  1114. const TString& username, const IPipelineConfigurator& pipelineConf)
  1115. {
  1116. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  1117. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1118. }
  1119. TypeCtx_->IsReadOnly = (HiddenMode_ != EHiddenMode::Disable);
  1120. TVector<TDataProviderInfo> dataProviders;
  1121. with_lock (DataProvidersLock_) {
  1122. dataProviders = DataProviders_;
  1123. }
  1124. for (const auto& dp : DataProviders_) {
  1125. if (!dp.RemoteClusterProvider || !dp.RemoteRun) {
  1126. continue;
  1127. }
  1128. if (auto cluster = dp.RemoteClusterProvider(UsedClusters_, UsedProviders_, SourceSyntax_)) {
  1129. return dp.RemoteRun(*cluster, SourceSyntax_, SourceCode_,
  1130. OutputFormat_, ResultFormat_, &pipelineConf,
  1131. TypeCtx_, ExprRoot_, *ExprCtx_, ExternalQueryAst_, ExternalQueryPlan_, ExternalDiagnostics_,
  1132. ResultProviderConfig_);
  1133. }
  1134. }
  1135. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  1136. TTransformationPipeline pipeline(TypeCtx_);
  1137. pipelineConf.AfterCreate(&pipeline);
  1138. pipeline.AddServiceTransformers();
  1139. pipeline.AddParametersEvaluation(*FunctionRegistry_);
  1140. pipeline.AddPreTypeAnnotation();
  1141. pipeline.AddExpressionEvaluation(*FunctionRegistry_);
  1142. pipeline.AddIOAnnotation();
  1143. pipeline.AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true);
  1144. pipeline.AddPostTypeAnnotation();
  1145. pipelineConf.AfterTypeAnnotation(&pipeline);
  1146. pipeline.AddOptimization();
  1147. if (EnableRangeComputeFor_) {
  1148. pipeline.Add(MakeExpandRangeComputeForTransformer(pipeline.GetTypeAnnotationContext()),
  1149. "ExpandRangeComputeFor", TIssuesIds::CORE_EXEC);
  1150. }
  1151. pipelineConf.AfterOptimize(&pipeline);
  1152. pipeline.AddRun(ProgressWriter_);
  1153. Transformer_ = pipeline.Build();
  1154. TFuture<void> openSession = OpenSession(username);
  1155. if (!openSession.Initialized()) {
  1156. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1157. }
  1158. SaveExprRoot();
  1159. return openSession.Apply([this](const TFuture<void>& f) {
  1160. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  1161. try {
  1162. f.GetValue();
  1163. } catch (const std::exception& e) {
  1164. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  1165. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  1166. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1167. }
  1168. return AsyncTransformWithFallback(false);
  1169. });
  1170. }
  1171. void TProgram::SaveExprRoot() {
  1172. TNodeOnNodeOwnedMap deepClones;
  1173. SavedExprRoot_ = ExprCtx_->DeepCopy(*ExprRoot_, *ExprCtx_, deepClones, /*internStrings*/false, /*copyTypes*/true, /*copyResult*/false, {});
  1174. }
  1175. std::optional<bool> TProgram::CheckFallbackIssues(const TIssues& issues) {
  1176. auto isFallback = std::optional<bool>();
  1177. auto checkIssue = [&](const TIssue& issue) {
  1178. if (issue.GetCode() == TIssuesIds::DQ_GATEWAY_ERROR) {
  1179. YQL_LOG(DEBUG) << "Gateway Error " << issue;
  1180. isFallback = false;
  1181. } else if (issue.GetCode() == TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR) {
  1182. YQL_LOG(DEBUG) << "Gateway Fallback Error " << issue;
  1183. isFallback = true;
  1184. } else if (issue.GetCode() == TIssuesIds::DQ_OPTIMIZE_ERROR) {
  1185. YQL_LOG(DEBUG) << "Optimize Error " << issue;
  1186. isFallback = true;
  1187. } else if (issue.GetCode() >= TIssuesIds::YT_ACCESS_DENIED &&
  1188. issue.GetCode() <= TIssuesIds::YT_FOLDER_INPUT_IS_NOT_A_FOLDER &&
  1189. (issue.GetSeverity() == TSeverityIds::S_ERROR ||
  1190. issue.GetSeverity() == TSeverityIds::S_FATAL)) {
  1191. YQL_LOG(DEBUG) << "Yt Error " << issue;
  1192. isFallback = false;
  1193. }
  1194. };
  1195. std::function<void(const TIssuePtr& issue)> recursiveCheck = [&](const TIssuePtr& issue) {
  1196. checkIssue(*issue);
  1197. for (const auto& subissue : issue->GetSubIssues()) {
  1198. recursiveCheck(subissue);
  1199. }
  1200. };
  1201. for (const auto& issue : issues) {
  1202. checkIssue(issue);
  1203. // check subissues
  1204. for (const auto& subissue : issue.GetSubIssues()) {
  1205. recursiveCheck(subissue);
  1206. }
  1207. }
  1208. return isFallback;
  1209. }
  1210. TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool applyAsyncChanges)
  1211. {
  1212. return AsyncTransform(*Transformer_, ExprRoot_, *ExprCtx_, applyAsyncChanges).Apply([this](const TFuture<IGraphTransformer::TStatus>& res) {
  1213. auto status = res.GetValueSync();
  1214. if (status == IGraphTransformer::TStatus::Error
  1215. && !TypeCtx_->ForceDq
  1216. && SavedExprRoot_
  1217. && TypeCtx_->DqCaptured
  1218. && TypeCtx_->DqFallbackPolicy != EFallbackPolicy::Never)
  1219. {
  1220. auto issues = ExprCtx_->IssueManager.GetIssues();
  1221. bool isFallback = CheckFallbackIssues(issues).value_or(true);
  1222. if (!isFallback && TypeCtx_->DqFallbackPolicy != EFallbackPolicy::Always) {
  1223. // unrecoverable error
  1224. return res;
  1225. }
  1226. ExprRoot_ = SavedExprRoot_;
  1227. SavedExprRoot_ = nullptr;
  1228. UserDataStorage_->SetUserDataTable(std::move(SavedUserDataTable_));
  1229. ExprCtx_->IssueManager.Reset();
  1230. YQL_LOG(DEBUG) << "Fallback, Issues: " << issues.ToString();
  1231. ExprCtx_->Reset();
  1232. Transformer_->Rewind();
  1233. for (auto sink : TypeCtx_->DataSinks) {
  1234. sink->Reset();
  1235. }
  1236. for (auto source : TypeCtx_->DataSources) {
  1237. source->Reset();
  1238. }
  1239. TypeCtx_->Reset();
  1240. try {
  1241. CleanupLastSession().GetValueSync();
  1242. } catch (...) {
  1243. ExprCtx_->IssueManager.RaiseIssue(TIssue({}, "Failed to cleanup session: " + CurrentExceptionMessage()));
  1244. return NThreading::MakeFuture<IGraphTransformer::TStatus>(IGraphTransformer::TStatus::Error);
  1245. }
  1246. std::function<void(const TIssuePtr& issue)> toInfo = [&](const TIssuePtr& issue) {
  1247. if (issue->Severity == TSeverityIds::S_ERROR
  1248. || issue->Severity == TSeverityIds::S_FATAL
  1249. || issue->Severity == TSeverityIds::S_WARNING)
  1250. {
  1251. issue->Severity = TSeverityIds::S_INFO;
  1252. }
  1253. for (const auto& subissue : issue->GetSubIssues()) {
  1254. toInfo(subissue);
  1255. }
  1256. };
  1257. TIssue info("DQ cannot execute the query");
  1258. info.Severity = TSeverityIds::S_INFO;
  1259. for (auto& issue : issues) {
  1260. TIssuePtr newIssue = new TIssue(issue);
  1261. if (newIssue->Severity == TSeverityIds::S_ERROR
  1262. || issue.Severity == TSeverityIds::S_FATAL
  1263. || issue.Severity == TSeverityIds::S_WARNING)
  1264. {
  1265. newIssue->Severity = TSeverityIds::S_INFO;
  1266. }
  1267. for (auto& subissue : newIssue->GetSubIssues()) {
  1268. toInfo(subissue);
  1269. }
  1270. info.AddSubIssue(newIssue);
  1271. }
  1272. ExprCtx_->IssueManager.AddIssues({info});
  1273. ++FallbackCounter_;
  1274. // don't execute recapture again
  1275. ExprCtx_->Step.Done(TExprStep::Recapture);
  1276. AbortHidden_();
  1277. return AsyncTransformWithFallback(false);
  1278. }
  1279. if (status == IGraphTransformer::TStatus::Error && (TypeCtx_->DqFallbackPolicy == EFallbackPolicy::Never || TypeCtx_->ForceDq)) {
  1280. YQL_LOG(INFO) << "Fallback skipped due to per query policy";
  1281. }
  1282. return res;
  1283. });
  1284. }
  1285. TMaybe<TString> TProgram::GetQueryAst(TMaybe<size_t> memoryLimit) {
  1286. if (ExternalQueryAst_) {
  1287. return ExternalQueryAst_;
  1288. }
  1289. TStringStream astStream;
  1290. astStream.Reserve(DEFAULT_AST_BUF_SIZE);
  1291. if (ExprRoot_) {
  1292. std::unique_ptr<IAllocator> limitingAllocator;
  1293. TConvertToAstSettings settings;
  1294. settings.AnnotationFlags = TExprAnnotationFlags::None;
  1295. settings.RefAtoms = true;
  1296. settings.Allocator = TDefaultAllocator::Instance();
  1297. if (memoryLimit) {
  1298. limitingAllocator = MakeLimitingAllocator(*memoryLimit, TDefaultAllocator::Instance());
  1299. settings.Allocator = limitingAllocator.get();
  1300. }
  1301. auto ast = ConvertToAst(*ExprRoot_, *ExprCtx_, settings);
  1302. ast.Root->PrettyPrintTo(astStream, TAstPrintFlags::ShortQuote | TAstPrintFlags::PerLine);
  1303. return astStream.Str();
  1304. } else if (AstRoot_) {
  1305. AstRoot_->PrettyPrintTo(astStream, TAstPrintFlags::ShortQuote | TAstPrintFlags::PerLine);
  1306. return astStream.Str();
  1307. }
  1308. return Nothing();
  1309. }
  1310. TMaybe<TString> TProgram::GetQueryPlan(const TPlanSettings& settings) {
  1311. if (ExternalQueryPlan_) {
  1312. return ExternalQueryPlan_;
  1313. }
  1314. if (ExprRoot_) {
  1315. TStringStream planStream;
  1316. planStream.Reserve(DEFAULT_PLAN_BUF_SIZE);
  1317. NYson::TYsonWriter writer(&planStream, OutputFormat_);
  1318. PlanBuilder_->WritePlan(writer, ExprRoot_, settings);
  1319. return planStream.Str();
  1320. }
  1321. return Nothing();
  1322. }
  1323. TMaybe<TString> TProgram::GetDiagnostics() {
  1324. if (ExternalDiagnostics_) {
  1325. return ExternalDiagnostics_;
  1326. }
  1327. if (!TypeCtx_ || !TypeCtx_->Diagnostics) {
  1328. return Nothing();
  1329. }
  1330. if (!Transformer_) {
  1331. return Nothing();
  1332. }
  1333. TStringStream out;
  1334. NYson::TYsonWriter writer(&out, DiagnosticFormat_.GetOrElse(ResultFormat_));
  1335. writer.OnBeginMap();
  1336. writer.OnKeyedItem("Write");
  1337. writer.OnBeginList();
  1338. writer.OnListItem();
  1339. writer.OnBeginMap();
  1340. writer.OnKeyedItem("Data");
  1341. writer.OnBeginMap();
  1342. writer.OnKeyedItem("Diagnostics");
  1343. writer.OnBeginMap();
  1344. writer.OnKeyedItem("TransformStats");
  1345. auto transformStats = Transformer_->GetStatistics();
  1346. NCommon::TransformerStatsToYson("", transformStats, writer);
  1347. for (auto& datasink : TypeCtx_->DataSinks) {
  1348. writer.OnKeyedItem(datasink->GetName());
  1349. if (!datasink->CollectDiagnostics(writer)) {
  1350. writer.OnEntity();
  1351. }
  1352. }
  1353. writer.OnEndMap();
  1354. writer.OnEndMap();
  1355. writer.OnEndMap();
  1356. writer.OnEndList();
  1357. writer.OnEndMap();
  1358. return out.Str();
  1359. }
  1360. IGraphTransformer::TStatistics TProgram::GetRawDiagnostics() {
  1361. return Transformer_ ? Transformer_->GetStatistics() : IGraphTransformer::TStatistics::NotPresent();
  1362. }
  1363. TMaybe<TString> TProgram::GetTasksInfo() {
  1364. if (!TypeCtx_) {
  1365. return Nothing();
  1366. }
  1367. bool hasTasks = false;
  1368. TStringStream out;
  1369. NYson::TYsonWriter writer(&out, ResultFormat_);
  1370. writer.OnBeginMap();
  1371. writer.OnKeyedItem("Write");
  1372. writer.OnBeginList();
  1373. writer.OnListItem();
  1374. writer.OnBeginMap();
  1375. writer.OnKeyedItem("Tasks");
  1376. writer.OnBeginList();
  1377. for (auto& datasink : TypeCtx_->DataSinks) {
  1378. hasTasks = hasTasks || datasink->GetTasksInfo(writer);
  1379. }
  1380. writer.OnEndList();
  1381. writer.OnEndMap();
  1382. writer.OnEndList();
  1383. writer.OnEndMap();
  1384. if (hasTasks) {
  1385. return out.Str();
  1386. } else {
  1387. return Nothing();
  1388. }
  1389. }
  1390. TMaybe<TString> TProgram::GetStatistics(bool totalOnly, THashMap<TString, TStringBuf> extraYsons) {
  1391. if (!TypeCtx_) {
  1392. return Nothing();
  1393. }
  1394. TStringStream out;
  1395. NYson::TYsonWriter writer(&out, OutputFormat_);
  1396. // Header
  1397. writer.OnBeginMap();
  1398. writer.OnKeyedItem("ExecutionStatistics");
  1399. writer.OnBeginMap();
  1400. // Providers
  1401. THashSet<TStringBuf> processed;
  1402. for (auto& datasink : TypeCtx_->DataSinks) {
  1403. TStringStream providerOut;
  1404. NYson::TYsonWriter providerWriter(&providerOut);
  1405. if (datasink->CollectStatistics(providerWriter, totalOnly)) {
  1406. writer.OnKeyedItem(datasink->GetName());
  1407. writer.OnRaw(providerOut.Str());
  1408. processed.insert(datasink->GetName());
  1409. }
  1410. }
  1411. for (auto& datasource : TypeCtx_->DataSources) {
  1412. if (processed.insert(datasource->GetName()).second) {
  1413. TStringStream providerOut;
  1414. NYson::TYsonWriter providerWriter(&providerOut);
  1415. if (datasource->CollectStatistics(providerWriter, totalOnly)) {
  1416. writer.OnKeyedItem(datasource->GetName());
  1417. writer.OnRaw(providerOut.Str());
  1418. }
  1419. }
  1420. }
  1421. auto rusage = TRusage::Get();
  1422. // System stats
  1423. writer.OnKeyedItem("system");
  1424. writer.OnBeginMap();
  1425. writer.OnKeyedItem("MaxRSS");
  1426. writer.OnBeginMap();
  1427. writer.OnKeyedItem("max");
  1428. writer.OnInt64Scalar(rusage.MaxRss);
  1429. writer.OnEndMap();
  1430. writer.OnKeyedItem("MajorPageFaults");
  1431. writer.OnBeginMap();
  1432. writer.OnKeyedItem("count");
  1433. writer.OnInt64Scalar(rusage.MajorPageFaults);
  1434. writer.OnEndMap();
  1435. if (FallbackCounter_) {
  1436. writer.OnKeyedItem("Fallback");
  1437. writer.OnBeginMap();
  1438. writer.OnKeyedItem("count");
  1439. writer.OnInt64Scalar(FallbackCounter_);
  1440. writer.OnEndMap();
  1441. }
  1442. writer.OnEndMap(); // system
  1443. if (TypeCtx_->Modules) {
  1444. writer.OnKeyedItem("moduleResolver");
  1445. writer.OnBeginMap();
  1446. TypeCtx_->Modules->WriteStatistics(writer);
  1447. writer.OnEndMap();
  1448. }
  1449. // extra
  1450. for (const auto &[k, extraYson] : extraYsons) {
  1451. writer.OnKeyedItem(k);
  1452. writer.OnRaw(extraYson);
  1453. }
  1454. // Footer
  1455. writer.OnEndMap();
  1456. writer.OnEndMap();
  1457. return out.Str();
  1458. }
  1459. TMaybe<TString> TProgram::GetDiscoveredData() {
  1460. if (!TypeCtx_) {
  1461. return Nothing();
  1462. }
  1463. TStringStream out;
  1464. NYson::TYsonWriter writer(&out, OutputFormat_);
  1465. writer.OnBeginMap();
  1466. for (auto& datasource: TypeCtx_->DataSources) {
  1467. TStringStream providerOut;
  1468. NYson::TYsonWriter providerWriter(&providerOut);
  1469. if (datasource->CollectDiscoveredData(providerWriter)) {
  1470. writer.OnKeyedItem(datasource->GetName());
  1471. writer.OnRaw(providerOut.Str());
  1472. }
  1473. }
  1474. writer.OnEndMap();
  1475. return out.Str();
  1476. }
  1477. TMaybe<TString> TProgram::GetLineage() {
  1478. return LineageStr_;
  1479. }
  1480. TProgram::TFutureStatus TProgram::ContinueAsync() {
  1481. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  1482. return AsyncTransformWithFallback(true);
  1483. }
  1484. NThreading::TFuture<void> TProgram::Abort()
  1485. {
  1486. return CloseLastSession();
  1487. }
  1488. TIssues TProgram::Issues() const {
  1489. TIssues result;
  1490. if (ExprCtx_) {
  1491. result.AddIssues(ExprCtx_->IssueManager.GetIssues());
  1492. }
  1493. result.AddIssues(FinalIssues_);
  1494. CheckFatalIssues(result);
  1495. return result;
  1496. }
  1497. TIssues TProgram::CompletedIssues() const {
  1498. TIssues result;
  1499. if (ExprCtx_) {
  1500. result.AddIssues(ExprCtx_->IssueManager.GetCompletedIssues());
  1501. }
  1502. result.AddIssues(FinalIssues_);
  1503. CheckFatalIssues(result);
  1504. return result;
  1505. }
  1506. TIssue MakeNoBlocksInfoIssue(const TVector<TString>& names, bool isTypes) {
  1507. TIssue result;
  1508. TString msg = TStringBuilder() << "Most frequent " << (isTypes ? "types " : "callables ")
  1509. << "which do not support block mode: " << JoinRange(", ", names.begin(), names.end());
  1510. result.SetMessage(msg);
  1511. result.SetCode(isTypes ? TIssuesIds::CORE_TOP_UNSUPPORTED_BLOCK_TYPES : TIssuesIds::CORE_TOP_UNSUPPORTED_BLOCK_CALLABLES, TSeverityIds::S_INFO);
  1512. return result;
  1513. }
  1514. void TProgram::FinalizeIssues() {
  1515. FinalIssues_.Clear();
  1516. if (TypeCtx_) {
  1517. static const size_t topCount = 10;
  1518. auto noBlockTypes = TypeCtx_->GetTopNoBlocksTypes(topCount);
  1519. if (!noBlockTypes.empty()) {
  1520. FinalIssues_.AddIssue(MakeNoBlocksInfoIssue(noBlockTypes, true));
  1521. }
  1522. auto noBlockCallables = TypeCtx_->GetTopNoBlocksCallables(topCount);
  1523. if (!noBlockCallables.empty()) {
  1524. FinalIssues_.AddIssue(MakeNoBlocksInfoIssue(noBlockCallables, false));
  1525. }
  1526. }
  1527. }
  1528. NThreading::TFuture<void> TProgram::CleanupLastSession() {
  1529. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  1530. TString sessionId = GetSessionId();
  1531. if (sessionId.empty()) {
  1532. return MakeFuture();
  1533. }
  1534. TVector<TDataProviderInfo> dataProviders;
  1535. with_lock (DataProvidersLock_) {
  1536. dataProviders = DataProviders_;
  1537. }
  1538. TVector<NThreading::TFuture<void>> cleanupFutures;
  1539. cleanupFutures.reserve(dataProviders.size());
  1540. for (const auto& dp : dataProviders) {
  1541. if (dp.CleanupSession) {
  1542. dp.CleanupSession(sessionId);
  1543. }
  1544. if (dp.CleanupSessionAsync) {
  1545. cleanupFutures.push_back(dp.CleanupSessionAsync(sessionId));
  1546. }
  1547. }
  1548. return NThreading::WaitExceptionOrAll(cleanupFutures);
  1549. }
  1550. NThreading::TFuture<void> TProgram::CloseLastSession() {
  1551. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  1552. TVector<TDataProviderInfo> dataProviders;
  1553. with_lock (DataProvidersLock_) {
  1554. dataProviders = DataProviders_;
  1555. }
  1556. auto promise = NThreading::NewPromise<void>();
  1557. TString sessionId;
  1558. with_lock(SessionIdLock_) {
  1559. // post-condition: SessionId_ will be empty
  1560. sessionId = std::move(SessionId_);
  1561. if (sessionId.empty()) {
  1562. return CloseLastSessionFuture_;
  1563. }
  1564. CloseLastSessionFuture_ = promise.GetFuture();
  1565. }
  1566. TVector<NThreading::TFuture<void>> closeFutures;
  1567. closeFutures.reserve(dataProviders.size());
  1568. for (const auto& dp : dataProviders) {
  1569. if (dp.CloseSession) {
  1570. dp.CloseSession(sessionId);
  1571. }
  1572. if (dp.CloseSessionAsync) {
  1573. closeFutures.push_back(dp.CloseSessionAsync(sessionId));
  1574. }
  1575. }
  1576. return NThreading::WaitExceptionOrAll(closeFutures)
  1577. .Apply([promise = std::move(promise)](const NThreading::TFuture<void>&) mutable {
  1578. promise.SetValue();
  1579. });
  1580. }
  1581. TString TProgram::ResultsAsString() const {
  1582. if (!ResultProviderConfig_)
  1583. return "";
  1584. TStringStream resultOut;
  1585. NYson::TYsonWriter yson(&resultOut, OutputFormat_);
  1586. yson.OnBeginList();
  1587. for (const auto& result: Results()) {
  1588. yson.OnListItem();
  1589. yson.OnRaw(result);
  1590. }
  1591. yson.OnEndList();
  1592. return resultOut.Str();
  1593. }
  1594. TTypeAnnotationContextPtr TProgram::BuildTypeAnnotationContext(const TString& username) {
  1595. auto typeAnnotationContext = MakeIntrusive<TTypeAnnotationContext>();
  1596. typeAnnotationContext->UserDataStorage = UserDataStorage_;
  1597. typeAnnotationContext->Credentials = Credentials_;
  1598. typeAnnotationContext->Modules = Modules_;
  1599. typeAnnotationContext->UrlListerManager = UrlListerManager_;
  1600. typeAnnotationContext->UdfResolver = UdfResolver_;
  1601. typeAnnotationContext->UdfIndex = UdfIndex_;
  1602. typeAnnotationContext->UdfIndexPackageSet = UdfIndexPackageSet_;
  1603. typeAnnotationContext->RandomProvider = RandomProvider_;
  1604. typeAnnotationContext->TimeProvider = TimeProvider_;
  1605. if (DiagnosticFormat_) {
  1606. typeAnnotationContext->Diagnostics = true;
  1607. }
  1608. typeAnnotationContext->ArrowResolver = ArrowResolver_;
  1609. typeAnnotationContext->FileStorage = FileStorage_;
  1610. typeAnnotationContext->QContext = QContext_;
  1611. typeAnnotationContext->HiddenMode = HiddenMode_;
  1612. if (UdfIndex_ && UdfIndexPackageSet_) {
  1613. // setup default versions at the beginning
  1614. // could be overridden by pragma later
  1615. UdfIndexPackageSet_->AddResourcesTo(UdfIndex_);
  1616. }
  1617. PlanBuilder_ = CreatePlanBuilder(*typeAnnotationContext);
  1618. THashSet<TString> providerNames;
  1619. TVector<TString> fullResultDataSinks;
  1620. TVector<std::function<TString(const TString&, const TString&)>> tokenResolvers;
  1621. for (const auto& dpi : DataProvidersInit_) {
  1622. auto dp = dpi(
  1623. username,
  1624. SessionId_,
  1625. GatewaysConfig_,
  1626. FunctionRegistry_,
  1627. RandomProvider_,
  1628. typeAnnotationContext,
  1629. ProgressWriter_,
  1630. OperationOptions_,
  1631. AbortHidden_,
  1632. QContext_
  1633. );
  1634. if (HiddenMode_ != EHiddenMode::Disable && !dp.SupportsHidden) {
  1635. continue;
  1636. }
  1637. providerNames.insert(dp.Names.begin(), dp.Names.end());
  1638. with_lock (DataProvidersLock_) {
  1639. DataProviders_.emplace_back(dp);
  1640. }
  1641. if (dp.Source) {
  1642. typeAnnotationContext->AddDataSource(dp.Names, dp.Source);
  1643. }
  1644. if (dp.Sink) {
  1645. typeAnnotationContext->AddDataSink(dp.Names, dp.Sink);
  1646. }
  1647. if (dp.TokenResolver) {
  1648. tokenResolvers.push_back(dp.TokenResolver);
  1649. }
  1650. if (dp.SupportFullResultDataSink) {
  1651. fullResultDataSinks.insert(fullResultDataSinks.end(), dp.Names.begin(), dp.Names.end());
  1652. }
  1653. }
  1654. TVector<TString> resultProviderDataSources;
  1655. if (providerNames.contains(YtProviderName)) {
  1656. resultProviderDataSources.push_back(TString(YtProviderName));
  1657. }
  1658. if (providerNames.contains(KikimrProviderName)) {
  1659. resultProviderDataSources.push_back(TString(KikimrProviderName));
  1660. }
  1661. if (providerNames.contains(RtmrProviderName)) {
  1662. resultProviderDataSources.push_back(TString(RtmrProviderName));
  1663. }
  1664. if (providerNames.contains(PqProviderName)) {
  1665. resultProviderDataSources.push_back(TString(PqProviderName));
  1666. }
  1667. if (providerNames.contains(DqProviderName)) {
  1668. resultProviderDataSources.push_back(TString(DqProviderName));
  1669. }
  1670. if (providerNames.contains(PureProviderName)) {
  1671. resultProviderDataSources.push_back(TString(PureProviderName));
  1672. }
  1673. if (!resultProviderDataSources.empty())
  1674. {
  1675. auto resultFormat = ResultFormat_;
  1676. auto writerFactory = [resultFormat] () { return CreateYsonResultWriter(resultFormat); };
  1677. ResultProviderConfig_ = MakeIntrusive<TResultProviderConfig>(*typeAnnotationContext,
  1678. *FunctionRegistry_, ResultType_, ToString((ui32)resultFormat), writerFactory);
  1679. ResultProviderConfig_->SupportsResultPosition = SupportsResultPosition_;
  1680. auto resultProvider = CreateResultProvider(ResultProviderConfig_);
  1681. typeAnnotationContext->AddDataSink(ResultProviderName, resultProvider);
  1682. typeAnnotationContext->AvailablePureResultDataSources = resultProviderDataSources;
  1683. }
  1684. if (!fullResultDataSinks.empty()) {
  1685. typeAnnotationContext->FullResultDataSink = fullResultDataSinks.front();
  1686. }
  1687. {
  1688. auto configProvider = CreateConfigProvider(*typeAnnotationContext, GatewaysConfig_, username);
  1689. typeAnnotationContext->AddDataSource(ConfigProviderName, configProvider);
  1690. }
  1691. tokenResolvers.push_back(BuildDefaultTokenResolver(typeAnnotationContext->Credentials));
  1692. typeAnnotationContext->UserDataStorage->SetTokenResolver(BuildCompositeTokenResolver(std::move(tokenResolvers)));
  1693. return typeAnnotationContext;
  1694. }
  1695. TFuture<void> TProgram::OpenSession(const TString& username)
  1696. {
  1697. TVector<TFuture<void>> openFutures;
  1698. with_lock (DataProvidersLock_) {
  1699. for (const auto& dp : DataProviders_) {
  1700. if (dp.OpenSession) {
  1701. auto future = dp.OpenSession(SessionId_, username, ProgressWriter_, OperationOptions_,
  1702. RandomProvider_, TimeProvider_);
  1703. openFutures.push_back(future);
  1704. }
  1705. }
  1706. }
  1707. return WaitExceptionOrAll(openFutures);
  1708. }
  1709. void TProgram::Print(IOutputStream* exprOut, IOutputStream* planOut, bool cleanPlan) {
  1710. TVector<TTransformStage> printTransformers;
  1711. const auto issueCode = TIssuesIds::DEFAULT_ERROR;
  1712. if (exprOut) {
  1713. printTransformers.push_back(TTransformStage(
  1714. TExprOutputTransformer::Sync(ExprRoot_, exprOut),
  1715. "ExprOutput",
  1716. issueCode));
  1717. }
  1718. if (planOut) {
  1719. if (cleanPlan) {
  1720. GetPlanBuilder().Clear();
  1721. }
  1722. printTransformers.push_back(TTransformStage(
  1723. TPlanOutputTransformer::Sync(planOut, GetPlanBuilder(), OutputFormat_),
  1724. "PlanOutput",
  1725. issueCode));
  1726. }
  1727. auto compositeTransformer = CreateCompositeGraphTransformer(printTransformers, false);
  1728. InstantTransform(*compositeTransformer, ExprRoot_, *ExprCtx_);
  1729. }
  1730. bool TProgram::HasActiveProcesses() {
  1731. with_lock (DataProvidersLock_) {
  1732. for (const auto& dp : DataProviders_) {
  1733. if (dp.HasActiveProcesses && dp.HasActiveProcesses()) {
  1734. return true;
  1735. }
  1736. }
  1737. }
  1738. return false;
  1739. }
  1740. bool TProgram::NeedWaitForActiveProcesses() {
  1741. with_lock (DataProvidersLock_) {
  1742. for (const auto& dp : DataProviders_) {
  1743. if (dp.HasActiveProcesses && dp.HasActiveProcesses() && dp.WaitForActiveProcesses) {
  1744. return true;
  1745. }
  1746. }
  1747. }
  1748. return false;
  1749. }
  1750. } // namespace NYql