yql_graph_transformer.h 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. #pragma once
  2. #include <yql/essentials/ast/yql_expr.h>
  3. #include <yql/essentials/utils/yql_panic.h>
  4. #include <yql/essentials/core/issue/yql_issue.h>
  5. #include <library/cpp/threading/future/future.h>
  6. #include <util/generic/hash.h>
  7. #include <util/datetime/base.h>
  8. #include <functional>
  9. namespace NYql {
  10. class IGraphTransformer {
  11. public:
  12. struct TStatus {
  13. #define YQL_GT_STATUS_MAP(xx) \
  14. xx(Ok, 0) \
  15. xx(Repeat, 1) \
  16. xx(Async, 2) \
  17. xx(Error, 3)
  18. enum ELevel {
  19. YQL_GT_STATUS_MAP(ENUM_VALUE_GEN)
  20. };
  21. union {
  22. ui32 Raw;
  23. struct {
  24. ui32 Level : 4;
  25. ui32 HasRestart : 1;
  26. ui32 Padding : 27;
  27. };
  28. };
  29. bool operator== (const TStatus& other) const {
  30. return Raw == other.Raw;
  31. }
  32. bool operator!= (const TStatus& other) const {
  33. return Raw != other.Raw;
  34. }
  35. bool operator== (ELevel other) const {
  36. return Level == other;
  37. }
  38. bool operator!= (ELevel other) const {
  39. return Level != other;
  40. }
  41. TStatus(ELevel level, bool hasRestart = false)
  42. : Level(level)
  43. , HasRestart(hasRestart)
  44. , Padding(0)
  45. {}
  46. [[nodiscard]]
  47. TStatus Combine(TStatus other) const {
  48. const bool hasRestart = HasRestart || other.HasRestart;
  49. return TStatus((TStatus::ELevel)Max(Level, other.Level), hasRestart);
  50. }
  51. void Out(IOutputStream &out) const {
  52. out << (TStatus::ELevel)Level;
  53. if (HasRestart) {
  54. out << ", with restart";
  55. }
  56. }
  57. };
  58. struct TStatistics {
  59. TDuration TransformDuration;
  60. TDuration WaitDuration;
  61. i32 NewExprNodes;
  62. i32 NewTypeNodes;
  63. i32 NewConstraintNodes;
  64. ui32 Repeats;
  65. ui32 Restarts;
  66. TVector<std::pair<TString, TStatistics>> Stages;
  67. TStatistics()
  68. : TransformDuration(TDuration::Zero())
  69. , WaitDuration(TDuration::Zero())
  70. , NewExprNodes(0)
  71. , NewTypeNodes(0)
  72. , NewConstraintNodes(0)
  73. , Repeats(0)
  74. , Restarts(0)
  75. , Stages() {}
  76. static TStatistics NotPresent() { return TStatistics(); }
  77. static TStatistics Zero() { return TStatistics(); }
  78. };
  79. virtual ~IGraphTransformer() {}
  80. virtual TStatus Transform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) = 0;
  81. virtual NThreading::TFuture<void> GetAsyncFuture(const TExprNode& input) = 0;
  82. virtual TStatus ApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) = 0;
  83. virtual void Rewind() = 0;
  84. virtual TStatistics GetStatistics() const { return TStatistics::NotPresent(); }
  85. };
  86. class TGraphTransformerBase : public IGraphTransformer {
  87. private:
  88. class TTransformScope {
  89. public:
  90. TTransformScope(TStatistics& statistics, const TExprContext* exprCtx)
  91. : Statistics_(statistics)
  92. , ExprCtx_(exprCtx)
  93. , TransformStart_(TInstant::Now())
  94. , ExprNodesSize_(exprCtx ? exprCtx->ExprNodes.size() : 0)
  95. , TypeNodesSize_(exprCtx ? exprCtx->TypeNodes.size() : 0)
  96. , ConstraintNodesSize_(exprCtx ? exprCtx->ConstraintNodes.size() : 0)
  97. {
  98. }
  99. ~TTransformScope() {
  100. Statistics_.TransformDuration += TInstant::Now() - TransformStart_;
  101. if (ExprCtx_) {
  102. Statistics_.NewExprNodes += ExprCtx_->ExprNodes.size() - ExprNodesSize_;
  103. Statistics_.NewTypeNodes += ExprCtx_->TypeNodes.size() - TypeNodesSize_;
  104. Statistics_.NewConstraintNodes += ExprCtx_->ConstraintNodes.size() - ConstraintNodesSize_;
  105. }
  106. }
  107. TStatus HandleStatus(const TStatus& status) {
  108. if (status == TStatus::Repeat) {
  109. Statistics_.Repeats++;
  110. }
  111. if (status.HasRestart) {
  112. Statistics_.Restarts++;
  113. }
  114. return status;
  115. }
  116. private:
  117. TStatistics& Statistics_;
  118. const TExprContext* ExprCtx_;
  119. TInstant TransformStart_;
  120. i64 ExprNodesSize_;
  121. i64 TypeNodesSize_;
  122. i64 ConstraintNodesSize_;
  123. };
  124. public:
  125. TGraphTransformerBase()
  126. : Statistics_(TStatistics::Zero())
  127. , AsyncStart_() {}
  128. TStatus Transform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  129. TTransformScope scope(Statistics_, &ctx);
  130. return scope.HandleStatus(DoTransform(input, output, ctx));
  131. }
  132. NThreading::TFuture<void> GetAsyncFuture(const TExprNode& input) final {
  133. TTransformScope scope(Statistics_, nullptr);
  134. AsyncStart_ = TInstant::Now();
  135. return DoGetAsyncFuture(input);
  136. }
  137. TStatus ApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  138. TTransformScope scope(Statistics_, &ctx);
  139. Statistics_.WaitDuration += TInstant::Now() - AsyncStart_;
  140. return scope.HandleStatus(DoApplyAsyncChanges(input, output, ctx));
  141. }
  142. virtual TStatistics GetStatistics() const override { return Statistics_; }
  143. public:
  144. virtual TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) = 0;
  145. virtual NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) = 0;
  146. virtual TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) = 0;
  147. protected:
  148. mutable TStatistics Statistics_;
  149. private:
  150. TInstant AsyncStart_;
  151. };
  152. TAutoPtr<IGraphTransformer> MakeSharedTransformerProxy(const std::shared_ptr<IGraphTransformer>& inner);
  153. struct TTransformStage {
  154. TString Name;
  155. EYqlIssueCode IssueCode;
  156. TString IssueMessage;
  157. TTransformStage(const TAutoPtr<IGraphTransformer>& transformer, const TString& name, EYqlIssueCode issueCode, const TString& issueMessage = {})
  158. : Name(name)
  159. , IssueCode(issueCode)
  160. , IssueMessage(issueMessage)
  161. , RawTransformer_(transformer.Get())
  162. , Transformer_(transformer)
  163. {}
  164. TTransformStage(IGraphTransformer& transformer, const TString& name, EYqlIssueCode issueCode, const TString& issueMessage = {})
  165. : Name(name)
  166. , IssueCode(issueCode)
  167. , IssueMessage(issueMessage)
  168. , RawTransformer_(&transformer)
  169. {}
  170. IGraphTransformer& GetTransformer() const
  171. {
  172. return *RawTransformer_;
  173. }
  174. private:
  175. IGraphTransformer* const RawTransformer_;
  176. const TAutoPtr<IGraphTransformer> Transformer_;
  177. };
  178. TAutoPtr<IGraphTransformer> CreateCompositeGraphTransformer(const TVector<TTransformStage>& stages, bool useIssueScopes);
  179. TAutoPtr<IGraphTransformer> CreateCompositeGraphTransformerWithNoArgChecks(const TVector<TTransformStage>& stages, bool useIssueScopes);
  180. TAutoPtr<IGraphTransformer> CreateChoiceGraphTransformer(
  181. const std::function<bool(const TExprNode::TPtr& input, TExprContext& ctx)>& condition,
  182. const TTransformStage& left,
  183. const TTransformStage& right);
  184. IGraphTransformer::TStatus SyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx);
  185. IGraphTransformer::TStatus InstantTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool breakOnRestart = false);
  186. NThreading::TFuture<IGraphTransformer::TStatus> AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool applyAsyncChanges);
  187. void AsyncTransform(IGraphTransformer& transformer, TExprNode::TPtr& root, TExprContext& ctx, bool applyAsyncChanges,
  188. std::function<void(const IGraphTransformer::TStatus&)> asyncCallback);
  189. IGraphTransformer::TStatus AsyncTransformStep(IGraphTransformer& transformer, TExprNode::TPtr& root,
  190. TExprContext& ctx, bool applyAsyncChanges);
  191. template <typename T>
  192. void HandleFutureException(const NThreading::TFuture<T>& future) {
  193. if (future.HasException()) {
  194. try {
  195. future.TryRethrow();
  196. } catch (...) {
  197. throw yexception() << "Unexpected future exception: " << CurrentExceptionMessage();
  198. }
  199. }
  200. }
  201. class TSyncTransformerBase : public TGraphTransformerBase {
  202. public:
  203. NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
  204. Y_UNUSED(input);
  205. YQL_ENSURE(false, "Not supported");
  206. }
  207. TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  208. Y_UNUSED(input);
  209. Y_UNUSED(output);
  210. Y_UNUSED(ctx);
  211. YQL_ENSURE(false, "Not supported");
  212. }
  213. };
  214. class TNullTransformer final: public TSyncTransformerBase {
  215. public:
  216. TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  217. output = input;
  218. Y_UNUSED(ctx);
  219. return IGraphTransformer::TStatus::Ok;
  220. }
  221. void Rewind() final {
  222. }
  223. };
  224. template <typename TFunctor>
  225. class TFunctorTransformer: public TSyncTransformerBase {
  226. public:
  227. TFunctorTransformer(TFunctor functor)
  228. : Functor_(std::move(functor)) {}
  229. TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) override {
  230. TStatus status = Functor_(input, output, ctx);
  231. YQL_ENSURE(status.Level != IGraphTransformer::TStatus::Async);
  232. return status;
  233. }
  234. void Rewind() override {
  235. }
  236. private:
  237. TFunctor Functor_;
  238. };
  239. template <typename TFunctor>
  240. class TSinglePassFunctorTransformer final: public TFunctorTransformer<TFunctor> {
  241. using TBase = TFunctorTransformer<TFunctor>;
  242. public:
  243. TSinglePassFunctorTransformer(TFunctor functor)
  244. : TFunctorTransformer<TFunctor>(std::move(functor))
  245. {}
  246. IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  247. if (Pass_) {
  248. output = input;
  249. return IGraphTransformer::TStatus::Ok;
  250. }
  251. IGraphTransformer::TStatus status = TBase::DoTransform(input, output, ctx);
  252. if (IGraphTransformer::TStatus::Ok == status.Level) {
  253. Pass_ = true;
  254. }
  255. return status;
  256. }
  257. void Rewind() final {
  258. Pass_ = false;
  259. }
  260. private:
  261. bool Pass_ = false;
  262. };
  263. template <typename TFunctor>
  264. THolder<IGraphTransformer> CreateFunctorTransformer(TFunctor functor) {
  265. return MakeHolder<TFunctorTransformer<TFunctor>>(std::move(functor));
  266. }
  267. template <typename TFunctor>
  268. THolder<IGraphTransformer> CreateSinglePassFunctorTransformer(TFunctor functor) {
  269. return MakeHolder<TSinglePassFunctorTransformer<TFunctor>>(std::move(functor));
  270. }
  271. typedef std::function<IGraphTransformer::TStatus(const TExprNode::TPtr&, TExprNode::TPtr&, TExprContext&)> TAsyncTransformCallback;
  272. typedef NThreading::TFuture<TAsyncTransformCallback> TAsyncTransformCallbackFuture;
  273. template <typename TDerived>
  274. class TAsyncCallbackTransformer : public TGraphTransformerBase {
  275. public:
  276. // CallbackTransform should return std::pair<TStatus, TAsyncTransformCallbackFuture>
  277. TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  278. auto pair = static_cast<TDerived*>(this)->CallbackTransform(input, output, ctx);
  279. if (pair.first == TStatus::Async) {
  280. YQL_ENSURE(Callbacks_.emplace(input.Get(), pair.second).second);
  281. }
  282. return pair.first;
  283. }
  284. NThreading::TFuture<void> DoGetAsyncFuture(const TExprNode& input) final {
  285. const auto it = Callbacks_.find(&input);
  286. YQL_ENSURE(it != Callbacks_.cend());
  287. return it->second.IgnoreResult();
  288. }
  289. TStatus DoApplyAsyncChanges(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
  290. const auto it = Callbacks_.find(input.Get());
  291. YQL_ENSURE(it != Callbacks_.cend());
  292. auto& future = it->second;
  293. YQL_ENSURE(future.HasValue());
  294. const auto status = future.GetValue()(input, output, ctx);
  295. Callbacks_.erase(it);
  296. return status;
  297. }
  298. void Rewind() override {
  299. Callbacks_.clear();
  300. }
  301. private:
  302. TNodeMap<TAsyncTransformCallbackFuture> Callbacks_;
  303. };
  304. template <bool AlwaysRaiseIssues = true, typename TFuture, typename TCallback>
  305. std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture>
  306. WrapFutureCallback(const TFuture& future, const TCallback& callback, const TString& message = "") {
  307. return std::make_pair(IGraphTransformer::TStatus::Async, future.Apply(
  308. [callback, message](const TFuture& completedFuture) {
  309. return TAsyncTransformCallback([completedFuture, callback, message](const TExprNode::TPtr& input,
  310. TExprNode::TPtr& output, TExprContext& ctx)
  311. {
  312. output = input;
  313. const auto& res = completedFuture.GetValue();
  314. TIssueScopeGuard issueScope(ctx.IssueManager, [&]() {
  315. return MakeIntrusive<TIssue>(
  316. ctx.GetPosition(input->Pos()),
  317. message.empty()
  318. ? TStringBuilder() << "Execution of node: " << input->Content()
  319. : message);
  320. });
  321. if constexpr (AlwaysRaiseIssues)
  322. res.ReportIssues(ctx.IssueManager);
  323. if (!res.Success()) {
  324. if constexpr (!AlwaysRaiseIssues)
  325. res.ReportIssues(ctx.IssueManager);
  326. input->SetState(TExprNode::EState::Error);
  327. return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error);
  328. }
  329. else {
  330. return callback(res, input, output, ctx);
  331. }
  332. });
  333. }));
  334. }
  335. template <typename TFuture, typename TResultExtractor>
  336. std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture>
  337. WrapFuture(const TFuture& future, const TResultExtractor& extractor, const TString& message = "") {
  338. return WrapFutureCallback(future, [extractor](const NThreading::TFutureType<TFuture>& res, const TExprNode::TPtr& input, TExprNode::TPtr& /*output*/, TExprContext& ctx) {
  339. input->SetState(TExprNode::EState::ExecutionComplete);
  340. input->SetResult(extractor(res, input, ctx));
  341. return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Ok);
  342. }, message);
  343. }
  344. template <typename TFuture, typename TResultExtractor>
  345. std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture>
  346. WrapModifyFuture(const TFuture& future, const TResultExtractor& extractor, const TString& message = "") {
  347. return WrapFutureCallback(future, [extractor](const NThreading::TFutureType<TFuture>& res, const TExprNode::TPtr& input, TExprNode::TPtr& output, TExprContext& ctx) {
  348. TExprNode::TPtr resultNode = extractor(res, input, output, ctx);
  349. input->SetState(TExprNode::EState::ExecutionComplete);
  350. output->SetResult(std::move(resultNode));
  351. if (input != output) {
  352. return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Repeat, true);
  353. }
  354. return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Ok);
  355. }, message);
  356. }
  357. inline std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture> SyncStatus(IGraphTransformer::TStatus status) {
  358. return std::make_pair(status, TAsyncTransformCallbackFuture());
  359. }
  360. inline std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture> SyncError() {
  361. return SyncStatus(IGraphTransformer::TStatus::Error);
  362. }
  363. inline std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture> SyncOk() {
  364. return SyncStatus(IGraphTransformer::TStatus::Ok);
  365. }
  366. inline std::pair<IGraphTransformer::TStatus, TAsyncTransformCallbackFuture> SyncRepeat() {
  367. return SyncStatus(IGraphTransformer::TStatus::Repeat);
  368. }
  369. typedef std::unordered_map<TExprNode::TPtr, ui64, TExprNode::TPtrHash> TSyncMap;
  370. }
  371. template<>
  372. inline void Out<NYql::IGraphTransformer::TStatus>(
  373. IOutputStream &out, const NYql::IGraphTransformer::TStatus& status)
  374. {
  375. status.Out(out);
  376. }