program_factory.cpp 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. #include "program_factory.h"
  2. #include "logger_init.h"
  3. #include "names.h"
  4. #include "worker_factory.h"
  5. #include <yql/essentials/utils/log/log.h>
  6. using namespace NYql;
  7. using namespace NYql::NPureCalc;
  8. TProgramFactory::TProgramFactory(const TProgramFactoryOptions& options)
  9. : Options_(options)
  10. , ExprOutputStream_(Options_.ExprOutputStream)
  11. , CountersProvider_(nullptr)
  12. {
  13. EnsureLoggingInitialized();
  14. if (!TryFromString(Options_.BlockEngineSettings, BlockEngineMode_)) {
  15. ythrow TCompileError("", "") << "Unknown BlockEngineSettings value: expected "
  16. << GetEnumAllNames<EBlockEngineMode>()
  17. << ", but got: "
  18. << Options_.BlockEngineSettings;
  19. }
  20. NUserData::TUserData::UserDataToLibraries(Options_.UserData_, Modules_);
  21. UserData_ = GetYqlModuleResolver(ExprContext_, ModuleResolver_, Options_.UserData_, {}, {});
  22. if (!ModuleResolver_) {
  23. auto issues = ExprContext_.IssueManager.GetIssues();
  24. CheckFatalIssues(issues);
  25. ythrow TCompileError("", issues.ToString()) << "failed to compile modules";
  26. }
  27. TVector<TString> UDFsPaths;
  28. for (const auto& item: Options_.UserData_) {
  29. if (
  30. item.Type_ == NUserData::EType::UDF &&
  31. item.Disposition_ == NUserData::EDisposition::FILESYSTEM
  32. ) {
  33. UDFsPaths.push_back(item.Content_);
  34. }
  35. }
  36. if (!Options_.UdfsDir_.empty()) {
  37. NKikimr::NMiniKQL::FindUdfsInDir(Options_.UdfsDir_, &UDFsPaths);
  38. }
  39. FuncRegistry_ = NKikimr::NMiniKQL::CreateFunctionRegistry(
  40. &NYql::NBacktrace::KikimrBackTrace, NKikimr::NMiniKQL::CreateBuiltinRegistry(), false, UDFsPaths)->Clone();
  41. NKikimr::NMiniKQL::FillStaticModules(*FuncRegistry_);
  42. }
  43. TProgramFactory::~TProgramFactory() {
  44. }
  45. void TProgramFactory::AddUdfModule(
  46. const TStringBuf& moduleName,
  47. NKikimr::NUdf::TUniquePtr<NKikimr::NUdf::IUdfModule>&& module
  48. ) {
  49. FuncRegistry_->AddModule(
  50. TString::Join(PurecalcUdfModulePrefix, moduleName), moduleName, std::move(module)
  51. );
  52. }
  53. void TProgramFactory::SetCountersProvider(NKikimr::NUdf::ICountersProvider* provider) {
  54. CountersProvider_ = provider;
  55. }
  56. IPullStreamWorkerFactoryPtr TProgramFactory::MakePullStreamWorkerFactory(
  57. const TInputSpecBase& inputSpec,
  58. const TOutputSpecBase& outputSpec,
  59. TString query,
  60. ETranslationMode mode,
  61. ui16 syntaxVersion
  62. ) {
  63. return std::make_shared<TPullStreamWorkerFactory>(TWorkerFactoryOptions(
  64. TIntrusivePtr<TProgramFactory>(this),
  65. inputSpec,
  66. outputSpec,
  67. query,
  68. FuncRegistry_,
  69. ModuleResolver_,
  70. UserData_,
  71. Modules_,
  72. Options_.LLVMSettings,
  73. BlockEngineMode_,
  74. ExprOutputStream_,
  75. CountersProvider_,
  76. mode,
  77. syntaxVersion,
  78. Options_.NativeYtTypeFlags,
  79. Options_.DeterministicTimeProviderSeed,
  80. Options_.UseSystemColumns,
  81. Options_.UseWorkerPool,
  82. Options_.UseAntlr4
  83. ));
  84. }
  85. IPullListWorkerFactoryPtr TProgramFactory::MakePullListWorkerFactory(
  86. const TInputSpecBase& inputSpec,
  87. const TOutputSpecBase& outputSpec,
  88. TString query,
  89. ETranslationMode mode,
  90. ui16 syntaxVersion
  91. ) {
  92. return std::make_shared<TPullListWorkerFactory>(TWorkerFactoryOptions(
  93. TIntrusivePtr<TProgramFactory>(this),
  94. inputSpec,
  95. outputSpec,
  96. query,
  97. FuncRegistry_,
  98. ModuleResolver_,
  99. UserData_,
  100. Modules_,
  101. Options_.LLVMSettings,
  102. BlockEngineMode_,
  103. ExprOutputStream_,
  104. CountersProvider_,
  105. mode,
  106. syntaxVersion,
  107. Options_.NativeYtTypeFlags,
  108. Options_.DeterministicTimeProviderSeed,
  109. Options_.UseSystemColumns,
  110. Options_.UseWorkerPool,
  111. Options_.UseAntlr4
  112. ));
  113. }
  114. IPushStreamWorkerFactoryPtr TProgramFactory::MakePushStreamWorkerFactory(
  115. const TInputSpecBase& inputSpec,
  116. const TOutputSpecBase& outputSpec,
  117. TString query,
  118. ETranslationMode mode,
  119. ui16 syntaxVersion
  120. ) {
  121. if (inputSpec.GetSchemas().size() > 1) {
  122. ythrow yexception() << "push stream mode doesn't support several inputs";
  123. }
  124. return std::make_shared<TPushStreamWorkerFactory>(TWorkerFactoryOptions(
  125. TIntrusivePtr<TProgramFactory>(this),
  126. inputSpec,
  127. outputSpec,
  128. query,
  129. FuncRegistry_,
  130. ModuleResolver_,
  131. UserData_,
  132. Modules_,
  133. Options_.LLVMSettings,
  134. BlockEngineMode_,
  135. ExprOutputStream_,
  136. CountersProvider_,
  137. mode,
  138. syntaxVersion,
  139. Options_.NativeYtTypeFlags,
  140. Options_.DeterministicTimeProviderSeed,
  141. Options_.UseSystemColumns,
  142. Options_.UseWorkerPool,
  143. Options_.UseAntlr4
  144. ));
  145. }