worker.h 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. #pragma once
  2. #include <yql/essentials/public/purecalc/common/interface.h>
  3. #include <yql/essentials/public/udf/udf_value.h>
  4. #include <yql/essentials/ast/yql_expr.h>
  5. #include <yql/essentials/core/yql_user_data.h>
  6. #include <yql/essentials/minikql/mkql_alloc.h>
  7. #include <yql/essentials/minikql/mkql_node.h>
  8. #include <yql/essentials/minikql/mkql_node_visitor.h>
  9. #include <yql/essentials/minikql/computation/mkql_computation_node.h>
  10. #include <yql/essentials/providers/common/mkql/yql_provider_mkql.h>
  11. #include <memory>
  12. namespace NYql {
  13. namespace NPureCalc {
  14. struct TWorkerGraph {
  15. TWorkerGraph(
  16. const TExprNode::TPtr& exprRoot,
  17. TExprContext& exprCtx,
  18. const TString& serializedProgram,
  19. const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry,
  20. const TUserDataTable& userData,
  21. const TVector<const TStructExprType*>& inputTypes,
  22. const TVector<const TStructExprType*>& originalInputTypes,
  23. const TVector<const TStructExprType*>& rawInputTypes,
  24. const TTypeAnnotationNode* outputType,
  25. const TTypeAnnotationNode* rawOutputType,
  26. const TString& LLVMSettings,
  27. NKikimr::NUdf::ICountersProvider* countersProvider,
  28. ui64 nativeYtTypeFlags,
  29. TMaybe<ui64> deterministicTimeProviderSeed
  30. );
  31. ~TWorkerGraph();
  32. NKikimr::NMiniKQL::TScopedAlloc ScopedAlloc_;
  33. NKikimr::NMiniKQL::TTypeEnvironment Env_;
  34. const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry_;
  35. TIntrusivePtr<IRandomProvider> RandomProvider_;
  36. TIntrusivePtr<ITimeProvider> TimeProvider_;
  37. NKikimr::NMiniKQL::IComputationPattern::TPtr ComputationPattern_;
  38. THolder<NKikimr::NMiniKQL::IComputationGraph> ComputationGraph_;
  39. TString LLVMSettings_;
  40. ui64 NativeYtTypeFlags_;
  41. TMaybe<TString> TimestampColumn_;
  42. const NKikimr::NMiniKQL::TType* OutputType_;
  43. const NKikimr::NMiniKQL::TType* RawOutputType_;
  44. TVector<NKikimr::NMiniKQL::IComputationExternalNode*> SelfNodes_;
  45. TVector<const NKikimr::NMiniKQL::TStructType*> InputTypes_;
  46. TVector<const NKikimr::NMiniKQL::TStructType*> OriginalInputTypes_;
  47. TVector<const NKikimr::NMiniKQL::TStructType*> RawInputTypes_;
  48. };
  49. template <typename TBase>
  50. class TWorker: public TBase {
  51. public:
  52. using TWorkerFactoryPtr = std::weak_ptr<IWorkerFactory>;
  53. private:
  54. // Worker factory implementation should stay alive for this worker to operate correctly.
  55. TWorkerFactoryPtr WorkerFactory_;
  56. protected:
  57. TWorkerGraph Graph_;
  58. public:
  59. TWorker(
  60. TWorkerFactoryPtr factory,
  61. const TExprNode::TPtr& exprRoot,
  62. TExprContext& exprCtx,
  63. const TString& serializedProgram,
  64. const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry,
  65. const TUserDataTable& userData,
  66. const TVector<const TStructExprType*>& inputTypes,
  67. const TVector<const TStructExprType*>& originalInputTypes,
  68. const TVector<const TStructExprType*>& rawInputTypes,
  69. const TTypeAnnotationNode* outputType,
  70. const TTypeAnnotationNode* rawOutputType,
  71. const TString& LLVMSettings,
  72. NKikimr::NUdf::ICountersProvider* countersProvider,
  73. ui64 nativeYtTypeFlags,
  74. TMaybe<ui64> deterministicTimeProviderSeed
  75. );
  76. public:
  77. ui32 GetInputsCount() const override;
  78. const NKikimr::NMiniKQL::TStructType* GetInputType(ui32, bool) const override;
  79. const NKikimr::NMiniKQL::TStructType* GetInputType(bool) const override;
  80. const NKikimr::NMiniKQL::TStructType* GetRawInputType(ui32) const override;
  81. const NKikimr::NMiniKQL::TStructType* GetRawInputType() const override;
  82. const NKikimr::NMiniKQL::TType* GetOutputType() const override;
  83. const NKikimr::NMiniKQL::TType* GetRawOutputType() const override;
  84. NYT::TNode MakeInputSchema() const override;
  85. NYT::TNode MakeInputSchema(ui32) const override;
  86. NYT::TNode MakeOutputSchema() const override;
  87. NYT::TNode MakeOutputSchema(ui32) const override;
  88. NYT::TNode MakeOutputSchema(TStringBuf) const override;
  89. NYT::TNode MakeFullOutputSchema() const override;
  90. NKikimr::NMiniKQL::TScopedAlloc& GetScopedAlloc() override;
  91. NKikimr::NMiniKQL::IComputationGraph& GetGraph() override;
  92. const NKikimr::NMiniKQL::IFunctionRegistry& GetFunctionRegistry() const override;
  93. NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnvironment() override;
  94. const TString& GetLLVMSettings() const override;
  95. ui64 GetNativeYtTypeFlags() const override;
  96. ITimeProvider* GetTimeProvider() const override;
  97. void Invalidate() override;
  98. protected:
  99. void Release() override;
  100. };
  101. class TPullStreamWorker final: public TWorker<IPullStreamWorker> {
  102. private:
  103. NKikimr::NUdf::TUnboxedValue Output_ = NKikimr::NUdf::TUnboxedValue::Invalid();
  104. TVector<bool> HasInput_;
  105. inline bool CheckAllInputsSet() {
  106. return AllOf(HasInput_, [](bool x) { return x; });
  107. }
  108. public:
  109. using TWorker::TWorker;
  110. ~TPullStreamWorker();
  111. public:
  112. void SetInput(NKikimr::NUdf::TUnboxedValue&&, ui32) override;
  113. NKikimr::NUdf::TUnboxedValue& GetOutput() override;
  114. protected:
  115. void Release() override;
  116. };
  117. class TPullListWorker final: public TWorker<IPullListWorker> {
  118. private:
  119. NKikimr::NUdf::TUnboxedValue Output_ = NKikimr::NUdf::TUnboxedValue::Invalid();
  120. NKikimr::NUdf::TUnboxedValue OutputIterator_ = NKikimr::NUdf::TUnboxedValue::Invalid();
  121. TVector<bool> HasInput_;
  122. inline bool CheckAllInputsSet() {
  123. return AllOf(HasInput_, [](bool x) { return x; });
  124. }
  125. public:
  126. using TWorker::TWorker;
  127. ~TPullListWorker();
  128. public:
  129. void SetInput(NKikimr::NUdf::TUnboxedValue&&, ui32) override;
  130. NKikimr::NUdf::TUnboxedValue& GetOutput() override;
  131. NKikimr::NUdf::TUnboxedValue& GetOutputIterator() override;
  132. void ResetOutputIterator() override;
  133. protected:
  134. void Release() override;
  135. };
  136. class TPushStreamWorker final: public TWorker<IPushStreamWorker> {
  137. private:
  138. THolder<IConsumer<const NKikimr::NUdf::TUnboxedValue*>> Consumer_{};
  139. bool Finished_ = false;
  140. NKikimr::NMiniKQL::IComputationExternalNode* SelfNode_ = nullptr;
  141. public:
  142. using TWorker::TWorker;
  143. private:
  144. void FeedToConsumer();
  145. NYql::NUdf::IBoxedValue* GetPushStream() const;
  146. public:
  147. void SetConsumer(THolder<IConsumer<const NKikimr::NUdf::TUnboxedValue*>>) override;
  148. void Push(NKikimr::NUdf::TUnboxedValue&&) override;
  149. void OnFinish() override;
  150. protected:
  151. void Release() override;
  152. };
  153. }
  154. }