// worker.h 7.3 KB
  1. #pragma once
  2. #include <yql/essentials/public/purecalc/common/interface.h>
  3. #include <yql/essentials/public/udf/udf_value.h>
  4. #include <yql/essentials/ast/yql_expr.h>
  5. #include <yql/essentials/core/yql_user_data.h>
  6. #include <yql/essentials/minikql/mkql_alloc.h>
  7. #include <yql/essentials/minikql/mkql_node.h>
  8. #include <yql/essentials/minikql/mkql_node_visitor.h>
  9. #include <yql/essentials/minikql/computation/mkql_computation_node.h>
  10. #include <yql/essentials/providers/common/mkql/yql_provider_mkql.h>
  11. #include <memory>
  12. namespace NYql {
  13. namespace NPureCalc {
  14. struct TWorkerGraph {
  15. TWorkerGraph(
  16. const TExprNode::TPtr& exprRoot,
  17. TExprContext& exprCtx,
  18. const TString& serializedProgram,
  19. const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry,
  20. const TUserDataTable& userData,
  21. const TVector<const TStructExprType*>& inputTypes,
  22. const TVector<const TStructExprType*>& originalInputTypes,
  23. const TVector<const TStructExprType*>& rawInputTypes,
  24. const TTypeAnnotationNode* outputType,
  25. const TTypeAnnotationNode* rawOutputType,
  26. const TString& LLVMSettings,
  27. NKikimr::NUdf::ICountersProvider* countersProvider,
  28. ui64 nativeYtTypeFlags,
  29. TMaybe<ui64> deterministicTimeProviderSeed
  30. );
  31. ~TWorkerGraph();
  32. NKikimr::NMiniKQL::TScopedAlloc ScopedAlloc_;
  33. NKikimr::NMiniKQL::TTypeEnvironment Env_;
  34. const NKikimr::NMiniKQL::IFunctionRegistry& FuncRegistry_;
  35. TIntrusivePtr<IRandomProvider> RandomProvider_;
  36. TIntrusivePtr<ITimeProvider> TimeProvider_;
  37. NKikimr::NMiniKQL::IComputationPattern::TPtr ComputationPattern_;
  38. THolder<NKikimr::NMiniKQL::IComputationGraph> ComputationGraph_;
  39. TString LLVMSettings_;
  40. ui64 NativeYtTypeFlags_;
  41. TMaybe<TString> TimestampColumn_;
  42. const NKikimr::NMiniKQL::TType* OutputType_;
  43. const NKikimr::NMiniKQL::TType* RawOutputType_;
  44. TVector<NKikimr::NMiniKQL::IComputationExternalNode*> SelfNodes_;
  45. TVector<const NKikimr::NMiniKQL::TStructType*> InputTypes_;
  46. TVector<const NKikimr::NMiniKQL::TStructType*> OriginalInputTypes_;
  47. TVector<const NKikimr::NMiniKQL::TStructType*> RawInputTypes_;
  48. };
  49. template <typename TBase>
  50. class TWorker: public TBase {
  51. public:
  52. using TWorkerFactoryPtr = std::weak_ptr<IWorkerFactory>;
  53. private:
  54. // Worker factory implementation should stay alive for this worker to operate correctly.
  55. TWorkerFactoryPtr WorkerFactory_;
  56. protected:
  57. TWorkerGraph Graph_;
  58. public:
  59. TWorker(
  60. TWorkerFactoryPtr factory,
  61. const TExprNode::TPtr& exprRoot,
  62. TExprContext& exprCtx,
  63. const TString& serializedProgram,
  64. const NKikimr::NMiniKQL::IFunctionRegistry& funcRegistry,
  65. const TUserDataTable& userData,
  66. const TVector<const TStructExprType*>& inputTypes,
  67. const TVector<const TStructExprType*>& originalInputTypes,
  68. const TVector<const TStructExprType*>& rawInputTypes,
  69. const TTypeAnnotationNode* outputType,
  70. const TTypeAnnotationNode* rawOutputType,
  71. const TString& LLVMSettings,
  72. NKikimr::NUdf::ICountersProvider* countersProvider,
  73. ui64 nativeYtTypeFlags,
  74. TMaybe<ui64> deterministicTimeProviderSeed
  75. );
  76. public:
  77. ui32 GetInputsCount() const override;
  78. const NKikimr::NMiniKQL::TStructType* GetInputType(ui32, bool) const override;
  79. const NKikimr::NMiniKQL::TStructType* GetInputType(bool) const override;
  80. const NKikimr::NMiniKQL::TStructType* GetRawInputType(ui32) const override;
  81. const NKikimr::NMiniKQL::TStructType* GetRawInputType() const override;
  82. const NKikimr::NMiniKQL::TType* GetOutputType() const override;
  83. const NKikimr::NMiniKQL::TType* GetRawOutputType() const override;
  84. NYT::TNode MakeInputSchema() const override;
  85. NYT::TNode MakeInputSchema(ui32) const override;
  86. NYT::TNode MakeOutputSchema() const override;
  87. NYT::TNode MakeOutputSchema(ui32) const override;
  88. NYT::TNode MakeOutputSchema(TStringBuf) const override;
  89. NYT::TNode MakeFullOutputSchema() const override;
  90. NKikimr::NMiniKQL::TScopedAlloc& GetScopedAlloc() override;
  91. NKikimr::NMiniKQL::IComputationGraph& GetGraph() override;
  92. const NKikimr::NMiniKQL::IFunctionRegistry& GetFunctionRegistry() const override;
  93. NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnvironment() override;
  94. const TString& GetLLVMSettings() const override;
  95. ui64 GetNativeYtTypeFlags() const override;
  96. ITimeProvider* GetTimeProvider() const override;
  97. protected:
  98. void Release() override;
  99. };
  100. class TPullStreamWorker final: public TWorker<IPullStreamWorker> {
  101. private:
  102. NKikimr::NUdf::TUnboxedValue Output_ = NKikimr::NUdf::TUnboxedValue::Invalid();
  103. TVector<bool> HasInput_;
  104. inline bool CheckAllInputsSet() {
  105. return AllOf(HasInput_, [](bool x) { return x; });
  106. }
  107. public:
  108. using TWorker::TWorker;
  109. ~TPullStreamWorker();
  110. public:
  111. void SetInput(NKikimr::NUdf::TUnboxedValue&&, ui32) override;
  112. NKikimr::NUdf::TUnboxedValue& GetOutput() override;
  113. protected:
  114. void Release() override;
  115. };
  116. class TPullListWorker final: public TWorker<IPullListWorker> {
  117. private:
  118. NKikimr::NUdf::TUnboxedValue Output_ = NKikimr::NUdf::TUnboxedValue::Invalid();
  119. NKikimr::NUdf::TUnboxedValue OutputIterator_ = NKikimr::NUdf::TUnboxedValue::Invalid();
  120. TVector<bool> HasInput_;
  121. inline bool CheckAllInputsSet() {
  122. return AllOf(HasInput_, [](bool x) { return x; });
  123. }
  124. public:
  125. using TWorker::TWorker;
  126. ~TPullListWorker();
  127. public:
  128. void SetInput(NKikimr::NUdf::TUnboxedValue&&, ui32) override;
  129. NKikimr::NUdf::TUnboxedValue& GetOutput() override;
  130. NKikimr::NUdf::TUnboxedValue& GetOutputIterator() override;
  131. void ResetOutputIterator() override;
  132. protected:
  133. void Release() override;
  134. };
  135. class TPushStreamWorker final: public TWorker<IPushStreamWorker> {
  136. private:
  137. THolder<IConsumer<const NKikimr::NUdf::TUnboxedValue*>> Consumer_{};
  138. bool Finished_ = false;
  139. NKikimr::NMiniKQL::IComputationExternalNode* SelfNode_ = nullptr;
  140. public:
  141. using TWorker::TWorker;
  142. private:
  143. void FeedToConsumer();
  144. NYql::NUdf::IBoxedValue* GetPushStream() const;
  145. public:
  146. void SetConsumer(THolder<IConsumer<const NKikimr::NUdf::TUnboxedValue*>>) override;
  147. void Push(NKikimr::NUdf::TUnboxedValue&&) override;
  148. void OnFinish() override;
  149. protected:
  150. void Release() override;
  151. };
  152. }
  153. }