program_factory.cpp 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. #include "program_factory.h"
  2. #include "logger_init.h"
  3. #include "names.h"
  4. #include "worker_factory.h"
  5. #include <yql/essentials/utils/log/log.h>
  6. using namespace NYql;
  7. using namespace NYql::NPureCalc;
  8. TProgramFactory::TProgramFactory(const TProgramFactoryOptions& options)
  9. : Options_(options)
  10. , ExprOutputStream_(Options_.ExprOutputStream)
  11. , CountersProvider_(nullptr)
  12. {
  13. EnsureLoggingInitialized();
  14. if (!TryFromString(Options_.BlockEngineSettings, BlockEngineMode_)) {
  15. ythrow TCompileError("", "") << "Unknown BlockEngineSettings value: expected "
  16. << GetEnumAllNames<EBlockEngineMode>()
  17. << ", but got: "
  18. << Options_.BlockEngineSettings;
  19. }
  20. NUserData::TUserData::UserDataToLibraries(Options_.UserData_, Modules_);
  21. UserData_ = GetYqlModuleResolver(ExprContext_, ModuleResolver_, Options_.UserData_, {}, {});
  22. if (!ModuleResolver_) {
  23. ythrow TCompileError("", ExprContext_.IssueManager.GetIssues().ToString()) << "failed to compile modules";
  24. }
  25. TVector<TString> UDFsPaths;
  26. for (const auto& item: Options_.UserData_) {
  27. if (
  28. item.Type_ == NUserData::EType::UDF &&
  29. item.Disposition_ == NUserData::EDisposition::FILESYSTEM
  30. ) {
  31. UDFsPaths.push_back(item.Content_);
  32. }
  33. }
  34. if (!Options_.UdfsDir_.empty()) {
  35. NKikimr::NMiniKQL::FindUdfsInDir(Options_.UdfsDir_, &UDFsPaths);
  36. }
  37. FuncRegistry_ = NKikimr::NMiniKQL::CreateFunctionRegistry(
  38. &NYql::NBacktrace::KikimrBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, UDFsPaths)->Clone();
  39. NKikimr::NMiniKQL::FillStaticModules(*FuncRegistry_);
  40. }
  41. TProgramFactory::~TProgramFactory() {
  42. }
  43. void TProgramFactory::AddUdfModule(
  44. const TStringBuf& moduleName,
  45. NKikimr::NUdf::TUniquePtr<NKikimr::NUdf::IUdfModule>&& module
  46. ) {
  47. FuncRegistry_->AddModule(
  48. TString::Join(PurecalcUdfModulePrefix, moduleName), moduleName, std::move(module)
  49. );
  50. }
  51. void TProgramFactory::SetCountersProvider(NKikimr::NUdf::ICountersProvider* provider) {
  52. CountersProvider_ = provider;
  53. }
  54. IPullStreamWorkerFactoryPtr TProgramFactory::MakePullStreamWorkerFactory(
  55. const TInputSpecBase& inputSpec,
  56. const TOutputSpecBase& outputSpec,
  57. TString query,
  58. ETranslationMode mode,
  59. ui16 syntaxVersion
  60. ) {
  61. return std::make_shared<TPullStreamWorkerFactory>(TWorkerFactoryOptions(
  62. TIntrusivePtr<TProgramFactory>(this),
  63. inputSpec,
  64. outputSpec,
  65. query,
  66. FuncRegistry_,
  67. ModuleResolver_,
  68. UserData_,
  69. Modules_,
  70. Options_.LLVMSettings,
  71. BlockEngineMode_,
  72. ExprOutputStream_,
  73. CountersProvider_,
  74. mode,
  75. syntaxVersion,
  76. Options_.NativeYtTypeFlags,
  77. Options_.DeterministicTimeProviderSeed,
  78. Options_.UseSystemColumns,
  79. Options_.UseWorkerPool
  80. ));
  81. }
  82. IPullListWorkerFactoryPtr TProgramFactory::MakePullListWorkerFactory(
  83. const TInputSpecBase& inputSpec,
  84. const TOutputSpecBase& outputSpec,
  85. TString query,
  86. ETranslationMode mode,
  87. ui16 syntaxVersion
  88. ) {
  89. return std::make_shared<TPullListWorkerFactory>(TWorkerFactoryOptions(
  90. TIntrusivePtr<TProgramFactory>(this),
  91. inputSpec,
  92. outputSpec,
  93. query,
  94. FuncRegistry_,
  95. ModuleResolver_,
  96. UserData_,
  97. Modules_,
  98. Options_.LLVMSettings,
  99. BlockEngineMode_,
  100. ExprOutputStream_,
  101. CountersProvider_,
  102. mode,
  103. syntaxVersion,
  104. Options_.NativeYtTypeFlags,
  105. Options_.DeterministicTimeProviderSeed,
  106. Options_.UseSystemColumns,
  107. Options_.UseWorkerPool
  108. ));
  109. }
  110. IPushStreamWorkerFactoryPtr TProgramFactory::MakePushStreamWorkerFactory(
  111. const TInputSpecBase& inputSpec,
  112. const TOutputSpecBase& outputSpec,
  113. TString query,
  114. ETranslationMode mode,
  115. ui16 syntaxVersion
  116. ) {
  117. if (inputSpec.GetSchemas().size() > 1) {
  118. ythrow yexception() << "push stream mode doesn't support several inputs";
  119. }
  120. return std::make_shared<TPushStreamWorkerFactory>(TWorkerFactoryOptions(
  121. TIntrusivePtr<TProgramFactory>(this),
  122. inputSpec,
  123. outputSpec,
  124. query,
  125. FuncRegistry_,
  126. ModuleResolver_,
  127. UserData_,
  128. Modules_,
  129. Options_.LLVMSettings,
  130. BlockEngineMode_,
  131. ExprOutputStream_,
  132. CountersProvider_,
  133. mode,
  134. syntaxVersion,
  135. Options_.NativeYtTypeFlags,
  136. Options_.DeterministicTimeProviderSeed,
  137. Options_.UseSystemColumns,
  138. Options_.UseWorkerPool
  139. ));
  140. }