yql_execution.cpp 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963
  1. #include "yql_execution.h"
  2. #include "yql_expr_optimize.h"
  3. #include "yql_opt_proposed_by_data.h"
  4. #include <yql/essentials/utils/log/log.h>
  5. #include <yql/essentials/utils/yql_panic.h>
  6. #include <util/string/builder.h>
  7. #include <util/string/join.h>
  8. #include <util/system/env.h>
  9. #include <util/generic/queue.h>
  10. namespace NYql {
  11. namespace {
  12. const bool RewriteSanityCheck = false;
  13. class TExecutionTransformer : public TGraphTransformerBase {
  14. public:
  15. struct TState : public TThrRefBase {
  16. TAdaptiveLock Lock;
  17. struct TItem : public TIntrusiveListItem<TItem> {
  18. TExprNode* Node = nullptr;
  19. IDataProvider* DataProvider = nullptr;
  20. NThreading::TFuture<void> Future;
  21. };
  22. using TQueueType = TIntrusiveListWithAutoDelete<TState::TItem, TDelete>;
  23. TQueueType Completed;
  24. TQueueType Inflight;
  25. NThreading::TPromise<void> Promise;
  26. bool HasResult = false;
  27. };
  28. using TStatePtr = TIntrusivePtr<TState>;
  29. TExecutionTransformer(TTypeAnnotationContext& types,
  30. TOperationProgressWriter writer,
  31. bool withFinalize)
  32. : Types(types)
  33. , Writer(writer)
  34. , WithFinalize(withFinalize)
  35. , DeterministicMode(GetEnv("YQL_DETERMINISTIC_MODE"))
  36. {
  37. Rewind();
  38. }
  39. TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  40. if (FinalizingTransformer) {
  41. YQL_CLOG(INFO, CoreExecution) << "FinalizingTransformer, root #" << input->UniqueId();
  42. auto status = FinalizingTransformer->Transform(input, output, ctx);
  43. YQL_CLOG(INFO, CoreExecution) << "FinalizingTransformer done, output #" << output->UniqueId() << ", status: " << status;
  44. return status;
  45. }
  46. YQL_CLOG(INFO, CoreExecution) << "Begin, root #" << input->UniqueId();
  47. output = input;
  48. if (RewriteSanityCheck) {
  49. VisitExpr(input, [&](const TExprNode::TPtr& localInput) {
  50. if (NewNodes.cend() != NewNodes.find(localInput.Get())) {
  51. Cerr << "found old node: #" << localInput->UniqueId() << "\n" << input->Dump();
  52. YQL_ENSURE(false);
  53. }
  54. return true;
  55. });
  56. }
  57. auto status = CollectUnusedNodes(*input, ctx);
  58. YQL_CLOG(INFO, CoreExecution) << "Collect unused nodes for root #" << input->UniqueId() << ", status: " << status;
  59. if (status != TStatus::Ok) {
  60. return status;
  61. }
  62. status = status.Combine(ExecuteNode(input, output, ctx, 0));
  63. for (auto node: FreshPendingNodes) {
  64. if (TExprNode::EState::ExecutionPending == node->GetState()) {
  65. node->SetState(TExprNode::EState::ConstrComplete);
  66. }
  67. }
  68. FreshPendingNodes.clear();
  69. if (!ReplaceNewNodes(output, ctx)) {
  70. return TStatus::Error;
  71. }
  72. YQL_CLOG(INFO, CoreExecution) << "Finish, output #" << output->UniqueId() << ", status: " << status;
  73. if (status != TStatus::Ok || !WithFinalize) {
  74. return status;
  75. }
  76. YQL_CLOG(INFO, CoreExecution) << "Creating finalizing transformer, output #" << output->UniqueId();
  77. FinalizingTransformer = CreateCompositeFinalizingTransformer(Types);
  78. return FinalizingTransformer->Transform(input, output, ctx);
  79. }
  80. NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
  81. return FinalizingTransformer ?
  82. FinalizingTransformer->GetAsyncFuture(input) :
  83. State->Promise.GetFuture();
  84. }
  85. TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  86. if (FinalizingTransformer) {
  87. return FinalizingTransformer->ApplyAsyncChanges(input, output, ctx);
  88. }
  89. output = input;
  90. TStatus combinedStatus = TStatus::Ok;
  91. TState::TQueueType completed;
  92. auto newPromise = NThreading::NewPromise();
  93. {
  94. TGuard<TAdaptiveLock> guard(State->Lock);
  95. completed.Swap(State->Completed);
  96. State->Promise.Swap(newPromise);
  97. State->HasResult = false;
  98. }
  99. for (auto& item : completed) {
  100. auto collectedIt = CollectingNodes.find(item.Node);
  101. if (collectedIt != CollectingNodes.end()) {
  102. YQL_CLOG(INFO, CoreExecution) << "Completed async cleanup for node #" << item.Node->UniqueId();
  103. TExprNode::TPtr callableOutput;
  104. auto status = item.DataProvider->GetTrackableNodeProcessor().GetCleanupTransformer().ApplyAsyncChanges(item.Node, callableOutput, ctx);
  105. combinedStatus = combinedStatus.Combine(status);
  106. CollectingNodes.erase(collectedIt);
  107. continue;
  108. }
  109. YQL_CLOG(INFO, CoreExecution) << "Completed async execution for node #" << item.Node->UniqueId();
  110. auto asyncIt = AsyncNodes.find(item.Node);
  111. YQL_ENSURE(asyncIt != AsyncNodes.end());
  112. TExprNode::TPtr callableOutput;
  113. auto status = item.DataProvider->GetCallableExecutionTransformer().ApplyAsyncChanges(item.Node, callableOutput, ctx);
  114. Y_ABORT_UNLESS(callableOutput);
  115. YQL_ENSURE(status != TStatus::Async);
  116. combinedStatus = combinedStatus.Combine(status);
  117. if (status.Level == TStatus::Error) {
  118. item.Node->SetState(TExprNode::EState::Error);
  119. } else if (status.Level == TStatus::Repeat) {
  120. if (callableOutput != item.Node) {
  121. YQL_CLOG(INFO, CoreExecution) << "Rewrite node #" << item.Node->UniqueId() << " to #" << callableOutput->UniqueId()
  122. << " in ApplyAsyncChanges()";
  123. NewNodes[item.Node] = callableOutput;
  124. combinedStatus = combinedStatus.Combine(TStatus(TStatus::Repeat, true));
  125. FinishNode(item.DataProvider->GetName(), *item.Node, *callableOutput);
  126. }
  127. }
  128. if (callableOutput == item.Node) {
  129. YQL_CLOG(INFO, CoreExecution) << "State is " << item.Node->GetState()
  130. << " after apply async changes for node #" << item.Node->UniqueId();
  131. }
  132. if (item.Node->GetState() == TExprNode::EState::ExecutionComplete ||
  133. item.Node->GetState() == TExprNode::EState::Error)
  134. {
  135. FinishNode(item.DataProvider->GetName(), *item.Node, *callableOutput);
  136. }
  137. AsyncNodes.erase(asyncIt);
  138. }
  139. if (!ReplaceNewNodes(output, ctx)) {
  140. return TStatus::Error;
  141. }
  142. if (!completed.Empty() && combinedStatus.Level == TStatus::Ok) {
  143. combinedStatus = TStatus::Repeat;
  144. }
  145. return combinedStatus;
  146. }
  147. bool ReplaceNewNodes(TExprNode::TPtr& output, TExprContext& ctx) {
  148. if (!NewNodes.empty()) {
  149. TOptimizeExprSettings settings(&Types);
  150. settings.VisitChanges = true;
  151. settings.VisitStarted = true;
  152. auto replaceStatus = OptimizeExpr(output, output, [&](const TExprNode::TPtr& input, TExprContext& ctx) -> TExprNode::TPtr {
  153. Y_UNUSED(ctx);
  154. const auto replace = NewNodes.find(input.Get());
  155. if (NewNodes.cend() != replace) {
  156. return replace->second;
  157. }
  158. return input;
  159. }, ctx, settings);
  160. if (!RewriteSanityCheck) {
  161. NewNodes.clear();
  162. }
  163. if (replaceStatus.Level == TStatus::Error) {
  164. return false;
  165. }
  166. }
  167. if (RewriteSanityCheck) {
  168. VisitExpr(output, [&](const TExprNode::TPtr& localInput) {
  169. if (NewNodes.cend() != NewNodes.find(localInput.Get())) {
  170. Cerr << "found old node: #" << localInput->UniqueId() << "\n" << output->Dump();
  171. YQL_ENSURE(false);
  172. }
  173. return true;
  174. });
  175. }
  176. return true;
  177. }
  178. void Rewind() override {
  179. State = MakeIntrusive<TState>();
  180. State->Promise = NThreading::NewPromise();
  181. State->HasResult = false;
  182. NewNodes.clear();
  183. FinalizingTransformer.Reset();
  184. TrackableNodes.clear();
  185. CollectingNodes.clear();
  186. ProvidersCache.clear();
  187. AsyncNodes.clear();
  188. }
  189. TStatus ExecuteNode(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExprContext& ctx, ui32 depth) {
  190. output = node;
  191. bool changed = false;
  192. const auto knownNode = NewNodes.find(node.Get());
  193. if (NewNodes.cend() != knownNode) {
  194. output = knownNode->second;
  195. changed = true;
  196. }
  197. switch (output->GetState()) {
  198. case TExprNode::EState::Initial:
  199. case TExprNode::EState::TypeInProgress:
  200. case TExprNode::EState::TypePending:
  201. case TExprNode::EState::TypeComplete:
  202. case TExprNode::EState::ConstrInProgress:
  203. case TExprNode::EState::ConstrPending:
  204. return TStatus(TStatus::Repeat, true);
  205. case TExprNode::EState::ExecutionInProgress:
  206. return TStatus::Async;
  207. case TExprNode::EState::ExecutionPending:
  208. return ExecuteChildren(output, output, ctx, depth + 1);
  209. case TExprNode::EState::ConstrComplete:
  210. case TExprNode::EState::ExecutionRequired:
  211. break;
  212. case TExprNode::EState::ExecutionComplete:
  213. YQL_ENSURE(output->HasResult());
  214. OnNodeExecutionComplete(output, ctx);
  215. return changed ? TStatus(TStatus::Repeat, true) : TStatus(TStatus::Ok);
  216. case TExprNode::EState::Error:
  217. return TStatus::Error;
  218. default:
  219. YQL_ENSURE(false, "Unknown state");
  220. }
  221. switch (output->Type()) {
  222. case TExprNode::Atom:
  223. case TExprNode::Argument:
  224. case TExprNode::Arguments:
  225. case TExprNode::Lambda:
  226. ctx.AddError(TIssue(ctx.GetPosition(output->Pos()), TStringBuilder() << "Failed to execute node with type: " << output->Type()));
  227. output->SetState(TExprNode::EState::Error);
  228. return TStatus::Error;
  229. case TExprNode::List:
  230. case TExprNode::Callable:
  231. {
  232. auto prevOutput = output;
  233. auto status = output->Type() == TExprNode::Callable
  234. ? ExecuteCallable(output, output, ctx, depth)
  235. : ExecuteList(output, ctx);
  236. if (status.Level == TStatus::Error) {
  237. output->SetState(TExprNode::EState::Error);
  238. } else if (status.Level == TStatus::Ok) {
  239. output->SetState(TExprNode::EState::ExecutionComplete);
  240. OnNodeExecutionComplete(output, ctx);
  241. YQL_ENSURE(output->HasResult());
  242. } else if (status.Level == TStatus::Repeat) {
  243. if (!status.HasRestart) {
  244. output->SetState(TExprNode::EState::ExecutionPending);
  245. status = ExecuteChildren(output, output, ctx, depth + 1);
  246. if (TExprNode::EState::ExecutionPending == output->GetState()) {
  247. FreshPendingNodes.push_back(output.Get());
  248. }
  249. if (status.Level != TStatus::Repeat) {
  250. return status;
  251. }
  252. }
  253. if (output != prevOutput) {
  254. YQL_CLOG(INFO, CoreExecution) << "Rewrite node #" << node->UniqueId() << " to #" << output->UniqueId();
  255. NewNodes[node.Get()] = output;
  256. }
  257. return TStatus(TStatus::Repeat, output != prevOutput);
  258. } else if (status.Level == TStatus::Async) {
  259. output->SetState(TExprNode::EState::ExecutionInProgress);
  260. }
  261. return status;
  262. }
  263. case TExprNode::World:
  264. output->SetState(TExprNode::EState::ExecutionComplete);
  265. return TStatus::Ok;
  266. default:
  267. YQL_ENSURE(false, "Unknown type");
  268. }
  269. }
  270. TStatus ExecuteChildren(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExprContext& ctx, ui32 depth) {
  271. TStatus combinedStatus = TStatus::Ok;
  272. TExprNode::TListType newChildren;
  273. bool newNode = false;
  274. for (auto& child : node->Children()) {
  275. auto newChild = child;
  276. if (child->GetState() == TExprNode::EState::ExecutionRequired) {
  277. auto childStatus = ExecuteNode(child, newChild, ctx, depth);
  278. if (childStatus.Level == TStatus::Error)
  279. return childStatus;
  280. combinedStatus = combinedStatus.Combine(childStatus);
  281. } else if (child->GetState() == TExprNode::EState::ExecutionInProgress) {
  282. combinedStatus = combinedStatus.Combine(TStatus::Async);
  283. } else if (child->GetState() == TExprNode::EState::ExecutionPending) {
  284. combinedStatus = combinedStatus.Combine(TStatus::Repeat);
  285. }
  286. newChildren.push_back(newChild);
  287. if (newChild != child) {
  288. newNode = true;
  289. }
  290. }
  291. if (combinedStatus.Level == TStatus::Ok) {
  292. Y_DEBUG_ABORT_UNLESS(!newNode);
  293. node->SetState(TExprNode::EState::ConstrComplete);
  294. return ExecuteNode(node, output, ctx, depth - 1);
  295. } else {
  296. if (combinedStatus.Level == TStatus::Error) {
  297. node->SetState(TExprNode::EState::Error);
  298. }
  299. if (newNode) {
  300. output = ctx.ChangeChildren(*node, std::move(newChildren));
  301. }
  302. return combinedStatus;
  303. }
  304. }
  305. TStatus ExecuteList(const TExprNode::TPtr& node, TExprContext& ctx) {
  306. IGraphTransformer::TStatus combinedStatus = IGraphTransformer::TStatus::Ok;
  307. for (ui32 i = 0; i < node->ChildrenSize(); ++i) {
  308. combinedStatus = combinedStatus.Combine(RequireChild(*node, i));
  309. }
  310. if (combinedStatus.Level != IGraphTransformer::TStatus::Ok) {
  311. return combinedStatus;
  312. }
  313. node->SetResult(ctx.NewWorld(node->Pos()));
  314. return TStatus::Ok;
  315. }
  316. IDataProvider* GetDataProvider(const TExprNode& node) const {
  317. IDataProvider* dataProvider = nullptr;
  318. for (const auto& x : Types.DataSources) {
  319. if (x->CanExecute(node)) {
  320. dataProvider = x.Get();
  321. break;
  322. }
  323. }
  324. if (!dataProvider) {
  325. for (const auto& x : Types.DataSinks) {
  326. if (x->CanExecute(node)) {
  327. dataProvider = x.Get();
  328. }
  329. }
  330. }
  331. return dataProvider;
  332. }
  333. TStatus ExecuteCallable(const TExprNode::TPtr& node, TExprNode::TPtr& output, TExprContext& ctx, ui32 depth) {
  334. YQL_CLOG(TRACE, CoreExecution) << '{' << depth << "}, callable #"
  335. << node->UniqueId() << " <" << node->Content() << '>';
  336. if (node->Content() == CommitName) {
  337. auto requireStatus = RequireChild(*node, 0);
  338. if (requireStatus.Level != TStatus::Ok) {
  339. return requireStatus;
  340. }
  341. auto category = node->Child(1)->Child(0)->Content();
  342. auto datasink = Types.DataSinkMap.FindPtr(category);
  343. YQL_ENSURE(datasink);
  344. output = node;
  345. auto status = (*datasink)->GetCallableExecutionTransformer().Transform(node, output, ctx);
  346. if (status.Level == TStatus::Async) {
  347. Y_DEBUG_ABORT_UNLESS(output == node);
  348. StartNode(category, *node);
  349. AddCallable(node, (*datasink).Get(), ctx);
  350. } else {
  351. if (output->GetState() == TExprNode::EState::ExecutionComplete ||
  352. output->GetState() == TExprNode::EState::Error)
  353. {
  354. StartNode(category, *node);
  355. FinishNode(category, *node, *output);
  356. }
  357. }
  358. return status;
  359. }
  360. if (node->Content() == SyncName) {
  361. return ExecuteList(node, ctx);
  362. }
  363. if (node->Content() == LeftName) {
  364. auto requireStatus = RequireChild(*node, 0);
  365. if (requireStatus.Level != TStatus::Ok) {
  366. return requireStatus;
  367. }
  368. node->SetResult(ctx.NewWorld(node->Pos()));
  369. return TStatus::Ok;
  370. }
  371. if (node->Content() == RightName) {
  372. auto requireStatus = RequireChild(*node, 0);
  373. if (requireStatus.Level != TStatus::Ok) {
  374. return requireStatus;
  375. }
  376. node->SetResult(ctx.NewWorld(node->Pos()));
  377. return TStatus::Ok;
  378. }
  379. IDataProvider* dataProvider = GetDataProvider(*node);
  380. if (!dataProvider) {
  381. ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to execute callable with name: " << node->Content()));
  382. return TStatus::Error;
  383. }
  384. output = node;
  385. TStatus status = dataProvider->GetCallableExecutionTransformer().Transform(node, output, ctx);
  386. if (status.Level == TStatus::Async) {
  387. Y_DEBUG_ABORT_UNLESS(output == node);
  388. StartNode(dataProvider->GetName(), *node);
  389. AddCallable(node, dataProvider, ctx);
  390. } else {
  391. if (output->GetState() == TExprNode::EState::ExecutionComplete ||
  392. output->GetState() == TExprNode::EState::Error)
  393. {
  394. StartNode(dataProvider->GetName(), *node);
  395. FinishNode(dataProvider->GetName(), *node, *output);
  396. }
  397. }
  398. return status;
  399. }
  400. void AddCallable(const TExprNode::TPtr& node, IDataProvider* dataProvider, TExprContext& ctx) {
  401. Y_UNUSED(ctx);
  402. YQL_CLOG(INFO, CoreExecution) << "Register async execution for node #" << node->UniqueId();
  403. auto future = dataProvider->GetCallableExecutionTransformer().GetAsyncFuture(*node);
  404. AsyncNodes[node.Get()] = node;
  405. SubscribeAsyncFuture(node, dataProvider, future);
  406. }
  407. static void ProcessFutureResultQueue(TStatePtr state) {
  408. NThreading::TPromise<void> promiseToSet;
  409. bool hasResult = false;
  410. TGuard<TAdaptiveLock> guard(state->Lock);
  411. while (!state->Inflight.Empty()) {
  412. auto* first = state->Inflight.Front();
  413. if (first->Future.HasValue()) {
  414. state->Inflight.PopFront();
  415. state->Completed.PushBack(first);
  416. hasResult = true;
  417. } else {
  418. break;
  419. }
  420. }
  421. guard.Release();
  422. if (hasResult && !state->HasResult) {
  423. state->HasResult = true;
  424. promiseToSet = state->Promise;
  425. }
  426. if (promiseToSet.Initialized()) {
  427. promiseToSet.SetValue();
  428. }
  429. }
  430. static void ProcessAsyncFutureResult(TStatePtr state, TAutoPtr<TState::TItem> item) {
  431. NThreading::TPromise<void> promiseToSet;
  432. {
  433. TGuard<TAdaptiveLock> guard(state->Lock);
  434. state->Completed.PushBack(item.Release());
  435. if (!state->HasResult) {
  436. state->HasResult = true;
  437. promiseToSet = state->Promise;
  438. }
  439. }
  440. if (promiseToSet.Initialized()) {
  441. promiseToSet.SetValue();
  442. }
  443. }
  444. void SubscribeAsyncFuture(const TExprNode::TPtr& node, IDataProvider* dataProvider, const NThreading::TFuture<void>& future)
  445. {
  446. auto state = State;
  447. if (DeterministicMode) {
  448. TAutoPtr<TState::TItem> item = new TState::TItem;
  449. item->Node = node.Get(); item->DataProvider = dataProvider; item->Future = future;
  450. TGuard<TAdaptiveLock> guard(state->Lock);
  451. state->Inflight.PushBack(item.Release());
  452. }
  453. if (DeterministicMode) {
  454. future.Subscribe([state](const NThreading::TFuture<void>& future) {
  455. HandleFutureException(future);
  456. ProcessFutureResultQueue(state);
  457. });
  458. } else {
  459. future.Subscribe([state, node=node.Get(), dataProvider](const NThreading::TFuture<void>& future) {
  460. HandleFutureException(future);
  461. TAutoPtr<TState::TItem> item = new TState::TItem;
  462. item->Node = node; item->DataProvider = dataProvider;
  463. ProcessAsyncFutureResult(state, item.Release());
  464. });
  465. }
  466. }
  467. void StartNode(TStringBuf category, const TExprNode& node) {
  468. auto publicId = Types.TranslateOperationId(node.UniqueId());
  469. if (publicId) {
  470. auto x = Progresses.insert({ *publicId,
  471. TOperationProgress(TString(category), *publicId, TOperationProgress::EState::Started) });
  472. if (x.second) {
  473. Writer(x.first->second);
  474. }
  475. }
  476. }
  477. void FinishNode(TStringBuf category, const TExprNode& node, const TExprNode& newNode) {
  478. auto publicId = Types.TranslateOperationId(node.UniqueId());
  479. if (publicId) {
  480. if (newNode.UniqueId() != node.UniqueId()) {
  481. Types.NodeToOperationId[newNode.UniqueId()] = *publicId;
  482. }
  483. auto progIt = Progresses.find(*publicId);
  484. YQL_ENSURE(progIt != Progresses.end());
  485. auto newState = (node.GetState() == TExprNode::EState::ExecutionComplete)
  486. ? TOperationProgress::EState::Finished
  487. : TOperationProgress::EState::Failed;
  488. if (progIt->second.State != newState) {
  489. TString stage = progIt->second.Stage.first;
  490. progIt->second = TOperationProgress(TString(category), *publicId, newState, stage);
  491. Writer(progIt->second);
  492. }
  493. }
  494. }
  495. void OnNodeExecutionComplete(const TExprNode::TPtr& node, TExprContext& ctx) {
  496. auto nodeId = node->UniqueId();
  497. YQL_CLOG(INFO, CoreExecution) << "Node #" << nodeId << "<" << node->Content() << "> finished execution";
  498. auto dataProvider = GetDataProvider(*node);
  499. if (!dataProvider) {
  500. return;
  501. }
  502. TVector<ITrackableNodeProcessor::TExprNodeAndId> createdNodes;
  503. dataProvider->GetTrackableNodeProcessor().GetCreatedNodes(*node, createdNodes, ctx);
  504. TVector<TString> ids;
  505. for (const auto& c : createdNodes) {
  506. auto& info = TrackableNodes[c.Id];
  507. info.Provider = dataProvider;
  508. info.Node = c.Node;
  509. ids.push_back(c.Id);
  510. }
  511. YQL_CLOG(INFO, CoreExecution) << "Node #" << nodeId << "<" << node->Content() << "> created " << ids.size()
  512. << " trackable nodes: " << JoinSeq(", ", ids);
  513. }
  514. TStatus CollectUnusedNodes(const TExprNode& root, TExprContext& ctx) {
  515. if (TrackableNodes.empty()) {
  516. return TStatus::Ok;
  517. }
  518. YQL_CLOG(TRACE, CoreExecution) << "Collecting unused nodes on root #" << root.UniqueId();
  519. THashSet<ui64> visited;
  520. THashSet<TString> usedIds;
  521. VisitExpr(root, [&](const TExprNode& node) {
  522. if (node.GetState() == TExprNode::EState::ExecutionComplete) {
  523. return false;
  524. }
  525. auto nodeId = node.UniqueId();
  526. visited.insert(nodeId);
  527. TIntrusivePtr<IDataProvider> dataProvider;
  528. auto providerIt = ProvidersCache.find(nodeId);
  529. if (providerIt != ProvidersCache.end()) {
  530. YQL_ENSURE(providerIt->second);
  531. dataProvider = providerIt->second;
  532. } else {
  533. dataProvider = GetDataProvider(node);
  534. if (dataProvider) {
  535. ProvidersCache[nodeId] = dataProvider;
  536. }
  537. }
  538. if (dataProvider) {
  539. TVector<TString> usedNodes;
  540. dataProvider->GetTrackableNodeProcessor().GetUsedNodes(node, usedNodes);
  541. usedIds.insert(usedNodes.begin(), usedNodes.end());
  542. }
  543. return true;
  544. });
  545. for (auto i = ProvidersCache.begin(); i != ProvidersCache.end();) {
  546. if (visited.count(i->first) == 0) {
  547. ProvidersCache.erase(i++);
  548. } else {
  549. ++i;
  550. }
  551. }
  552. THashMap<TIntrusivePtr<IDataProvider>, TExprNode::TListType> toCollect;
  553. for (auto i = TrackableNodes.begin(); i != TrackableNodes.end();) {
  554. TString id = i->first;
  555. TTrackableNodeInfo info = i->second;
  556. if (!usedIds.contains(id)) {
  557. YQL_ENSURE(info.Node);
  558. YQL_ENSURE(info.Provider);
  559. YQL_CLOG(INFO, CoreExecution) << "Marking node " << id << " for collection";
  560. toCollect[info.Provider].push_back(info.Node);
  561. TrackableNodes.erase(i++);
  562. } else {
  563. ++i;
  564. }
  565. }
  566. TStatus collectStatus = TStatus::Ok;
  567. for (auto& i : toCollect) {
  568. const auto& provider = i.first;
  569. YQL_ENSURE(!i.second.empty());
  570. auto pos = i.second.front()->Pos();
  571. TExprNode::TPtr collectNode = ctx.NewList(pos, std::move(i.second));
  572. TExprNode::TPtr output;
  573. TStatus status = provider->GetTrackableNodeProcessor().GetCleanupTransformer().Transform(collectNode, output, ctx);
  574. YQL_ENSURE(status != TStatus::Repeat);
  575. collectStatus = collectStatus.Combine(status);
  576. if (status == TStatus::Error) {
  577. break;
  578. }
  579. if (status == TStatus::Async) {
  580. CollectingNodes[collectNode.Get()] = collectNode;
  581. auto future = provider->GetTrackableNodeProcessor().GetCleanupTransformer().GetAsyncFuture(*collectNode);
  582. SubscribeAsyncFuture(collectNode, provider.Get(), future);
  583. }
  584. }
  585. return collectStatus;
  586. }
  587. private:
  588. TTypeAnnotationContext& Types;
  589. TOperationProgressWriter Writer;
  590. const bool WithFinalize;
  591. TStatePtr State;
  592. TNodeOnNodeOwnedMap NewNodes;
  593. TAutoPtr<IGraphTransformer> FinalizingTransformer;
  594. THashMap<ui32, TOperationProgress> Progresses;
  595. struct TTrackableNodeInfo
  596. {
  597. TIntrusivePtr<IDataProvider> Provider;
  598. TExprNode::TPtr Node;
  599. };
  600. THashMap<TString, TTrackableNodeInfo> TrackableNodes;
  601. TNodeOnNodeOwnedMap CollectingNodes;
  602. THashMap<ui64, TIntrusivePtr<IDataProvider>> ProvidersCache;
  603. TExprNode::TListType FreshPendingNodes;
  604. bool DeterministicMode;
  605. TNodeOnNodeOwnedMap AsyncNodes;
  606. };
  607. IGraphTransformer::TStatus ValidateExecution(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& types, TNodeSet& visited);
  608. IGraphTransformer::TStatus ValidateList(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& types, TNodeSet& visited) {
  609. IGraphTransformer::TStatus combinedStatus = IGraphTransformer::TStatus::Ok;
  610. for (ui32 i = 0; i < node->ChildrenSize(); ++i) {
  611. combinedStatus = combinedStatus.Combine(ValidateExecution(node->ChildPtr(i), ctx, types, visited));
  612. }
  613. return combinedStatus;
  614. }
  615. IGraphTransformer::TStatus ValidateCallable(const TExprNode::TPtr& node, TExprContext& ctx, const TTypeAnnotationContext& types, TNodeSet& visited) {
  616. using TStatus = IGraphTransformer::TStatus;
  617. if (node->Content() == CommitName) {
  618. auto datasink = types.DataSinkMap.FindPtr(node->Child(1)->Child(0)->Content());
  619. if (!datasink) {
  620. ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Unknown datasink: " << node->Child(1)->Child(0)->Content()));
  621. return TStatus::Error;
  622. }
  623. return ValidateExecution(node->ChildPtr(0), ctx, types, visited);
  624. }
  625. if (node->Content() == SyncName) {
  626. return ValidateList(node, ctx, types, visited);
  627. }
  628. if (node->Content() == LeftName) {
  629. return ValidateExecution(node->ChildPtr(0), ctx, types, visited);
  630. }
  631. if (node->Content() == RightName) {
  632. return ValidateExecution(node->ChildPtr(0), ctx, types, visited);
  633. }
  634. IDataProvider* dataProvider = nullptr;
  635. for (auto& x : types.DataSources) {
  636. if (x->CanExecute(*node)) {
  637. dataProvider = x.Get();
  638. break;
  639. }
  640. }
  641. if (!dataProvider) {
  642. for (auto& x : types.DataSinks) {
  643. if (x->CanExecute(*node)) {
  644. dataProvider = x.Get();
  645. break;
  646. }
  647. }
  648. }
  649. if (!dataProvider) {
  650. ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to execute callable with name: " << node->Content()
  651. << ", you possibly used cross provider/cluster operations or pulled not materialized result in refselect mode"));
  652. return TStatus::Error;
  653. }
  654. if (node->ChildrenSize() < 2) {
  655. ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Executable callable should have at least 2 children"));
  656. return TStatus::Error;
  657. }
  658. if (!dataProvider->ValidateExecution(*node, ctx)) {
  659. return TStatus::Error;
  660. }
  661. TExprNode::TListType childrenToCheck;
  662. dataProvider->GetRequiredChildren(*node, childrenToCheck);
  663. IGraphTransformer::TStatus combinedStatus = IGraphTransformer::TStatus::Ok;
  664. for (ui32 i = 0; i < childrenToCheck.size(); ++i) {
  665. combinedStatus = combinedStatus.Combine(ValidateExecution(childrenToCheck[i], ctx, types, visited));
  666. }
  667. return combinedStatus;
  668. }
  669. IGraphTransformer::TStatus ValidateExecution(const TExprNode::TPtr& node, TExprContext& ctx,
  670. const TTypeAnnotationContext& types, TNodeSet& visited) {
  671. using TStatus = IGraphTransformer::TStatus;
  672. if (node->GetState() == TExprNode::EState::ExecutionComplete) {
  673. return TStatus::Ok;
  674. }
  675. if (!node->GetTypeAnn()) {
  676. ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), "Node has no type annotation"));
  677. return TStatus::Error;
  678. }
  679. TStatus status = TStatus::Ok;
  680. switch (node->Type()) {
  681. case TExprNode::Atom:
  682. case TExprNode::Argument:
  683. case TExprNode::Arguments:
  684. case TExprNode::Lambda:
  685. ctx.AddError(TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Failed to execute node with type: " << node->Type()));
  686. return TStatus::Error;
  687. case TExprNode::List:
  688. return ValidateList(node, ctx, types, visited);
  689. case TExprNode::Callable:
  690. if (visited.cend() != visited.find(node.Get())) {
  691. return TStatus::Ok;
  692. }
  693. status = ValidateCallable(node, ctx, types, visited);
  694. if (status.Level == TStatus::Ok) {
  695. visited.insert(node.Get());
  696. }
  697. break;
  698. case TExprNode::World:
  699. break;
  700. default:
  701. YQL_ENSURE(false, "Unknown type");
  702. }
  703. return status;
  704. }
  705. }
  706. TAutoPtr<IGraphTransformer> CreateExecutionTransformer(
  707. TTypeAnnotationContext& types,
  708. TOperationProgressWriter writer,
  709. bool withFinalize) {
  710. return new TExecutionTransformer(types, writer, withFinalize);
  711. }
  712. TAutoPtr<IGraphTransformer> CreateCheckExecutionTransformer(const TTypeAnnotationContext& types, bool checkWorld) {
  713. return CreateFunctorTransformer([&types, checkWorld](const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx)
  714. -> IGraphTransformer::TStatus {
  715. output = input;
  716. if (checkWorld) {
  717. TNodeSet visited;
  718. auto status = ValidateExecution(input, ctx, types, visited);
  719. if (status.Level != IGraphTransformer::TStatus::Ok) {
  720. return status;
  721. }
  722. }
  723. TParentsMap parentsMap;
  724. THashSet<TExprNode*> overWinNodes;
  725. GatherParents(*input, parentsMap);
  726. bool hasErrors = false;
  727. THashSet<TIssue> added;
  728. auto funcCheckExecution = [&](const THashSet<TStringBuf>& notAllowList, bool collectCalcOverWindow, const TExprNode::TPtr& node) {
  729. if (node->IsCallable("ErrorType")) {
  730. hasErrors = true;
  731. const auto err = node->GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TErrorExprType>()->GetError();
  732. if (added.insert(err).second) {
  733. ctx.AddError(err);
  734. }
  735. } else if (node->IsCallable("TablePath") || node->IsCallable("TableRecord")) {
  736. auto issue = TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << node->Content() << " will be empty");
  737. SetIssueCode(EYqlIssueCode::TIssuesIds_EIssueCode_CORE_FREE_TABLE_PATH_RECORD, issue);
  738. if (!ctx.AddWarning(issue)) {
  739. hasErrors = true;
  740. }
  741. } else if (node->IsCallable("UnsafeTimestampCast")) {
  742. auto issue = TIssue(ctx.GetPosition(node->Pos()), "Unsafe conversion integral value to Timestamp, consider using date types");
  743. SetIssueCode(EYqlIssueCode::TIssuesIds_EIssueCode_CORE_CAST_INTEGRAL_TO_TIMESTAMP_UNSAFE, issue);
  744. if (!ctx.AddWarning(issue)) {
  745. hasErrors = true;
  746. }
  747. } else if (collectCalcOverWindow && node->IsCallable({"CalcOverWindow", "CalcOverSessionWindow", "CalcOverWindowGroup"})) {
  748. overWinNodes.emplace(node.Get());
  749. return false;
  750. } else if (node->IsCallable(notAllowList)) {
  751. hasErrors = true;
  752. const auto err = TIssue(ctx.GetPosition(node->Pos()), TStringBuilder() << "Can't execute " << node->Content());
  753. if (added.insert(err).second) {
  754. ctx.AddError(err);
  755. }
  756. } else if (node->Type() != TExprNode::Lambda &&
  757. (node->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Stream || node->GetTypeAnn()->GetKind() == ETypeAnnotationKind::Flow)) {
  758. auto parentsIt = parentsMap.find(node.Get());
  759. if (parentsIt != parentsMap.end()) {
  760. ui32 usageCount = 0;
  761. for (auto& x : parentsIt->second) {
  762. if (x->IsCallable("DependsOn")) {
  763. continue;
  764. }
  765. for (auto& y : x->Children()) {
  766. if (y.Get() == node.Get()) {
  767. ++usageCount;
  768. }
  769. }
  770. }
  771. if (usageCount > 1 && !node->GetConstraint<TEmptyConstraintNode>()) {
  772. hasErrors = true;
  773. const auto err = TIssue(ctx.GetPosition(node->Pos()), "Multiple stream clients");
  774. if (added.insert(err).second) {
  775. ctx.AddError(err);
  776. }
  777. }
  778. }
  779. }
  780. return true;
  781. };
  782. static const THashSet<TStringBuf> noExecutionList = {"InstanceOf", "Lag", "Lead", "RowNumber", "Rank", "DenseRank", "PercentRank", "CumeDist", "NTile"};
  783. static const THashSet<TStringBuf> noExecutionListForCalcOverWindow = {"InstanceOf"};
  784. VisitExpr(input, [funcCheckExecution](const TExprNode::TPtr& node) {
  785. bool collectCalcOverWindow = true;
  786. return funcCheckExecution(noExecutionList, collectCalcOverWindow, node);
  787. });
  788. for (auto overWin: overWinNodes) {
  789. VisitExpr(overWin, [funcCheckExecution](const TExprNode::TPtr& node) {
  790. bool collectCalcOverWindow = false;
  791. return funcCheckExecution(noExecutionListForCalcOverWindow, collectCalcOverWindow, node);
  792. });
  793. }
  794. return hasErrors ? IGraphTransformer::TStatus::Error : IGraphTransformer::TStatus::Ok;
  795. });
  796. };
  797. IGraphTransformer::TStatus RequireChild(const TExprNode& node, ui32 index) {
  798. switch (node.Child(index)->GetState()) {
  799. case TExprNode::EState::Error:
  800. case TExprNode::EState::ExecutionComplete:
  801. return IGraphTransformer::TStatus::Ok;
  802. case TExprNode::EState::ExecutionInProgress:
  803. case TExprNode::EState::ExecutionPending:
  804. return IGraphTransformer::TStatus::Repeat;
  805. default:
  806. break;
  807. }
  808. node.Child(index)->SetState(TExprNode::EState::ExecutionRequired);
  809. return IGraphTransformer::TStatus::Repeat;
  810. }
  811. }
  812. template<>
  813. void Out<NYql::TOperationProgress::EState>(class IOutputStream &o, NYql::TOperationProgress::EState x) {
  814. #define YQL_OPERATION_PROGRESS_STATE_MAP_TO_STRING_IMPL(name, ...) \
  815. case NYql::TOperationProgress::EState::name: \
  816. o << #name; \
  817. return;
  818. switch (x) {
  819. YQL_OPERATION_PROGRESS_STATE_MAP(YQL_OPERATION_PROGRESS_STATE_MAP_TO_STRING_IMPL)
  820. default:
  821. o << static_cast<int>(x);
  822. return;
  823. }
  824. }