yql_facade.cpp 70 KB


  1. #include "yql_facade.h"
  2. #include <yql/essentials/core/yql_execution.h>
  3. #include <yql/essentials/core/yql_expr_csee.h>
  4. #include <yql/essentials/core/yql_expr_optimize.h>
  5. #include <yql/essentials/core/yql_expr_type_annotation.h>
  6. #include <yql/essentials/core/yql_opt_rewrite_io.h>
  7. #include <yql/essentials/core/yql_opt_proposed_by_data.h>
  8. #include <yql/essentials/core/yql_gc_transformer.h>
  9. #include <yql/essentials/core/type_ann/type_ann_expr.h>
  10. #include <yql/essentials/core/services/yql_plan.h>
  11. #include <yql/essentials/core/services/yql_eval_params.h>
  12. #include <yql/essentials/sql/sql.h>
  13. #include <yql/essentials/sql/v1/sql.h>
  14. #include <yql/essentials/parser/pg_wrapper/interface/parser.h>
  15. #include <yql/essentials/utils/log/context.h>
  16. #include <yql/essentials/utils/log/profile.h>
  17. #include <yql/essentials/utils/limiting_allocator.h>
  18. #include <yql/essentials/core/services/yql_out_transformers.h>
  19. #include <yql/essentials/core/extract_predicate/extract_predicate_dbg.h>
  20. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  21. #include <yql/essentials/providers/common/provider/yql_provider.h>
  22. #include <yql/essentials/providers/common/udf_resolve/yql_simple_udf_resolver.h>
  23. #include <yql/essentials/providers/common/udf_resolve/yql_outproc_udf_resolver.h>
  24. #include <yql/essentials/providers/common/udf_resolve/yql_udf_resolver_with_index.h>
  25. #include <yql/essentials/providers/common/arrow_resolve/yql_simple_arrow_resolver.h>
  26. #include <yql/essentials/providers/common/config/yql_setting.h>
  27. #include <yql/essentials/core/qplayer/udf_resolver/yql_qplayer_udf_resolver.h>
  28. #include <library/cpp/yson/node/node_io.h>
  29. #include <library/cpp/deprecated/split/split_iterator.h>
  30. #include <library/cpp/yson/writer.h>
  31. #include <library/cpp/string_utils/base64/base64.h>
  32. #include <util/stream/file.h>
  33. #include <util/stream/null.h>
  34. #include <util/string/join.h>
  35. #include <util/string/split.h>
  36. #include <util/generic/guid.h>
  37. #include <util/system/rusage.h>
  38. #include <util/generic/yexception.h>
  39. using namespace NThreading;
  40. namespace NYql {
  41. namespace {
  42. const size_t DEFAULT_AST_BUF_SIZE = 1024;
  43. const size_t DEFAULT_PLAN_BUF_SIZE = 1024;
  44. const TString FacadeComponent = "Facade";
  45. const TString SourceCodeLabel = "SourceCode";
  46. const TString GatewaysLabel = "Gateways";
  47. const TString ParametersLabel = "Parameters";
  48. const TString TranslationLabel = "Translation";
  49. const TString StaticUserFilesLabel = "UserFiles";
  50. const TString DynamicUserFilesLabel = "DynamicUserFiles";
  51. const TString StaticCredentialsLabel = "Credentials";
  52. const TString DynamicCredentialsLabel = "DynamicCredentials";
  53. class TUrlLoader : public IUrlLoader {
  54. public:
  55. TUrlLoader(TFileStoragePtr storage)
  56. : Storage_(storage)
  57. {}
  58. TString Load(const TString& url, const TString& token) override {
  59. auto file = Storage_->PutUrl(url, token);
  60. return TFileInput(file->GetPath()).ReadAll();
  61. }
  62. private:
  63. TFileStoragePtr Storage_;
  64. };
  65. template <typename... Params1, typename... Params2>
  66. TProgram::TStatus SyncExecution(
  67. TProgram* program,
  68. TProgram::TFutureStatus (TProgram::*method)(Params1...),
  69. Params2&&... params) {
  70. TProgram::TFutureStatus future =
  71. (program->*method)(std::forward<Params2>(params)...);
  72. YQL_ENSURE(future.Initialized());
  73. future.Wait();
  74. HandleFutureException(future);
  75. TProgram::TStatus status = future.GetValue();
  76. while (status == TProgram::TStatus::Async) {
  77. auto continueFuture = program->ContinueAsync();
  78. continueFuture.Wait();
  79. HandleFutureException(continueFuture);
  80. status = continueFuture.GetValue();
  81. }
  82. if (status == TProgram::TStatus::Error) {
  83. program->Print(program->ExprStream(), program->PlanStream());
  84. }
  85. return status;
  86. }
  87. std::function<TString(const TString&, const TString&)> BuildDefaultTokenResolver(TCredentials::TPtr credentials) {
  88. return [credentials](const TString& /*url*/, const TString& alias) -> TString {
  89. if (alias) {
  90. if (auto cred = credentials->FindCredential(TString("default_").append(alias))) {
  91. return cred->Content;
  92. }
  93. }
  94. return {};
  95. };
  96. }
  97. std::function<TString(const TString&, const TString&)> BuildCompositeTokenResolver(TVector<std::function<TString(const TString&, const TString&)>>&& children) {
  98. if (children.empty()) {
  99. return {};
  100. }
  101. if (children.size() == 1) {
  102. return std::move(children[0]);
  103. }
  104. return [children = std::move(children)](const TString& url, const TString& alias) -> TString {
  105. for (auto& c : children) {
  106. if (auto r = c(url, alias)) {
  107. return r;
  108. }
  109. }
  110. return {};
  111. };
  112. }
  113. } // namespace
  114. ///////////////////////////////////////////////////////////////////////////////
  115. // TProgramFactory
  116. ///////////////////////////////////////////////////////////////////////////////
  117. TProgramFactory::TProgramFactory(
  118. bool useRepeatableRandomAndTimeProviders,
  119. const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
  120. ui64 nextUniqueId,
  121. const TVector<TDataProviderInitializer>& dataProvidersInit,
  122. const TString& runner)
  123. : UseRepeatableRandomAndTimeProviders_(useRepeatableRandomAndTimeProviders)
  124. , FunctionRegistry_(functionRegistry)
  125. , NextUniqueId_(nextUniqueId)
  126. , DataProvidersInit_(dataProvidersInit)
  127. , Credentials_(MakeIntrusive<TCredentials>())
  128. , GatewaysConfig_(nullptr)
  129. , Runner_(runner)
  130. , ArrowResolver_(MakeSimpleArrowResolver(*functionRegistry))
  131. {
  132. }
  133. void TProgramFactory::UnrepeatableRandom() {
  134. UseUnrepeatableRandom = true;
  135. }
  136. void TProgramFactory::EnableRangeComputeFor() {
  137. EnableRangeComputeFor_ = true;
  138. }
  139. void TProgramFactory::AddUserDataTable(const TUserDataTable& userDataTable) {
  140. for (const auto& p : userDataTable) {
  141. if (!UserDataTable_.emplace(p).second) {
  142. ythrow yexception() << "UserDataTable already has user data block with key " << p.first;
  143. }
  144. }
  145. }
  146. void TProgramFactory::SetCredentials(TCredentials::TPtr credentials) {
  147. Credentials_ = std::move(credentials);
  148. }
  149. void TProgramFactory::SetGatewaysConfig(const TGatewaysConfig* gatewaysConfig) {
  150. GatewaysConfig_ = gatewaysConfig;
  151. }
  152. void TProgramFactory::SetModules(IModuleResolver::TPtr modules) {
  153. Modules_ = modules;
  154. }
  155. void TProgramFactory::SetUdfResolver(IUdfResolver::TPtr udfResolver) {
  156. UdfResolver_ = udfResolver;
  157. }
  158. void TProgramFactory::SetUdfIndex(TUdfIndex::TPtr udfIndex, TUdfIndexPackageSet::TPtr udfIndexPackageSet) {
  159. UdfIndex_ = std::move(udfIndex);
  160. UdfIndexPackageSet_ = std::move(udfIndexPackageSet);
  161. }
  162. void TProgramFactory::SetFileStorage(TFileStoragePtr fileStorage) {
  163. FileStorage_ = std::move(fileStorage);
  164. }
  165. void TProgramFactory::SetUrlPreprocessing(IUrlPreprocessing::TPtr urlPreprocessing) {
  166. UrlPreprocessing_ = std::move(urlPreprocessing);
  167. }
  168. void TProgramFactory::SetArrowResolver(IArrowResolver::TPtr arrowResolver) {
  169. ArrowResolver_ = arrowResolver;
  170. }
  171. void TProgramFactory::SetUrlListerManager(IUrlListerManagerPtr urlListerManager) {
  172. UrlListerManager_ = std::move(urlListerManager);
  173. }
  174. TProgramPtr TProgramFactory::Create(
  175. const TFile& file,
  176. const TString& sessionId,
  177. const TQContext& qContext,
  178. TMaybe<TString> gatewaysForMerge)
  179. {
  180. TString sourceCode = TFileInput(file).ReadAll();
  181. return Create(file.GetName(), sourceCode, sessionId, EHiddenMode::Disable, qContext, gatewaysForMerge);
  182. }
  183. TProgramPtr TProgramFactory::Create(
  184. const TString& filename,
  185. const TString& sourceCode,
  186. const TString& sessionId,
  187. EHiddenMode hiddenMode,
  188. const TQContext& qContext,
  189. TMaybe<TString> gatewaysForMerge)
  190. {
  191. auto randomProvider = UseRepeatableRandomAndTimeProviders_ && !UseUnrepeatableRandom && hiddenMode == EHiddenMode::Disable ?
  192. CreateDeterministicRandomProvider(1) : CreateDefaultRandomProvider();
  193. auto timeProvider = UseRepeatableRandomAndTimeProviders_ ?
  194. CreateDeterministicTimeProvider(10000000) : CreateDefaultTimeProvider();
  195. TUdfIndex::TPtr udfIndex = UdfIndex_ ? UdfIndex_->Clone() : nullptr;
  196. TUdfIndexPackageSet::TPtr udfIndexPackageSet = (UdfIndexPackageSet_ && hiddenMode == EHiddenMode::Disable) ? UdfIndexPackageSet_->Clone() : nullptr;
  197. IModuleResolver::TPtr moduleResolver = Modules_ ? Modules_->CreateMutableChild() : nullptr;
  198. IUrlListerManagerPtr urlListerManager = UrlListerManager_ ? UrlListerManager_->Clone() : nullptr;
  199. auto udfResolver = udfIndex ? NCommon::CreateUdfResolverWithIndex(udfIndex, UdfResolver_, FileStorage_) : UdfResolver_;
  200. // make UserDataTable_ copy here
  201. return new TProgram(FunctionRegistry_, randomProvider, timeProvider, NextUniqueId_, DataProvidersInit_,
  202. UserDataTable_, Credentials_, moduleResolver, urlListerManager,
  203. udfResolver, udfIndex, udfIndexPackageSet, FileStorage_, UrlPreprocessing_,
  204. GatewaysConfig_, filename, sourceCode, sessionId, Runner_, EnableRangeComputeFor_, ArrowResolver_, hiddenMode,
  205. qContext, gatewaysForMerge);
  206. }
  207. ///////////////////////////////////////////////////////////////////////////////
  208. // TProgram
  209. ///////////////////////////////////////////////////////////////////////////////
  210. TProgram::TProgram(
  211. const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry,
  212. const TIntrusivePtr<IRandomProvider> randomProvider,
  213. const TIntrusivePtr<ITimeProvider> timeProvider,
  214. ui64 nextUniqueId,
  215. const TVector<TDataProviderInitializer>& dataProvidersInit,
  216. const TUserDataTable& userDataTable,
  217. const TCredentials::TPtr& credentials,
  218. const IModuleResolver::TPtr& modules,
  219. const IUrlListerManagerPtr& urlListerManager,
  220. const IUdfResolver::TPtr& udfResolver,
  221. const TUdfIndex::TPtr& udfIndex,
  222. const TUdfIndexPackageSet::TPtr& udfIndexPackageSet,
  223. const TFileStoragePtr& fileStorage,
  224. const IUrlPreprocessing::TPtr& urlPreprocessing,
  225. const TGatewaysConfig* gatewaysConfig,
  226. const TString& filename,
  227. const TString& sourceCode,
  228. const TString& sessionId,
  229. const TString& runner,
  230. bool enableRangeComputeFor,
  231. const IArrowResolver::TPtr& arrowResolver,
  232. EHiddenMode hiddenMode,
  233. const TQContext& qContext,
  234. TMaybe<TString> gatewaysForMerge
  235. )
  236. : FunctionRegistry_(functionRegistry)
  237. , RandomProvider_(randomProvider)
  238. , TimeProvider_(timeProvider)
  239. , NextUniqueId_(nextUniqueId)
  240. , AstRoot_(nullptr)
  241. , Modules_(modules)
  242. , DataProvidersInit_(dataProvidersInit)
  243. , Credentials_(MakeIntrusive<NYql::TCredentials>(*credentials))
  244. , UrlListerManager_(urlListerManager)
  245. , UdfResolver_(udfResolver)
  246. , UdfIndex_(udfIndex)
  247. , UdfIndexPackageSet_(udfIndexPackageSet)
  248. , FileStorage_(fileStorage)
  249. , SavedUserDataTable_(userDataTable)
  250. , GatewaysConfig_(gatewaysConfig)
  251. , Filename_(filename)
  252. , SourceCode_(sourceCode)
  253. , SourceSyntax_(ESourceSyntax::Unknown)
  254. , SyntaxVersion_(0)
  255. , ExprRoot_(nullptr)
  256. , SessionId_(sessionId)
  257. , ResultType_(IDataProvider::EResultFormat::Yson)
  258. , ResultFormat_(NYson::EYsonFormat::Binary)
  259. , OutputFormat_(NYson::EYsonFormat::Pretty)
  260. , EnableRangeComputeFor_(enableRangeComputeFor)
  261. , ArrowResolver_(arrowResolver)
  262. , HiddenMode_(hiddenMode)
  263. , QContext_(qContext)
  264. , GatewaysForMerge_(gatewaysForMerge)
  265. {
  266. if (SessionId_.empty()) {
  267. SessionId_ = CreateGuidAsString();
  268. }
  269. if (QContext_.CanWrite()) {
  270. NYT::TNode credListNode = NYT::TNode::CreateList();
  271. Credentials_->ForEach([&](const TString name, const TCredential& cred) {
  272. credListNode.Add(NYT::TNode()
  273. ("Name", name)
  274. ("Category", cred.Category)
  275. ("Subcategory", cred.Subcategory));
  276. });
  277. auto credList = NYT::NodeToYsonString(credListNode, NYT::NYson::EYsonFormat::Binary);
  278. QContext_.GetWriter()->Put({FacadeComponent, StaticCredentialsLabel}, credList).GetValueSync();
  279. } else if (QContext_.CanRead()) {
  280. Credentials_ = MakeIntrusive<TCredentials>();
  281. Credentials_->SetUserCredentials({
  282. .OauthToken = "REPLAY_OAUTH",
  283. .BlackboxSessionIdCookie = "REPLAY_SESSIONID"
  284. });
  285. for (const auto& label : {StaticCredentialsLabel, DynamicCredentialsLabel}) {
  286. auto item = QContext_.GetReader()->Get({FacadeComponent, label}).GetValueSync();
  287. if (item) {
  288. auto node = NYT::NodeFromYsonString(item->Value);
  289. for (const auto& c : node.AsList()) {
  290. Credentials_->AddCredential(c["Name"].AsString(), TCredential(
  291. c["Category"].AsString(),c["Subcategory"].AsString(),"REPLAY"));
  292. }
  293. }
  294. }
  295. }
  296. if (QContext_.CanWrite() && !SavedUserDataTable_.empty()) {
  297. NYT::TNode userFilesNode = NYT::TNode::CreateList();
  298. for (const auto& p : SavedUserDataTable_) {
  299. userFilesNode.Add(p.first.Alias());
  300. }
  301. auto userFiles = NYT::NodeToYsonString(userFilesNode, NYT::NYson::EYsonFormat::Binary);
  302. QContext_.GetWriter()->Put({FacadeComponent, StaticUserFilesLabel}, userFiles).GetValueSync();
  303. } else if (QContext_.CanRead()) {
  304. SavedUserDataTable_.clear();
  305. for (const auto& label : {StaticUserFilesLabel, DynamicUserFilesLabel}) {
  306. auto item = QContext_.GetReader()->Get({FacadeComponent, label}).GetValueSync();
  307. if (item) {
  308. auto node = NYT::NodeFromYsonString(item->Value);
  309. for (const auto& alias : node.AsList()) {
  310. TUserDataBlock block;
  311. block.Type = EUserDataType::RAW_INLINE_DATA;
  312. YQL_ENSURE(SavedUserDataTable_.emplace(TUserDataKey::File(alias.AsString()), block).second);
  313. }
  314. }
  315. }
  316. }
  317. UserDataStorage_ = MakeIntrusive<TUserDataStorage>(fileStorage, SavedUserDataTable_, udfResolver, udfIndex);
  318. if (auto modules = dynamic_cast<TModuleResolver*>(Modules_.get())) {
  319. modules->AttachUserData(UserDataStorage_);
  320. modules->SetUrlLoader(new TUrlLoader(FileStorage_));
  321. modules->SetCredentials(Credentials_);
  322. if (QContext_) {
  323. modules->SetQContext(QContext_);
  324. }
  325. }
  326. if (UrlListerManager_) {
  327. UrlListerManager_->SetCredentials(Credentials_);
  328. UrlListerManager_->SetUrlPreprocessing(urlPreprocessing);
  329. }
  330. OperationOptions_.Runner = runner;
  331. UserDataStorage_->SetUrlPreprocessor(urlPreprocessing);
  332. if (QContext_) {
  333. UdfResolver_ = NCommon::WrapUdfResolverWithQContext(UdfResolver_, QContext_);
  334. if (QContext_.CanRead()) {
  335. auto item = QContext_.GetReader()->Get({FacadeComponent, GatewaysLabel}).GetValueSync();
  336. if (item) {
  337. YQL_ENSURE(LoadedGatewaysConfig_.ParseFromString(item->Value));
  338. if (GatewaysForMerge_) {
  339. YQL_ENSURE(LoadedGatewaysConfig_.MergeFromString(*GatewaysForMerge_));
  340. }
  341. GatewaysConfig_ = &LoadedGatewaysConfig_;
  342. }
  343. } else if (QContext_.CanWrite() && GatewaysConfig_) {
  344. TGatewaysConfig cleaned;
  345. if (GatewaysConfig_->HasYt()) {
  346. cleaned.MutableYt()->CopyFrom(GatewaysConfig_->GetYt());
  347. }
  348. if (GatewaysConfig_->HasFs()) {
  349. cleaned.MutableFs()->CopyFrom(GatewaysConfig_->GetFs());
  350. }
  351. if (GatewaysConfig_->HasYqlCore()) {
  352. cleaned.MutableYqlCore()->CopyFrom(GatewaysConfig_->GetYqlCore());
  353. }
  354. if (GatewaysConfig_->HasSqlCore()) {
  355. cleaned.MutableSqlCore()->CopyFrom(GatewaysConfig_->GetSqlCore());
  356. }
  357. if (GatewaysConfig_->HasDq()) {
  358. cleaned.MutableDq()->CopyFrom(GatewaysConfig_->GetDq());
  359. }
  360. auto data = cleaned.SerializeAsString();
  361. QContext_.GetWriter()->Put({FacadeComponent, GatewaysLabel}, data).GetValueSync();
  362. }
  363. if (QContext_.CanRead()) {
  364. auto item = QContext_.GetReader()->Get({FacadeComponent, ParametersLabel}).GetValueSync();
  365. if (item) {
  366. SetParametersYson(item->Value);
  367. }
  368. }
  369. }
  370. }
  371. TProgram::~TProgram() {
  372. try {
  373. CloseLastSession().GetValueSync();
  374. // stop all non complete execution before deleting TExprCtx
  375. with_lock (DataProvidersLock_) {
  376. DataProviders_.clear();
  377. }
  378. } catch (...) {
  379. Cerr << CurrentExceptionMessage() << Endl;
  380. }
  381. }
  382. void TProgram::ConfigureYsonResultFormat(NYson::EYsonFormat format) {
  383. ResultFormat_ = format;
  384. OutputFormat_ = format;
  385. }
  386. void TProgram::SetValidateOptions(NUdf::EValidateMode validateMode) {
  387. Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
  388. ValidateMode_ = validateMode;
  389. }
  390. void TProgram::SetDisableNativeUdfSupport(bool disable) {
  391. Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
  392. DisableNativeUdfSupport_ = disable;
  393. }
  394. void TProgram::SetUseTableMetaFromGraph(bool use) {
  395. Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
  396. UseTableMetaFromGraph_ = use;
  397. }
  398. TTypeAnnotationContextPtr TProgram::GetAnnotationContext() const {
  399. Y_ENSURE(TypeCtx_, "TypeCtx_ is not created");
  400. return TypeCtx_;
  401. }
  402. TTypeAnnotationContextPtr TProgram::ProvideAnnotationContext(const TString& username) {
  403. if (!TypeCtx_) {
  404. TypeCtx_ = BuildTypeAnnotationContext(username);
  405. TypeCtx_->OperationOptions = OperationOptions_;
  406. TypeCtx_->ValidateMode = ValidateMode_;
  407. TypeCtx_->DisableNativeUdfSupport = DisableNativeUdfSupport_;
  408. TypeCtx_->UseTableMetaFromGraph = UseTableMetaFromGraph_;
  409. }
  410. return TypeCtx_;
  411. }
  412. IPlanBuilder& TProgram::GetPlanBuilder() {
  413. if (!PlanBuilder_) {
  414. PlanBuilder_ = CreatePlanBuilder(*GetAnnotationContext());
  415. }
  416. return *PlanBuilder_;
  417. }
  418. void TProgram::SetParametersYson(const TString& parameters) {
  419. Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
  420. auto node = NYT::NodeFromYsonString(parameters);
  421. YQL_ENSURE(node.IsMap());
  422. for (const auto& x : node.AsMap()) {
  423. YQL_ENSURE(x.second.IsMap());
  424. YQL_ENSURE(x.second.HasKey("Data"));
  425. YQL_ENSURE(x.second.Size() == 1);
  426. }
  427. OperationOptions_.ParametersYson = node;
  428. if (auto modules = dynamic_cast<TModuleResolver*>(Modules_.get())) {
  429. modules->SetParameters(node);
  430. }
  431. if (UrlListerManager_) {
  432. UrlListerManager_->SetParameters(node);
  433. }
  434. if (QContext_.CanWrite()) {
  435. QContext_.GetWriter()->Put({FacadeComponent, ParametersLabel}, parameters).GetValueSync();
  436. }
  437. }
  438. bool TProgram::ExtractQueryParametersMetadata() {
  439. auto& types = *GetAnnotationContext();
  440. NYT::TNode params = NYT::TNode::CreateMap();
  441. Y_ENSURE(ExprCtx_);
  442. if (!ExtractParametersMetaAsYson(ExprRoot_, types, *ExprCtx_, params)) {
  443. return false;
  444. }
  445. ExtractedQueryParametersMetadataYson_ = NYT::NodeToYsonString(params, ResultFormat_);
  446. return true;
  447. }
  448. bool TProgram::FillParseResult(NYql::TAstParseResult&& astRes, NYql::TWarningRules* warningRules) {
  449. if (!astRes.Issues.Empty()) {
  450. if (!ExprCtx_) {
  451. ExprCtx_.Reset(new TExprContext(NextUniqueId_));
  452. }
  453. auto& iManager = ExprCtx_->IssueManager;
  454. if (warningRules) {
  455. for (auto warningRule: *warningRules) {
  456. iManager.AddWarningRule(warningRule);
  457. }
  458. }
  459. iManager.AddScope([this]() {
  460. TIssuePtr issueHolder = new TIssue();
  461. issueHolder->SetMessage(TStringBuilder() << "Parse " << SourceSyntax_);
  462. issueHolder->Severity = TSeverityIds::S_INFO;
  463. return issueHolder;
  464. });
  465. for (auto issue: astRes.Issues) {
  466. iManager.RaiseWarning(issue);
  467. }
  468. iManager.LeaveScope();
  469. }
  470. if (!astRes.IsOk()) {
  471. return false;
  472. }
  473. AstRoot_ = astRes.Root;
  474. AstPool_ = std::move(astRes.Pool);
  475. return true;
  476. }
  477. TString TProgram::GetSessionId() const {
  478. with_lock(SessionIdLock_) {
  479. return SessionId_;
  480. }
  481. }
  482. void TProgram::AddCredentials(const TVector<std::pair<TString, TCredential>>& credentials) {
  483. Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
  484. if (QContext_.CanWrite()) {
  485. NYT::TNode credListNode = NYT::TNode::CreateList();
  486. for (const auto& c : credentials) {
  487. credListNode.Add(NYT::TNode()
  488. ("Name", c.first)
  489. ("Category", c.second.Category)
  490. ("Subcategory", c.second.Subcategory));
  491. }
  492. auto credList = NYT::NodeToYsonString(credListNode, NYT::NYson::EYsonFormat::Binary);
  493. QContext_.GetWriter()->Put({FacadeComponent, DynamicCredentialsLabel}, credList).GetValueSync();
  494. }
  495. for (const auto& credential : credentials) {
  496. Credentials_->AddCredential(credential.first, credential.second);
  497. }
  498. if (auto modules = dynamic_cast<TModuleResolver*>(Modules_.get())) {
  499. modules->SetCredentials(Credentials_);
  500. }
  501. if (UrlListerManager_) {
  502. UrlListerManager_->SetCredentials(Credentials_);
  503. }
  504. }
  505. void TProgram::ClearCredentials() {
  506. Y_ENSURE(!TypeCtx_, "TypeCtx_ already created");
  507. Credentials_ = MakeIntrusive<TCredentials>();
  508. if (auto modules = dynamic_cast<TModuleResolver*>(Modules_.get())) {
  509. modules->SetCredentials(Credentials_);
  510. }
  511. if (UrlListerManager_) {
  512. UrlListerManager_->SetCredentials(Credentials_);
  513. }
  514. }
  515. void TProgram::AddUserDataTable(const TUserDataTable& userDataTable) {
  516. for (const auto& p : userDataTable) {
  517. if (!SavedUserDataTable_.emplace(p).second) {
  518. ythrow yexception() << "UserDataTable already has user data block with key " << p.first;
  519. }
  520. UserDataStorage_->AddUserDataBlock(p.first, p.second);
  521. }
  522. if (QContext_.CanWrite()) {
  523. NYT::TNode userFilesNode;
  524. for (const auto& p : userDataTable) {
  525. userFilesNode.Add(p.first.Alias());
  526. }
  527. auto userFiles = NYT::NodeToYsonString(userFilesNode, NYT::NYson::EYsonFormat::Binary);
  528. QContext_.GetWriter()->Put({FacadeComponent, DynamicUserFilesLabel}, userFiles).GetValueSync();
  529. }
  530. }
  531. void TProgram::HandleSourceCode(TString& sourceCode) {
  532. if (QContext_.CanWrite()) {
  533. QContext_.GetWriter()->Put({FacadeComponent, SourceCodeLabel}, sourceCode).GetValueSync();
  534. } else if (QContext_.CanRead()) {
  535. auto loaded = QContext_.GetReader()->Get({FacadeComponent, SourceCodeLabel}).GetValueSync();
  536. Y_ENSURE(loaded.Defined(), "No source code");
  537. sourceCode = loaded->Value;
  538. }
  539. }
  540. namespace {
  541. THashSet<TString> ExtractSqlFlags(const NYT::TNode& dataNode) {
  542. THashSet<TString> result;
  543. for (const auto& f : dataNode["SqlFlags"].AsList()) {
  544. result.insert(f.AsString());
  545. }
  546. return result;
  547. }
  548. } // namespace
  549. void UpdateSqlFlagsFromQContext(const TQContext& qContext, THashSet<TString>& flags) {
  550. if (qContext.CanRead()) {
  551. auto loaded = qContext.GetReader()->Get({FacadeComponent, TranslationLabel}).GetValueSync();
  552. if (!loaded) {
  553. return;
  554. }
  555. auto dataNode = NYT::NodeFromYsonString(loaded->Value);
  556. flags = ExtractSqlFlags(dataNode);
  557. }
  558. }
  559. void TProgram::HandleTranslationSettings(NSQLTranslation::TTranslationSettings& loadedSettings,
  560. NSQLTranslation::TTranslationSettings*& currentSettings)
  561. {
  562. if (QContext_.CanWrite()) {
  563. auto clusterMappingsNode = NYT::TNode::CreateMap();
  564. for (const auto& c : currentSettings->ClusterMapping) {
  565. clusterMappingsNode(c.first, c.second);
  566. }
  567. auto sqlFlagsNode = NYT::TNode::CreateList();
  568. for (const auto& f : currentSettings->Flags) {
  569. sqlFlagsNode.Add(f);
  570. }
  571. auto dataNode = NYT::TNode()
  572. ("ClusterMapping", clusterMappingsNode)
  573. ("V0Behavior", ui64(currentSettings->V0Behavior))
  574. ("V0WarnAsError", currentSettings->V0WarnAsError->Allow())
  575. ("DqDefaultAuto", currentSettings->DqDefaultAuto->Allow())
  576. ("BlockDefaultAuto", currentSettings->BlockDefaultAuto->Allow())
  577. ("SqlFlags", sqlFlagsNode);
  578. auto data = NYT::NodeToYsonString(dataNode, NYT::NYson::EYsonFormat::Binary);
  579. QContext_.GetWriter()->Put({FacadeComponent, TranslationLabel}, data).GetValueSync();
  580. } else if (QContext_.CanRead()) {
  581. auto loaded = QContext_.GetReader()->Get({FacadeComponent, TranslationLabel}).GetValueSync();
  582. if (!loaded) {
  583. return;
  584. }
  585. auto dataNode = NYT::NodeFromYsonString(loaded->Value);
  586. loadedSettings.ClusterMapping.clear();
  587. for (const auto& c : dataNode["ClusterMapping"].AsMap()) {
  588. loadedSettings.ClusterMapping[c.first] = c.second.AsString();
  589. }
  590. loadedSettings.Flags = ExtractSqlFlags(dataNode);
  591. loadedSettings.V0Behavior = (NSQLTranslation::EV0Behavior)dataNode["V0Behavior"].AsUint64();
  592. loadedSettings.V0WarnAsError = NSQLTranslation::ISqlFeaturePolicy::Make(dataNode["V0WarnAsError"].AsBool());
  593. loadedSettings.DqDefaultAuto = NSQLTranslation::ISqlFeaturePolicy::Make(dataNode["DqDefaultAuto"].AsBool());
  594. loadedSettings.BlockDefaultAuto = NSQLTranslation::ISqlFeaturePolicy::Make(dataNode["BlockDefaultAuto"].AsBool());
  595. currentSettings = &loadedSettings;
  596. }
  597. }
  598. bool TProgram::ParseYql() {
  599. YQL_PROFILE_FUNC(TRACE);
  600. YQL_ENSURE(SourceSyntax_ == ESourceSyntax::Unknown);
  601. SourceSyntax_ = ESourceSyntax::Yql;
  602. SyntaxVersion_ = 1;
  603. auto sourceCode = SourceCode_;
  604. HandleSourceCode(sourceCode);
  605. return FillParseResult(ParseAst(sourceCode));
  606. }
  607. bool TProgram::ParseSql() {
  608. YQL_PROFILE_FUNC(TRACE);
  609. static const THashMap<TString, TString> clusters = {
  610. { "plato", TString(YtProviderName) }
  611. };
  612. NSQLTranslation::TTranslationSettings settings;
  613. settings.ClusterMapping = clusters;
  614. return ParseSql(settings);
  615. }
  616. bool TProgram::ParseSql(const NSQLTranslation::TTranslationSettings& settings)
  617. {
  618. YQL_PROFILE_FUNC(TRACE);
  619. YQL_ENSURE(SourceSyntax_ == ESourceSyntax::Unknown);
  620. SourceSyntax_ = ESourceSyntax::Sql;
  621. SyntaxVersion_ = settings.SyntaxVersion;
  622. NYql::TWarningRules warningRules;
  623. auto sourceCode = SourceCode_;
  624. HandleSourceCode(sourceCode);
  625. NSQLTranslation::TTranslationSettings outerSettings = settings;
  626. NSQLTranslation::TTranslationSettings* currentSettings = &outerSettings;
  627. NSQLTranslation::TTranslationSettings loadedSettings;
  628. loadedSettings.PgParser = settings.PgParser;
  629. if (QContext_) {
  630. HandleTranslationSettings(loadedSettings, currentSettings);
  631. }
  632. currentSettings->EmitReadsForExists = true;
  633. NSQLTranslation::TTranslators translators(
  634. nullptr,
  635. NSQLTranslationV1::MakeTranslator(),
  636. NSQLTranslationPG::MakeTranslator()
  637. );
  638. return FillParseResult(SqlToYql(translators, sourceCode, *currentSettings, &warningRules), &warningRules);
  639. }
  640. bool TProgram::Compile(const TString& username, bool skipLibraries) {
  641. YQL_PROFILE_FUNC(TRACE);
  642. Y_ENSURE(AstRoot_, "Program not parsed yet");
  643. if (!ExprCtx_) {
  644. ExprCtx_.Reset(new TExprContext(NextUniqueId_));
  645. }
  646. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_)) {
  647. return false;
  648. }
  649. TypeCtx_->IsReadOnly = true;
  650. if (!skipLibraries && Modules_.get()) {
  651. auto libs = UserDataStorage_->GetLibraries();
  652. for (auto lib : libs) {
  653. if (!Modules_->AddFromFile(lib, *ExprCtx_, SyntaxVersion_, 0)) {
  654. return false;
  655. }
  656. }
  657. }
  658. if (!CompileExpr(
  659. *AstRoot_, ExprRoot_, *ExprCtx_,
  660. skipLibraries ? nullptr : Modules_.get(),
  661. skipLibraries ? nullptr : UrlListerManager_.Get(), 0, SyntaxVersion_
  662. )) {
  663. return false;
  664. }
  665. return true;
  666. }
  667. bool TProgram::CollectUsedClusters() {
  668. using namespace NNodes;
  669. if (!UsedClusters_) {
  670. UsedClusters_.ConstructInPlace();
  671. UsedProviders_.ConstructInPlace();
  672. auto& typesCtx = *GetAnnotationContext();
  673. auto& ctx = *ExprCtx_;
  674. auto& usedClusters = *UsedClusters_;
  675. auto& usedProviders = *UsedProviders_;
  676. bool hasErrors = false;
  677. VisitExpr(ExprRoot_, [&typesCtx, &ctx, &usedClusters, &usedProviders, &hasErrors](const TExprNode::TPtr& node) {
  678. if (auto dsNode = TMaybeNode<TCoDataSource>(node)) {
  679. auto datasource = typesCtx.DataSourceMap.FindPtr(dsNode.Cast().Category());
  680. YQL_ENSURE(datasource, "Unknown DataSource: " << dsNode.Cast().Category().Value());
  681. TMaybe<TString> cluster;
  682. if (!(*datasource)->ValidateParameters(*node, ctx, cluster)) {
  683. hasErrors = true;
  684. return false;
  685. }
  686. usedProviders.insert(TString(dsNode.Cast().Category().Value()));
  687. if (cluster && *cluster != NCommon::ALL_CLUSTERS) {
  688. usedClusters.insert(*cluster);
  689. }
  690. }
  691. if (auto dsNode = TMaybeNode<TCoDataSink>(node)) {
  692. auto datasink = typesCtx.DataSinkMap.FindPtr(dsNode.Cast().Category());
  693. YQL_ENSURE(datasink, "Unknown DataSink: " << dsNode.Cast().Category().Value());
  694. TMaybe<TString> cluster;
  695. if (!(*datasink)->ValidateParameters(*node, ctx, cluster)) {
  696. hasErrors = true;
  697. return false;
  698. }
  699. usedProviders.insert(TString(dsNode.Cast().Category().Value()));
  700. if (cluster) {
  701. usedClusters.insert(*cluster);
  702. }
  703. }
  704. return true;
  705. });
  706. if (hasErrors) {
  707. UsedClusters_ = Nothing();
  708. UsedProviders_ = Nothing();
  709. return false;
  710. }
  711. }
  712. return true;
  713. }
  714. TProgram::TStatus TProgram::Discover(const TString& username) {
  715. YQL_PROFILE_FUNC(TRACE);
  716. auto m = &TProgram::DiscoverAsync;
  717. return SyncExecution(this, m, username);
  718. }
  719. TProgram::TFutureStatus TProgram::DiscoverAsync(const TString& username) {
  720. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  721. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  722. }
  723. TypeCtx_->DiscoveryMode = true;
  724. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  725. Transformer_ = TTransformationPipeline(TypeCtx_)
  726. .AddServiceTransformers()
  727. .AddParametersEvaluation(*FunctionRegistry_)
  728. .AddPreTypeAnnotation()
  729. .AddExpressionEvaluation(*FunctionRegistry_)
  730. .AddPreIOAnnotation()
  731. .Build();
  732. TFuture<void> openSession = OpenSession(username);
  733. if (!openSession.Initialized()) {
  734. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  735. }
  736. return openSession.Apply([this](const TFuture<void>& f) {
  737. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  738. try {
  739. f.GetValue();
  740. } catch (const std::exception& e) {
  741. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  742. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  743. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  744. }
  745. return AsyncTransform(*Transformer_, ExprRoot_, *ExprCtx_, false);
  746. });
  747. }
  748. TProgram::TStatus TProgram::Lineage(const TString& username, IOutputStream* traceOut, IOutputStream* exprOut, bool withTypes) {
  749. YQL_PROFILE_FUNC(TRACE);
  750. auto m = &TProgram::LineageAsync;
  751. return SyncExecution(this, m, username, traceOut, exprOut, withTypes);
  752. }
  753. TProgram::TFutureStatus TProgram::LineageAsync(const TString& username, IOutputStream* traceOut, IOutputStream* exprOut, bool withTypes) {
  754. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  755. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  756. }
  757. TypeCtx_->IsReadOnly = true;
  758. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  759. ExprStream_ = exprOut;
  760. Transformer_ = TTransformationPipeline(TypeCtx_)
  761. .AddServiceTransformers()
  762. .AddParametersEvaluation(*FunctionRegistry_)
  763. .AddPreTypeAnnotation()
  764. .AddExpressionEvaluation(*FunctionRegistry_)
  765. .AddIOAnnotation()
  766. .AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true)
  767. .AddPostTypeAnnotation()
  768. .Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput")
  769. .AddLineageOptimization(LineageStr_)
  770. .Add(TExprOutputTransformer::Sync(ExprRoot_, exprOut, withTypes), "AstOutput")
  771. .Build();
  772. TFuture<void> openSession = OpenSession(username);
  773. if (!openSession.Initialized())
  774. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  775. SaveExprRoot();
  776. return openSession.Apply([this](const TFuture<void>& f) {
  777. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  778. try {
  779. f.GetValue();
  780. } catch (const std::exception& e) {
  781. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  782. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  783. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  784. }
  785. return AsyncTransformWithFallback(false);
  786. });
  787. }
  788. TProgram::TStatus TProgram::Validate(const TString& username, IOutputStream* exprOut, bool withTypes) {
  789. YQL_PROFILE_FUNC(TRACE);
  790. auto m = &TProgram::ValidateAsync;
  791. return SyncExecution(this, m, username, exprOut, withTypes);
  792. }
  793. TProgram::TFutureStatus TProgram::ValidateAsync(const TString& username, IOutputStream* exprOut, bool withTypes) {
  794. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  795. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  796. }
  797. TypeCtx_->IsReadOnly = true;
  798. TVector<TDataProviderInfo> dataProviders;
  799. with_lock (DataProvidersLock_) {
  800. dataProviders = DataProviders_;
  801. }
  802. for (const auto& dp : dataProviders) {
  803. if (!dp.RemoteClusterProvider || !dp.RemoteValidate) {
  804. continue;
  805. }
  806. if (auto cluster = dp.RemoteClusterProvider(UsedClusters_, UsedProviders_, SourceSyntax_)) {
  807. return dp.RemoteValidate(*cluster, SourceSyntax_, SourceCode_, *ExprCtx_);
  808. }
  809. }
  810. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  811. ExprStream_ = exprOut;
  812. Transformer_ = TTransformationPipeline(TypeCtx_)
  813. .AddServiceTransformers()
  814. .AddParametersEvaluation(*FunctionRegistry_)
  815. .AddPreTypeAnnotation()
  816. .AddExpressionEvaluation(*FunctionRegistry_)
  817. .AddIOAnnotation()
  818. .AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true)
  819. .Add(TExprOutputTransformer::Sync(ExprRoot_, exprOut, withTypes), "AstOutput")
  820. .Build();
  821. TFuture<void> openSession = OpenSession(username);
  822. if (!openSession.Initialized()) {
  823. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  824. }
  825. SaveExprRoot();
  826. return openSession.Apply([this](const TFuture<void>& f) {
  827. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  828. try {
  829. f.GetValue();
  830. } catch (const std::exception& e) {
  831. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  832. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  833. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  834. }
  835. return AsyncTransformWithFallback(false);
  836. });
  837. }
  838. TProgram::TStatus TProgram::Optimize(
  839. const TString& username,
  840. IOutputStream* traceOut,
  841. IOutputStream* tracePlan,
  842. IOutputStream* exprOut,
  843. bool withTypes)
  844. {
  845. YQL_PROFILE_FUNC(TRACE);
  846. auto m = &TProgram::OptimizeAsync;
  847. return SyncExecution(this, m, username, traceOut, tracePlan, exprOut, withTypes);
  848. }
  849. TProgram::TFutureStatus TProgram::OptimizeAsync(
  850. const TString& username,
  851. IOutputStream* traceOut,
  852. IOutputStream* tracePlan,
  853. IOutputStream* exprOut,
  854. bool withTypes)
  855. {
  856. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  857. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  858. }
  859. TypeCtx_->IsReadOnly = true;
  860. TVector<TDataProviderInfo> dataProviders;
  861. with_lock (DataProvidersLock_) {
  862. dataProviders = DataProviders_;
  863. }
  864. for (const auto& dp : dataProviders) {
  865. if (!dp.RemoteClusterProvider || !dp.RemoteOptimize) {
  866. continue;
  867. }
  868. if (auto cluster = dp.RemoteClusterProvider(UsedClusters_, UsedProviders_, SourceSyntax_)) {
  869. return dp.RemoteOptimize(*cluster,
  870. SourceSyntax_, SourceCode_, nullptr,
  871. TypeCtx_, ExprRoot_, *ExprCtx_, ExternalQueryAst_, ExternalQueryPlan_);
  872. }
  873. }
  874. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  875. ExprStream_ = exprOut;
  876. PlanStream_ = tracePlan;
  877. Transformer_ = TTransformationPipeline(TypeCtx_)
  878. .AddServiceTransformers()
  879. .AddParametersEvaluation(*FunctionRegistry_)
  880. .AddPreTypeAnnotation()
  881. .AddExpressionEvaluation(*FunctionRegistry_)
  882. .AddIOAnnotation()
  883. .AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true)
  884. .AddPostTypeAnnotation()
  885. .Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput")
  886. .AddOptimization()
  887. .Add(TExprOutputTransformer::Sync(ExprRoot_, exprOut, withTypes), "AstOutput")
  888. .Build();
  889. TFuture<void> openSession = OpenSession(username);
  890. if (!openSession.Initialized())
  891. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  892. SaveExprRoot();
  893. return openSession.Apply([this](const TFuture<void>& f) {
  894. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  895. try {
  896. f.GetValue();
  897. } catch (const std::exception& e) {
  898. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  899. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  900. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  901. }
  902. return AsyncTransformWithFallback(false);
  903. });
  904. }
  905. TProgram::TStatus TProgram::OptimizeWithConfig(
  906. const TString& username, const IPipelineConfigurator& pipelineConf)
  907. {
  908. YQL_PROFILE_FUNC(TRACE);
  909. auto m = &TProgram::OptimizeAsyncWithConfig;
  910. return SyncExecution(this, m, username, pipelineConf);
  911. }
  912. TProgram::TFutureStatus TProgram::OptimizeAsyncWithConfig(
  913. const TString& username, const IPipelineConfigurator& pipelineConf)
  914. {
  915. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  916. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  917. }
  918. TypeCtx_->IsReadOnly = true;
  919. TVector<TDataProviderInfo> dataProviders;
  920. with_lock (DataProvidersLock_) {
  921. dataProviders = DataProviders_;
  922. }
  923. for (const auto& dp : DataProviders_) {
  924. if (!dp.RemoteClusterProvider || !dp.RemoteOptimize) {
  925. continue;
  926. }
  927. if (auto cluster = dp.RemoteClusterProvider(UsedClusters_, UsedProviders_, SourceSyntax_)) {
  928. return dp.RemoteOptimize(*cluster,
  929. SourceSyntax_, SourceCode_, &pipelineConf,
  930. TypeCtx_, ExprRoot_, *ExprCtx_, ExternalQueryAst_, ExternalQueryPlan_);
  931. }
  932. }
  933. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  934. TTransformationPipeline pipeline(TypeCtx_);
  935. pipelineConf.AfterCreate(&pipeline);
  936. pipeline.AddServiceTransformers();
  937. pipeline.AddParametersEvaluation(*FunctionRegistry_);
  938. pipeline.AddPreTypeAnnotation();
  939. pipeline.AddExpressionEvaluation(*FunctionRegistry_);
  940. pipeline.AddIOAnnotation();
  941. pipeline.AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true);
  942. pipeline.AddPostTypeAnnotation();
  943. pipelineConf.AfterTypeAnnotation(&pipeline);
  944. pipeline.AddOptimization();
  945. if (EnableRangeComputeFor_) {
  946. pipeline.Add(MakeExpandRangeComputeForTransformer(pipeline.GetTypeAnnotationContext()),
  947. "ExpandRangeComputeFor", TIssuesIds::CORE_EXEC);
  948. }
  949. pipeline.Add(CreatePlanInfoTransformer(*TypeCtx_), "PlanInfo");
  950. pipelineConf.AfterOptimize(&pipeline);
  951. Transformer_ = pipeline.Build();
  952. TFuture<void> openSession = OpenSession(username);
  953. if (!openSession.Initialized())
  954. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  955. SaveExprRoot();
  956. return openSession.Apply([this](const TFuture<void>& f) {
  957. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  958. try {
  959. f.GetValue();
  960. } catch (const std::exception& e) {
  961. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  962. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  963. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  964. }
  965. return AsyncTransformWithFallback(false);
  966. });
  967. }
  968. TProgram::TStatus TProgram::LineageWithConfig(
  969. const TString& username, const IPipelineConfigurator& pipelineConf)
  970. {
  971. YQL_PROFILE_FUNC(TRACE);
  972. auto m = &TProgram::LineageAsyncWithConfig;
  973. return SyncExecution(this, m, username, pipelineConf);
  974. }
  975. TProgram::TFutureStatus TProgram::LineageAsyncWithConfig(
  976. const TString& username, const IPipelineConfigurator& pipelineConf)
  977. {
  978. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  979. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  980. }
  981. TypeCtx_->IsReadOnly = true;
  982. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  983. TTransformationPipeline pipeline(TypeCtx_);
  984. pipelineConf.AfterCreate(&pipeline);
  985. pipeline.AddServiceTransformers();
  986. pipeline.AddParametersEvaluation(*FunctionRegistry_);
  987. pipeline.AddPreTypeAnnotation();
  988. pipeline.AddExpressionEvaluation(*FunctionRegistry_);
  989. pipeline.AddIOAnnotation();
  990. pipeline.AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true);
  991. pipeline.AddPostTypeAnnotation();
  992. pipelineConf.AfterTypeAnnotation(&pipeline);
  993. pipeline.AddLineageOptimization(LineageStr_);
  994. Transformer_ = pipeline.Build();
  995. TFuture<void> openSession = OpenSession(username);
  996. if (!openSession.Initialized())
  997. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  998. SaveExprRoot();
  999. return openSession.Apply([this](const TFuture<void>& f) {
  1000. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  1001. try {
  1002. f.GetValue();
  1003. } catch (const std::exception& e) {
  1004. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  1005. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  1006. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1007. }
  1008. return AsyncTransformWithFallback(false);
  1009. });
  1010. }
  1011. TProgram::TStatus TProgram::Run(
  1012. const TString& username,
  1013. IOutputStream* traceOut,
  1014. IOutputStream* tracePlan,
  1015. IOutputStream* exprOut,
  1016. bool withTypes)
  1017. {
  1018. YQL_PROFILE_FUNC(TRACE);
  1019. auto m = &TProgram::RunAsync;
  1020. return SyncExecution(this, m, username, traceOut, tracePlan, exprOut, withTypes);
  1021. }
  1022. TProgram::TFutureStatus TProgram::RunAsync(
  1023. const TString& username,
  1024. IOutputStream* traceOut,
  1025. IOutputStream* tracePlan,
  1026. IOutputStream* exprOut,
  1027. bool withTypes)
  1028. {
  1029. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  1030. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1031. }
  1032. TypeCtx_->IsReadOnly = (HiddenMode_ != EHiddenMode::Disable);
  1033. TVector<TDataProviderInfo> dataProviders;
  1034. with_lock (DataProvidersLock_) {
  1035. dataProviders = DataProviders_;
  1036. }
  1037. for (const auto& dp : DataProviders_) {
  1038. if (!dp.RemoteClusterProvider || !dp.RemoteRun) {
  1039. continue;
  1040. }
  1041. if (auto cluster = dp.RemoteClusterProvider(UsedClusters_, UsedProviders_, SourceSyntax_)) {
  1042. return dp.RemoteRun(*cluster, SourceSyntax_, SourceCode_,
  1043. OutputFormat_, ResultFormat_, nullptr,
  1044. TypeCtx_, ExprRoot_, *ExprCtx_, ExternalQueryAst_, ExternalQueryPlan_, ExternalDiagnostics_,
  1045. ResultProviderConfig_);
  1046. }
  1047. }
  1048. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  1049. ExprStream_ = exprOut;
  1050. PlanStream_ = tracePlan;
  1051. TTransformationPipeline pipeline(TypeCtx_);
  1052. pipeline.AddServiceTransformers();
  1053. pipeline.AddParametersEvaluation(*FunctionRegistry_);
  1054. pipeline.AddPreTypeAnnotation();
  1055. pipeline.AddExpressionEvaluation(*FunctionRegistry_);
  1056. pipeline.AddIOAnnotation();
  1057. pipeline.AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true);
  1058. pipeline.AddPostTypeAnnotation();
  1059. pipeline.Add(TExprOutputTransformer::Sync(ExprRoot_, traceOut), "ExprOutput");
  1060. pipeline.AddOptimization();
  1061. if (EnableRangeComputeFor_) {
  1062. pipeline.Add(MakeExpandRangeComputeForTransformer(pipeline.GetTypeAnnotationContext()),
  1063. "ExpandRangeComputeFor", TIssuesIds::CORE_EXEC);
  1064. }
  1065. pipeline.Add(TExprOutputTransformer::Sync(ExprRoot_, exprOut, withTypes), "AstOutput");
  1066. pipeline.Add(TPlanOutputTransformer::Sync(tracePlan, GetPlanBuilder(), OutputFormat_), "PlanOutput");
  1067. pipeline.AddRun(ProgressWriter_);
  1068. Transformer_ = pipeline.Build();
  1069. TFuture<void> openSession = OpenSession(username);
  1070. if (!openSession.Initialized()) {
  1071. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1072. }
  1073. SaveExprRoot();
  1074. return openSession.Apply([this](const TFuture<void>& f) {
  1075. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  1076. try {
  1077. f.GetValue();
  1078. } catch (const std::exception& e) {
  1079. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  1080. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  1081. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1082. }
  1083. return AsyncTransformWithFallback(false);
  1084. });
  1085. }
  1086. TProgram::TStatus TProgram::RunWithConfig(
  1087. const TString& username, const IPipelineConfigurator& pipelineConf)
  1088. {
  1089. YQL_PROFILE_FUNC(TRACE);
  1090. auto m = &TProgram::RunAsyncWithConfig;
  1091. return SyncExecution(this, m, username, pipelineConf);
  1092. }
  1093. TProgram::TFutureStatus TProgram::RunAsyncWithConfig(
  1094. const TString& username, const IPipelineConfigurator& pipelineConf)
  1095. {
  1096. if (!ProvideAnnotationContext(username)->Initialize(*ExprCtx_) || !CollectUsedClusters()) {
  1097. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1098. }
  1099. TypeCtx_->IsReadOnly = (HiddenMode_ != EHiddenMode::Disable);
  1100. TVector<TDataProviderInfo> dataProviders;
  1101. with_lock (DataProvidersLock_) {
  1102. dataProviders = DataProviders_;
  1103. }
  1104. for (const auto& dp : DataProviders_) {
  1105. if (!dp.RemoteClusterProvider || !dp.RemoteRun) {
  1106. continue;
  1107. }
  1108. if (auto cluster = dp.RemoteClusterProvider(UsedClusters_, UsedProviders_, SourceSyntax_)) {
  1109. return dp.RemoteRun(*cluster, SourceSyntax_, SourceCode_,
  1110. OutputFormat_, ResultFormat_, &pipelineConf,
  1111. TypeCtx_, ExprRoot_, *ExprCtx_, ExternalQueryAst_, ExternalQueryPlan_, ExternalDiagnostics_,
  1112. ResultProviderConfig_);
  1113. }
  1114. }
  1115. Y_ENSURE(ExprRoot_, "Program not compiled yet");
  1116. TTransformationPipeline pipeline(TypeCtx_);
  1117. pipelineConf.AfterCreate(&pipeline);
  1118. pipeline.AddServiceTransformers();
  1119. pipeline.AddParametersEvaluation(*FunctionRegistry_);
  1120. pipeline.AddPreTypeAnnotation();
  1121. pipeline.AddExpressionEvaluation(*FunctionRegistry_);
  1122. pipeline.AddIOAnnotation();
  1123. pipeline.AddTypeAnnotation(TIssuesIds::CORE_TYPE_ANN, true);
  1124. pipeline.AddPostTypeAnnotation();
  1125. pipelineConf.AfterTypeAnnotation(&pipeline);
  1126. pipeline.AddOptimization();
  1127. if (EnableRangeComputeFor_) {
  1128. pipeline.Add(MakeExpandRangeComputeForTransformer(pipeline.GetTypeAnnotationContext()),
  1129. "ExpandRangeComputeFor", TIssuesIds::CORE_EXEC);
  1130. }
  1131. pipelineConf.AfterOptimize(&pipeline);
  1132. pipeline.AddRun(ProgressWriter_);
  1133. Transformer_ = pipeline.Build();
  1134. TFuture<void> openSession = OpenSession(username);
  1135. if (!openSession.Initialized()) {
  1136. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1137. }
  1138. SaveExprRoot();
  1139. return openSession.Apply([this](const TFuture<void>& f) {
  1140. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  1141. try {
  1142. f.GetValue();
  1143. } catch (const std::exception& e) {
  1144. YQL_LOG(ERROR) << "OpenSession error: " << e.what();
  1145. ExprCtx_->IssueManager.RaiseIssue(ExceptionToIssue(e));
  1146. return NThreading::MakeFuture<TStatus>(IGraphTransformer::TStatus::Error);
  1147. }
  1148. return AsyncTransformWithFallback(false);
  1149. });
  1150. }
  1151. void TProgram::SaveExprRoot() {
  1152. TNodeOnNodeOwnedMap deepClones;
  1153. SavedExprRoot_ = ExprCtx_->DeepCopy(*ExprRoot_, *ExprCtx_, deepClones, /*internStrings*/false, /*copyTypes*/true, /*copyResult*/false, {});
  1154. }
  1155. std::optional<bool> TProgram::CheckFallbackIssues(const TIssues& issues) {
  1156. auto isFallback = std::optional<bool>();
  1157. auto checkIssue = [&](const TIssue& issue) {
  1158. if (issue.GetCode() == TIssuesIds::DQ_GATEWAY_ERROR) {
  1159. YQL_LOG(DEBUG) << "Gateway Error " << issue;
  1160. isFallback = false;
  1161. } else if (issue.GetCode() == TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR) {
  1162. YQL_LOG(DEBUG) << "Gateway Fallback Error " << issue;
  1163. isFallback = true;
  1164. } else if (issue.GetCode() == TIssuesIds::DQ_OPTIMIZE_ERROR) {
  1165. YQL_LOG(DEBUG) << "Optimize Error " << issue;
  1166. isFallback = true;
  1167. } else if (issue.GetCode() >= TIssuesIds::YT_ACCESS_DENIED &&
  1168. issue.GetCode() <= TIssuesIds::YT_FOLDER_INPUT_IS_NOT_A_FOLDER &&
  1169. (issue.GetSeverity() == TSeverityIds::S_ERROR ||
  1170. issue.GetSeverity() == TSeverityIds::S_FATAL)) {
  1171. YQL_LOG(DEBUG) << "Yt Error " << issue;
  1172. isFallback = false;
  1173. }
  1174. };
  1175. std::function<void(const TIssuePtr& issue)> recursiveCheck = [&](const TIssuePtr& issue) {
  1176. checkIssue(*issue);
  1177. for (const auto& subissue : issue->GetSubIssues()) {
  1178. recursiveCheck(subissue);
  1179. }
  1180. };
  1181. for (const auto& issue : issues) {
  1182. checkIssue(issue);
  1183. // check subissues
  1184. for (const auto& subissue : issue.GetSubIssues()) {
  1185. recursiveCheck(subissue);
  1186. }
  1187. }
  1188. return isFallback;
  1189. }
  1190. TFuture<IGraphTransformer::TStatus> TProgram::AsyncTransformWithFallback(bool applyAsyncChanges)
  1191. {
  1192. return AsyncTransform(*Transformer_, ExprRoot_, *ExprCtx_, applyAsyncChanges).Apply([this](const TFuture<IGraphTransformer::TStatus>& res) {
  1193. auto status = res.GetValueSync();
  1194. if (status == IGraphTransformer::TStatus::Error
  1195. && !TypeCtx_->ForceDq
  1196. && SavedExprRoot_
  1197. && TypeCtx_->DqCaptured
  1198. && TypeCtx_->DqFallbackPolicy != EFallbackPolicy::Never)
  1199. {
  1200. auto issues = ExprCtx_->IssueManager.GetIssues();
  1201. bool isFallback = CheckFallbackIssues(issues).value_or(true);
  1202. if (!isFallback && TypeCtx_->DqFallbackPolicy != EFallbackPolicy::Always) {
  1203. // unrecoverable error
  1204. return res;
  1205. }
  1206. ExprRoot_ = SavedExprRoot_;
  1207. SavedExprRoot_ = nullptr;
  1208. UserDataStorage_->SetUserDataTable(std::move(SavedUserDataTable_));
  1209. ExprCtx_->IssueManager.Reset();
  1210. YQL_LOG(DEBUG) << "Fallback, Issues: " << issues.ToString();
  1211. ExprCtx_->Reset();
  1212. Transformer_->Rewind();
  1213. for (auto sink : TypeCtx_->DataSinks) {
  1214. sink->Reset();
  1215. }
  1216. for (auto source : TypeCtx_->DataSources) {
  1217. source->Reset();
  1218. }
  1219. TypeCtx_->Reset();
  1220. try {
  1221. CleanupLastSession().GetValueSync();
  1222. } catch (...) {
  1223. ExprCtx_->IssueManager.RaiseIssue(TIssue({}, "Failed to cleanup session: " + CurrentExceptionMessage()));
  1224. return NThreading::MakeFuture<IGraphTransformer::TStatus>(IGraphTransformer::TStatus::Error);
  1225. }
  1226. std::function<void(const TIssuePtr& issue)> toInfo = [&](const TIssuePtr& issue) {
  1227. if (issue->Severity == TSeverityIds::S_ERROR
  1228. || issue->Severity == TSeverityIds::S_FATAL
  1229. || issue->Severity == TSeverityIds::S_WARNING)
  1230. {
  1231. issue->Severity = TSeverityIds::S_INFO;
  1232. }
  1233. for (const auto& subissue : issue->GetSubIssues()) {
  1234. toInfo(subissue);
  1235. }
  1236. };
  1237. TIssue info("DQ cannot execute the query");
  1238. info.Severity = TSeverityIds::S_INFO;
  1239. for (auto& issue : issues) {
  1240. TIssuePtr newIssue = new TIssue(issue);
  1241. if (newIssue->Severity == TSeverityIds::S_ERROR
  1242. || issue.Severity == TSeverityIds::S_FATAL
  1243. || issue.Severity == TSeverityIds::S_WARNING)
  1244. {
  1245. newIssue->Severity = TSeverityIds::S_INFO;
  1246. }
  1247. for (auto& subissue : newIssue->GetSubIssues()) {
  1248. toInfo(subissue);
  1249. }
  1250. info.AddSubIssue(newIssue);
  1251. }
  1252. ExprCtx_->IssueManager.AddIssues({info});
  1253. ++FallbackCounter_;
  1254. // don't execute recapture again
  1255. ExprCtx_->Step.Done(TExprStep::Recapture);
  1256. AbortHidden_();
  1257. return AsyncTransformWithFallback(false);
  1258. }
  1259. if (status == IGraphTransformer::TStatus::Error && (TypeCtx_->DqFallbackPolicy == EFallbackPolicy::Never || TypeCtx_->ForceDq)) {
  1260. YQL_LOG(INFO) << "Fallback skipped due to per query policy";
  1261. }
  1262. return res;
  1263. });
  1264. }
  1265. TMaybe<TString> TProgram::GetQueryAst(TMaybe<size_t> memoryLimit) {
  1266. if (ExternalQueryAst_) {
  1267. return ExternalQueryAst_;
  1268. }
  1269. TStringStream astStream;
  1270. astStream.Reserve(DEFAULT_AST_BUF_SIZE);
  1271. if (ExprRoot_) {
  1272. std::unique_ptr<IAllocator> limitingAllocator;
  1273. TConvertToAstSettings settings;
  1274. settings.AnnotationFlags = TExprAnnotationFlags::None;
  1275. settings.RefAtoms = true;
  1276. settings.Allocator = TDefaultAllocator::Instance();
  1277. if (memoryLimit) {
  1278. limitingAllocator = MakeLimitingAllocator(*memoryLimit, TDefaultAllocator::Instance());
  1279. settings.Allocator = limitingAllocator.get();
  1280. }
  1281. auto ast = ConvertToAst(*ExprRoot_, *ExprCtx_, settings);
  1282. ast.Root->PrettyPrintTo(astStream, TAstPrintFlags::ShortQuote | TAstPrintFlags::PerLine);
  1283. return astStream.Str();
  1284. } else if (AstRoot_) {
  1285. AstRoot_->PrettyPrintTo(astStream, TAstPrintFlags::ShortQuote | TAstPrintFlags::PerLine);
  1286. return astStream.Str();
  1287. }
  1288. return Nothing();
  1289. }
  1290. TMaybe<TString> TProgram::GetQueryPlan(const TPlanSettings& settings) {
  1291. if (ExternalQueryPlan_) {
  1292. return ExternalQueryPlan_;
  1293. }
  1294. if (ExprRoot_) {
  1295. TStringStream planStream;
  1296. planStream.Reserve(DEFAULT_PLAN_BUF_SIZE);
  1297. NYson::TYsonWriter writer(&planStream, OutputFormat_);
  1298. PlanBuilder_->WritePlan(writer, ExprRoot_, settings);
  1299. return planStream.Str();
  1300. }
  1301. return Nothing();
  1302. }
  1303. TMaybe<TString> TProgram::GetDiagnostics() {
  1304. if (ExternalDiagnostics_) {
  1305. return ExternalDiagnostics_;
  1306. }
  1307. if (!TypeCtx_ || !TypeCtx_->Diagnostics) {
  1308. return Nothing();
  1309. }
  1310. if (!Transformer_) {
  1311. return Nothing();
  1312. }
  1313. TStringStream out;
  1314. NYson::TYsonWriter writer(&out, DiagnosticFormat_.GetOrElse(ResultFormat_));
  1315. writer.OnBeginMap();
  1316. writer.OnKeyedItem("Write");
  1317. writer.OnBeginList();
  1318. writer.OnListItem();
  1319. writer.OnBeginMap();
  1320. writer.OnKeyedItem("Data");
  1321. writer.OnBeginMap();
  1322. writer.OnKeyedItem("Diagnostics");
  1323. writer.OnBeginMap();
  1324. writer.OnKeyedItem("TransformStats");
  1325. auto transformStats = Transformer_->GetStatistics();
  1326. NCommon::TransformerStatsToYson("", transformStats, writer);
  1327. for (auto& datasink : TypeCtx_->DataSinks) {
  1328. writer.OnKeyedItem(datasink->GetName());
  1329. if (!datasink->CollectDiagnostics(writer)) {
  1330. writer.OnEntity();
  1331. }
  1332. }
  1333. writer.OnEndMap();
  1334. writer.OnEndMap();
  1335. writer.OnEndMap();
  1336. writer.OnEndList();
  1337. writer.OnEndMap();
  1338. return out.Str();
  1339. }
  1340. IGraphTransformer::TStatistics TProgram::GetRawDiagnostics() {
  1341. return Transformer_ ? Transformer_->GetStatistics() : IGraphTransformer::TStatistics::NotPresent();
  1342. }
  1343. TMaybe<TString> TProgram::GetTasksInfo() {
  1344. if (!TypeCtx_) {
  1345. return Nothing();
  1346. }
  1347. bool hasTasks = false;
  1348. TStringStream out;
  1349. NYson::TYsonWriter writer(&out, ResultFormat_);
  1350. writer.OnBeginMap();
  1351. writer.OnKeyedItem("Write");
  1352. writer.OnBeginList();
  1353. writer.OnListItem();
  1354. writer.OnBeginMap();
  1355. writer.OnKeyedItem("Tasks");
  1356. writer.OnBeginList();
  1357. for (auto& datasink : TypeCtx_->DataSinks) {
  1358. hasTasks = hasTasks || datasink->GetTasksInfo(writer);
  1359. }
  1360. writer.OnEndList();
  1361. writer.OnEndMap();
  1362. writer.OnEndList();
  1363. writer.OnEndMap();
  1364. if (hasTasks) {
  1365. return out.Str();
  1366. } else {
  1367. return Nothing();
  1368. }
  1369. }
  1370. TMaybe<TString> TProgram::GetStatistics(bool totalOnly, THashMap<TString, TStringBuf> extraYsons) {
  1371. if (!TypeCtx_) {
  1372. return Nothing();
  1373. }
  1374. TStringStream out;
  1375. NYson::TYsonWriter writer(&out, OutputFormat_);
  1376. // Header
  1377. writer.OnBeginMap();
  1378. writer.OnKeyedItem("ExecutionStatistics");
  1379. writer.OnBeginMap();
  1380. // Providers
  1381. THashSet<TStringBuf> processed;
  1382. for (auto& datasink : TypeCtx_->DataSinks) {
  1383. TStringStream providerOut;
  1384. NYson::TYsonWriter providerWriter(&providerOut);
  1385. if (datasink->CollectStatistics(providerWriter, totalOnly)) {
  1386. writer.OnKeyedItem(datasink->GetName());
  1387. writer.OnRaw(providerOut.Str());
  1388. processed.insert(datasink->GetName());
  1389. }
  1390. }
  1391. for (auto& datasource : TypeCtx_->DataSources) {
  1392. if (processed.insert(datasource->GetName()).second) {
  1393. TStringStream providerOut;
  1394. NYson::TYsonWriter providerWriter(&providerOut);
  1395. if (datasource->CollectStatistics(providerWriter, totalOnly)) {
  1396. writer.OnKeyedItem(datasource->GetName());
  1397. writer.OnRaw(providerOut.Str());
  1398. }
  1399. }
  1400. }
  1401. auto rusage = TRusage::Get();
  1402. // System stats
  1403. writer.OnKeyedItem("system");
  1404. writer.OnBeginMap();
  1405. writer.OnKeyedItem("MaxRSS");
  1406. writer.OnBeginMap();
  1407. writer.OnKeyedItem("max");
  1408. writer.OnInt64Scalar(rusage.MaxRss);
  1409. writer.OnEndMap();
  1410. writer.OnKeyedItem("MajorPageFaults");
  1411. writer.OnBeginMap();
  1412. writer.OnKeyedItem("count");
  1413. writer.OnInt64Scalar(rusage.MajorPageFaults);
  1414. writer.OnEndMap();
  1415. if (FallbackCounter_) {
  1416. writer.OnKeyedItem("Fallback");
  1417. writer.OnBeginMap();
  1418. writer.OnKeyedItem("count");
  1419. writer.OnInt64Scalar(FallbackCounter_);
  1420. writer.OnEndMap();
  1421. }
  1422. writer.OnEndMap(); // system
  1423. if (TypeCtx_->Modules) {
  1424. writer.OnKeyedItem("moduleResolver");
  1425. writer.OnBeginMap();
  1426. TypeCtx_->Modules->WriteStatistics(writer);
  1427. writer.OnEndMap();
  1428. }
  1429. // extra
  1430. for (const auto &[k, extraYson] : extraYsons) {
  1431. writer.OnKeyedItem(k);
  1432. writer.OnRaw(extraYson);
  1433. }
  1434. // Footer
  1435. writer.OnEndMap();
  1436. writer.OnEndMap();
  1437. return out.Str();
  1438. }
  1439. TMaybe<TString> TProgram::GetDiscoveredData() {
  1440. if (!TypeCtx_) {
  1441. return Nothing();
  1442. }
  1443. TStringStream out;
  1444. NYson::TYsonWriter writer(&out, OutputFormat_);
  1445. writer.OnBeginMap();
  1446. for (auto& datasource: TypeCtx_->DataSources) {
  1447. TStringStream providerOut;
  1448. NYson::TYsonWriter providerWriter(&providerOut);
  1449. if (datasource->CollectDiscoveredData(providerWriter)) {
  1450. writer.OnKeyedItem(datasource->GetName());
  1451. writer.OnRaw(providerOut.Str());
  1452. }
  1453. }
  1454. writer.OnEndMap();
  1455. return out.Str();
  1456. }
  1457. TMaybe<TString> TProgram::GetLineage() {
  1458. return LineageStr_;
  1459. }
  1460. TProgram::TFutureStatus TProgram::ContinueAsync() {
  1461. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  1462. return AsyncTransformWithFallback(true);
  1463. }
  1464. NThreading::TFuture<void> TProgram::Abort()
  1465. {
  1466. return CloseLastSession();
  1467. }
  1468. TIssues TProgram::Issues() const {
  1469. TIssues result;
  1470. if (ExprCtx_) {
  1471. result.AddIssues(ExprCtx_->IssueManager.GetIssues());
  1472. }
  1473. result.AddIssues(FinalIssues_);
  1474. CheckFatalIssues(result);
  1475. return result;
  1476. }
  1477. TIssues TProgram::CompletedIssues() const {
  1478. TIssues result;
  1479. if (ExprCtx_) {
  1480. result.AddIssues(ExprCtx_->IssueManager.GetCompletedIssues());
  1481. }
  1482. result.AddIssues(FinalIssues_);
  1483. CheckFatalIssues(result);
  1484. return result;
  1485. }
  1486. TIssue MakeNoBlocksInfoIssue(const TVector<TString>& names, bool isTypes) {
  1487. TIssue result;
  1488. TString msg = TStringBuilder() << "Most frequent " << (isTypes ? "types " : "callables ")
  1489. << "which do not support block mode: " << JoinRange(", ", names.begin(), names.end());
  1490. result.SetMessage(msg);
  1491. result.SetCode(isTypes ? TIssuesIds::CORE_TOP_UNSUPPORTED_BLOCK_TYPES : TIssuesIds::CORE_TOP_UNSUPPORTED_BLOCK_CALLABLES, TSeverityIds::S_INFO);
  1492. return result;
  1493. }
  1494. void TProgram::FinalizeIssues() {
  1495. FinalIssues_.Clear();
  1496. if (TypeCtx_) {
  1497. static const size_t topCount = 10;
  1498. auto noBlockTypes = TypeCtx_->GetTopNoBlocksTypes(topCount);
  1499. if (!noBlockTypes.empty()) {
  1500. FinalIssues_.AddIssue(MakeNoBlocksInfoIssue(noBlockTypes, true));
  1501. }
  1502. auto noBlockCallables = TypeCtx_->GetTopNoBlocksCallables(topCount);
  1503. if (!noBlockCallables.empty()) {
  1504. FinalIssues_.AddIssue(MakeNoBlocksInfoIssue(noBlockCallables, false));
  1505. }
  1506. }
  1507. }
  1508. NThreading::TFuture<void> TProgram::CleanupLastSession() {
  1509. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  1510. TString sessionId = GetSessionId();
  1511. if (sessionId.empty()) {
  1512. return MakeFuture();
  1513. }
  1514. TVector<TDataProviderInfo> dataProviders;
  1515. with_lock (DataProvidersLock_) {
  1516. dataProviders = DataProviders_;
  1517. }
  1518. TVector<NThreading::TFuture<void>> cleanupFutures;
  1519. cleanupFutures.reserve(dataProviders.size());
  1520. for (const auto& dp : dataProviders) {
  1521. if (dp.CleanupSession) {
  1522. dp.CleanupSession(sessionId);
  1523. }
  1524. if (dp.CleanupSessionAsync) {
  1525. cleanupFutures.push_back(dp.CleanupSessionAsync(sessionId));
  1526. }
  1527. }
  1528. return NThreading::WaitExceptionOrAll(cleanupFutures);
  1529. }
  1530. NThreading::TFuture<void> TProgram::CloseLastSession() {
  1531. YQL_LOG_CTX_ROOT_SESSION_SCOPE(GetSessionId());
  1532. TVector<TDataProviderInfo> dataProviders;
  1533. with_lock (DataProvidersLock_) {
  1534. dataProviders = DataProviders_;
  1535. }
  1536. auto promise = NThreading::NewPromise<void>();
  1537. TString sessionId;
  1538. with_lock(SessionIdLock_) {
  1539. // post-condition: SessionId_ will be empty
  1540. sessionId = std::move(SessionId_);
  1541. if (sessionId.empty()) {
  1542. return CloseLastSessionFuture_;
  1543. }
  1544. CloseLastSessionFuture_ = promise.GetFuture();
  1545. }
  1546. TVector<NThreading::TFuture<void>> closeFutures;
  1547. closeFutures.reserve(dataProviders.size());
  1548. for (const auto& dp : dataProviders) {
  1549. if (dp.CloseSession) {
  1550. dp.CloseSession(sessionId);
  1551. }
  1552. if (dp.CloseSessionAsync) {
  1553. closeFutures.push_back(dp.CloseSessionAsync(sessionId));
  1554. }
  1555. }
  1556. return NThreading::WaitExceptionOrAll(closeFutures)
  1557. .Apply([promise = std::move(promise)](const NThreading::TFuture<void>&) mutable {
  1558. promise.SetValue();
  1559. });
  1560. }
  1561. TString TProgram::ResultsAsString() const {
  1562. if (!ResultProviderConfig_)
  1563. return "";
  1564. TStringStream resultOut;
  1565. NYson::TYsonWriter yson(&resultOut, OutputFormat_);
  1566. yson.OnBeginList();
  1567. for (const auto& result: Results()) {
  1568. yson.OnListItem();
  1569. yson.OnRaw(result);
  1570. }
  1571. yson.OnEndList();
  1572. return resultOut.Str();
  1573. }
  1574. TTypeAnnotationContextPtr TProgram::BuildTypeAnnotationContext(const TString& username) {
  1575. auto typeAnnotationContext = MakeIntrusive<TTypeAnnotationContext>();
  1576. typeAnnotationContext->UserDataStorage = UserDataStorage_;
  1577. typeAnnotationContext->Credentials = Credentials_;
  1578. typeAnnotationContext->Modules = Modules_;
  1579. typeAnnotationContext->UrlListerManager = UrlListerManager_;
  1580. typeAnnotationContext->UdfResolver = UdfResolver_;
  1581. typeAnnotationContext->UdfIndex = UdfIndex_;
  1582. typeAnnotationContext->UdfIndexPackageSet = UdfIndexPackageSet_;
  1583. typeAnnotationContext->RandomProvider = RandomProvider_;
  1584. typeAnnotationContext->TimeProvider = TimeProvider_;
  1585. if (DiagnosticFormat_) {
  1586. typeAnnotationContext->Diagnostics = true;
  1587. }
  1588. typeAnnotationContext->ArrowResolver = ArrowResolver_;
  1589. typeAnnotationContext->FileStorage = FileStorage_;
  1590. typeAnnotationContext->QContext = QContext_;
  1591. typeAnnotationContext->HiddenMode = HiddenMode_;
  1592. if (UdfIndex_ && UdfIndexPackageSet_) {
  1593. // setup default versions at the beginning
  1594. // could be overridden by pragma later
  1595. UdfIndexPackageSet_->AddResourcesTo(UdfIndex_);
  1596. }
  1597. PlanBuilder_ = CreatePlanBuilder(*typeAnnotationContext);
  1598. THashSet<TString> providerNames;
  1599. TVector<TString> fullResultDataSinks;
  1600. TVector<std::function<TString(const TString&, const TString&)>> tokenResolvers;
  1601. for (const auto& dpi : DataProvidersInit_) {
  1602. auto dp = dpi(
  1603. username,
  1604. SessionId_,
  1605. GatewaysConfig_,
  1606. FunctionRegistry_,
  1607. RandomProvider_,
  1608. typeAnnotationContext,
  1609. ProgressWriter_,
  1610. OperationOptions_,
  1611. AbortHidden_,
  1612. QContext_
  1613. );
  1614. if (HiddenMode_ != EHiddenMode::Disable && !dp.SupportsHidden) {
  1615. continue;
  1616. }
  1617. providerNames.insert(dp.Names.begin(), dp.Names.end());
  1618. with_lock (DataProvidersLock_) {
  1619. DataProviders_.emplace_back(dp);
  1620. }
  1621. if (dp.Source) {
  1622. typeAnnotationContext->AddDataSource(dp.Names, dp.Source);
  1623. }
  1624. if (dp.Sink) {
  1625. typeAnnotationContext->AddDataSink(dp.Names, dp.Sink);
  1626. }
  1627. if (dp.TokenResolver) {
  1628. tokenResolvers.push_back(dp.TokenResolver);
  1629. }
  1630. if (dp.SupportFullResultDataSink) {
  1631. fullResultDataSinks.insert(fullResultDataSinks.end(), dp.Names.begin(), dp.Names.end());
  1632. }
  1633. }
  1634. TVector<TString> resultProviderDataSources;
  1635. if (providerNames.contains(YtProviderName)) {
  1636. resultProviderDataSources.push_back(TString(YtProviderName));
  1637. }
  1638. if (providerNames.contains(KikimrProviderName)) {
  1639. resultProviderDataSources.push_back(TString(KikimrProviderName));
  1640. }
  1641. if (providerNames.contains(RtmrProviderName)) {
  1642. resultProviderDataSources.push_back(TString(RtmrProviderName));
  1643. }
  1644. if (providerNames.contains(PqProviderName)) {
  1645. resultProviderDataSources.push_back(TString(PqProviderName));
  1646. }
  1647. if (providerNames.contains(DqProviderName)) {
  1648. resultProviderDataSources.push_back(TString(DqProviderName));
  1649. }
  1650. if (providerNames.contains(PureProviderName)) {
  1651. resultProviderDataSources.push_back(TString(PureProviderName));
  1652. }
  1653. if (!resultProviderDataSources.empty())
  1654. {
  1655. auto resultFormat = ResultFormat_;
  1656. auto writerFactory = [resultFormat] () { return CreateYsonResultWriter(resultFormat); };
  1657. ResultProviderConfig_ = MakeIntrusive<TResultProviderConfig>(*typeAnnotationContext,
  1658. *FunctionRegistry_, ResultType_, ToString((ui32)resultFormat), writerFactory);
  1659. ResultProviderConfig_->SupportsResultPosition = SupportsResultPosition_;
  1660. auto resultProvider = CreateResultProvider(ResultProviderConfig_);
  1661. typeAnnotationContext->AddDataSink(ResultProviderName, resultProvider);
  1662. typeAnnotationContext->AvailablePureResultDataSources = resultProviderDataSources;
  1663. }
  1664. if (!fullResultDataSinks.empty()) {
  1665. typeAnnotationContext->FullResultDataSink = fullResultDataSinks.front();
  1666. }
  1667. {
  1668. auto configProvider = CreateConfigProvider(*typeAnnotationContext, GatewaysConfig_, username);
  1669. typeAnnotationContext->AddDataSource(ConfigProviderName, configProvider);
  1670. }
  1671. tokenResolvers.push_back(BuildDefaultTokenResolver(typeAnnotationContext->Credentials));
  1672. typeAnnotationContext->UserDataStorage->SetTokenResolver(BuildCompositeTokenResolver(std::move(tokenResolvers)));
  1673. return typeAnnotationContext;
  1674. }
  1675. TFuture<void> TProgram::OpenSession(const TString& username)
  1676. {
  1677. TVector<TFuture<void>> openFutures;
  1678. with_lock (DataProvidersLock_) {
  1679. for (const auto& dp : DataProviders_) {
  1680. if (dp.OpenSession) {
  1681. auto future = dp.OpenSession(SessionId_, username, ProgressWriter_, OperationOptions_,
  1682. RandomProvider_, TimeProvider_);
  1683. openFutures.push_back(future);
  1684. }
  1685. }
  1686. }
  1687. return WaitExceptionOrAll(openFutures);
  1688. }
  1689. void TProgram::Print(IOutputStream* exprOut, IOutputStream* planOut, bool cleanPlan) {
  1690. TVector<TTransformStage> printTransformers;
  1691. const auto issueCode = TIssuesIds::DEFAULT_ERROR;
  1692. if (exprOut) {
  1693. printTransformers.push_back(TTransformStage(
  1694. TExprOutputTransformer::Sync(ExprRoot_, exprOut),
  1695. "ExprOutput",
  1696. issueCode));
  1697. }
  1698. if (planOut) {
  1699. if (cleanPlan) {
  1700. GetPlanBuilder().Clear();
  1701. }
  1702. printTransformers.push_back(TTransformStage(
  1703. TPlanOutputTransformer::Sync(planOut, GetPlanBuilder(), OutputFormat_),
  1704. "PlanOutput",
  1705. issueCode));
  1706. }
  1707. auto compositeTransformer = CreateCompositeGraphTransformer(printTransformers, false);
  1708. InstantTransform(*compositeTransformer, ExprRoot_, *ExprCtx_);
  1709. }
  1710. bool TProgram::HasActiveProcesses() {
  1711. with_lock (DataProvidersLock_) {
  1712. for (const auto& dp : DataProviders_) {
  1713. if (dp.HasActiveProcesses && dp.HasActiveProcesses()) {
  1714. return true;
  1715. }
  1716. }
  1717. }
  1718. return false;
  1719. }
  1720. bool TProgram::NeedWaitForActiveProcesses() {
  1721. with_lock (DataProvidersLock_) {
  1722. for (const auto& dp : DataProviders_) {
  1723. if (dp.HasActiveProcesses && dp.HasActiveProcesses() && dp.WaitForActiveProcesses) {
  1724. return true;
  1725. }
  1726. }
  1727. }
  1728. return false;
  1729. }
  1730. } // namespace NYql