program_factory.cpp 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160
  1. #include "program_factory.h"
  2. #include "logger_init.h"
  3. #include "names.h"
  4. #include "worker_factory.h"
  5. #include <yql/essentials/utils/log/log.h>
  6. using namespace NYql;
  7. using namespace NYql::NPureCalc;
  8. TProgramFactory::TProgramFactory(const TProgramFactoryOptions& options)
  9. : Options_(options)
  10. , ExprOutputStream_(Options_.ExprOutputStream)
  11. , CountersProvider_(nullptr)
  12. {
  13. EnsureLoggingInitialized();
  14. if (!TryFromString(Options_.BlockEngineSettings, BlockEngineMode_)) {
  15. ythrow TCompileError("", "") << "Unknown BlockEngineSettings value: expected "
  16. << GetEnumAllNames<EBlockEngineMode>()
  17. << ", but got: "
  18. << Options_.BlockEngineSettings;
  19. }
  20. NUserData::TUserData::UserDataToLibraries(Options_.UserData_, Modules_);
  21. UserData_ = GetYqlModuleResolver(ExprContext_, ModuleResolver_, Options_.UserData_, {}, {});
  22. if (!ModuleResolver_) {
  23. auto issues = ExprContext_.IssueManager.GetIssues();
  24. CheckFatalIssues(issues);
  25. ythrow TCompileError("", issues.ToString()) << "failed to compile modules";
  26. }
  27. TVector<TString> UDFsPaths;
  28. for (const auto& item: Options_.UserData_) {
  29. if (
  30. item.Type_ == NUserData::EType::UDF &&
  31. item.Disposition_ == NUserData::EDisposition::FILESYSTEM
  32. ) {
  33. UDFsPaths.push_back(item.Content_);
  34. }
  35. }
  36. if (!Options_.UdfsDir_.empty()) {
  37. NKikimr::NMiniKQL::FindUdfsInDir(Options_.UdfsDir_, &UDFsPaths);
  38. }
  39. FuncRegistry_ = NKikimr::NMiniKQL::CreateFunctionRegistry(
  40. &NYql::NBacktrace::KikimrBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, UDFsPaths)->Clone();
  41. NKikimr::NMiniKQL::FillStaticModules(*FuncRegistry_);
  42. }
  43. TProgramFactory::~TProgramFactory() {
  44. }
  45. void TProgramFactory::AddUdfModule(
  46. const TStringBuf& moduleName,
  47. NKikimr::NUdf::TUniquePtr<NKikimr::NUdf::IUdfModule>&& module
  48. ) {
  49. FuncRegistry_->AddModule(
  50. TString::Join(PurecalcUdfModulePrefix, moduleName), moduleName, std::move(module)
  51. );
  52. }
  53. void TProgramFactory::SetCountersProvider(NKikimr::NUdf::ICountersProvider* provider) {
  54. CountersProvider_ = provider;
  55. }
  56. IPullStreamWorkerFactoryPtr TProgramFactory::MakePullStreamWorkerFactory(
  57. const TInputSpecBase& inputSpec,
  58. const TOutputSpecBase& outputSpec,
  59. TString query,
  60. ETranslationMode mode,
  61. ui16 syntaxVersion
  62. ) {
  63. return std::make_shared<TPullStreamWorkerFactory>(TWorkerFactoryOptions(
  64. TIntrusivePtr<TProgramFactory>(this),
  65. inputSpec,
  66. outputSpec,
  67. query,
  68. FuncRegistry_,
  69. ModuleResolver_,
  70. UserData_,
  71. Modules_,
  72. Options_.LLVMSettings,
  73. BlockEngineMode_,
  74. ExprOutputStream_,
  75. CountersProvider_,
  76. mode,
  77. syntaxVersion,
  78. Options_.NativeYtTypeFlags,
  79. Options_.DeterministicTimeProviderSeed,
  80. Options_.UseSystemColumns,
  81. Options_.UseWorkerPool
  82. ));
  83. }
  84. IPullListWorkerFactoryPtr TProgramFactory::MakePullListWorkerFactory(
  85. const TInputSpecBase& inputSpec,
  86. const TOutputSpecBase& outputSpec,
  87. TString query,
  88. ETranslationMode mode,
  89. ui16 syntaxVersion
  90. ) {
  91. return std::make_shared<TPullListWorkerFactory>(TWorkerFactoryOptions(
  92. TIntrusivePtr<TProgramFactory>(this),
  93. inputSpec,
  94. outputSpec,
  95. query,
  96. FuncRegistry_,
  97. ModuleResolver_,
  98. UserData_,
  99. Modules_,
  100. Options_.LLVMSettings,
  101. BlockEngineMode_,
  102. ExprOutputStream_,
  103. CountersProvider_,
  104. mode,
  105. syntaxVersion,
  106. Options_.NativeYtTypeFlags,
  107. Options_.DeterministicTimeProviderSeed,
  108. Options_.UseSystemColumns,
  109. Options_.UseWorkerPool
  110. ));
  111. }
  112. IPushStreamWorkerFactoryPtr TProgramFactory::MakePushStreamWorkerFactory(
  113. const TInputSpecBase& inputSpec,
  114. const TOutputSpecBase& outputSpec,
  115. TString query,
  116. ETranslationMode mode,
  117. ui16 syntaxVersion
  118. ) {
  119. if (inputSpec.GetSchemas().size() > 1) {
  120. ythrow yexception() << "push stream mode doesn't support several inputs";
  121. }
  122. return std::make_shared<TPushStreamWorkerFactory>(TWorkerFactoryOptions(
  123. TIntrusivePtr<TProgramFactory>(this),
  124. inputSpec,
  125. outputSpec,
  126. query,
  127. FuncRegistry_,
  128. ModuleResolver_,
  129. UserData_,
  130. Modules_,
  131. Options_.LLVMSettings,
  132. BlockEngineMode_,
  133. ExprOutputStream_,
  134. CountersProvider_,
  135. mode,
  136. syntaxVersion,
  137. Options_.NativeYtTypeFlags,
  138. Options_.DeterministicTimeProviderSeed,
  139. Options_.UseSystemColumns,
  140. Options_.UseWorkerPool
  141. ));
  142. }