interface.h 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188
  1. #pragma once
  2. #include "fwd.h"
  3. #include "wrappers.h"
  4. #include <yql/essentials/core/user_data/yql_user_data.h>
  5. #include <yql/essentials/public/udf/udf_value.h>
  6. #include <yql/essentials/public/udf/udf_counter.h>
  7. #include <yql/essentials/public/udf/udf_registrator.h>
  8. #include <yql/essentials/public/issue/yql_issue.h>
  9. #include <library/cpp/yson/node/node.h>
  10. #include <library/cpp/logger/priority.h>
  11. #include <util/generic/ptr.h>
  12. #include <util/generic/maybe.h>
  13. #include <util/generic/hash_set.h>
  14. #include <util/generic/string.h>
  15. #include <util/stream/output.h>
  // Forward declarations of types that the declarations below only use
  // through pointers and references.
  class ITimeProvider;

  namespace NKikimr::NMiniKQL {
      class TScopedAlloc;
      class IComputationGraph;
      class IFunctionRegistry;
      class TTypeEnvironment;
      class TType;
      class TStructType;
  }
  27. namespace NYql {
  28. namespace NPureCalc {
  29. /**
  30. * SQL or s-expression translation error.
  31. */
  32. class TCompileError: public yexception {
  33. private:
  34. TString Yql_;
  35. TString Issues_;
  36. public:
  37. // TODO: maybe accept an actual list of issues here?
  38. // See https://a.yandex-team.ru/arc/review/439403/details#comment-778237
  39. TCompileError(TString yql, TString issues)
  40. : Yql_(std::move(yql))
  41. , Issues_(std::move(issues))
  42. {
  43. }
  44. public:
  45. /**
  46. * Get the sql query which caused the error (if there is one available).
  47. */
  48. const TString& GetYql() const {
  49. return Yql_;
  50. }
  51. /**
  52. * Get detailed description for all errors and warnings that happened during sql translation.
  53. */
  54. const TString& GetIssues() const {
  55. return Issues_;
  56. }
  57. };
  58. ////////////////////////////////////////////////////////////////////////////////////////////////////
  /**
   * Abstract pull-style source of objects of type T.
   */
  template <typename T>
  class IStream {
  public:
      virtual ~IStream() = default;

      /**
       * Remove the next value from the stream and return it. Once the stream is exhausted,
       * implementations should return some sentinel object.
       *
       * Depending on the return type, ownership of the returned object may stay with the stream.
       * In that case the object's lifetime may be bound to the stream: it may be destroyed by the next
       * Fetch() call or when the stream itself is destroyed, whichever happens first.
       */
      virtual T Fetch() = 0;
  };
  77. /**
  78. * Create a new stream which applies the given functor to the elements of the original stream.
  79. */
  80. template <typename TOld, typename TNew, typename TFunctor>
  81. inline THolder<IStream<TNew>> MapStream(THolder<IStream<TOld>> stream, TFunctor functor) {
  82. return THolder(new NPrivate::TMappingStream<TNew, TOld, TFunctor>(std::move(stream), std::move(functor)));
  83. };
  84. /**
  85. * Convert stream of objects into a stream of potentially incompatible objects.
  86. *
  87. * This conversion applies static cast to the output of the original stream. Use with caution!
  88. */
  89. /// @{
  90. template <
  91. typename TNew, typename TOld,
  92. std::enable_if_t<!std::is_same<TNew, TOld>::value>* = nullptr>
  93. inline THolder<IStream<TNew>> ConvertStreamUnsafe(THolder<IStream<TOld>> stream) {
  94. return MapStream<TOld, TNew>(std::move(stream), [](TOld x) -> TNew { return static_cast<TNew>(x); });
  95. }
  96. template <typename T>
  97. inline THolder<IStream<T>> ConvertStreamUnsafe(THolder<IStream<T>> stream) {
  98. return stream;
  99. }
  100. /// @}
  101. /**
  102. * Convert stream of objects into a stream of compatible objects.
  103. *
  104. * Note: each conversion adds one level of indirection so avoid them if possible.
  105. */
  106. template <typename TNew, typename TOld, std::enable_if_t<std::is_convertible<TOld, TNew>::value>* = nullptr>
  107. inline THolder<IStream<TNew>> ConvertStream(THolder<IStream<TOld>> stream) {
  108. return ConvertStreamUnsafe<TNew, TOld>(std::move(stream));
  109. }
  110. ////////////////////////////////////////////////////////////////////////////////////////////////////
  /**
   * Abstract push-style sink for objects of type T.
   */
  template <typename T>
  class IConsumer {
  public:
      virtual ~IConsumer() = default;

      /**
       * Hand an object over to the consumer.
       *
       * Depending on the argument type, the consumer may not take ownership of the passed object;
       * then the caller stays responsible for the object's lifetime after this call.
       *
       * The object may be destroyed as soon as this function returns, so a consumer should not keep
       * a pointer to it (or the object itself) without taking all necessary precautions to ensure
       * that it stays valid after returning.
       */
      virtual void OnObject(T) = 0;

      /**
       * Close the consumer and run finalization logic. Calling OnObject after this function is an error.
       */
      virtual void OnFinish() = 0;
  };
  135. /**
  136. * Create a new consumer which applies the given functor to objects before .
  137. */
  138. template <typename TOld, typename TNew, typename TFunctor>
  139. inline THolder<IConsumer<TNew>> MapConsumer(THolder<IConsumer<TOld>> stream, TFunctor functor) {
  140. return THolder(new NPrivate::TMappingConsumer<TNew, TOld, TFunctor>(std::move(stream), std::move(functor)));
  141. };
  142. /**
  143. * Convert consumer of objects into a consumer of potentially incompatible objects.
  144. *
  145. * This conversion applies static cast to the input value. Use with caution.
  146. */
  147. /// @{
  148. template <
  149. typename TNew, typename TOld,
  150. std::enable_if_t<!std::is_same<TNew, TOld>::value>* = nullptr>
  151. inline THolder<IConsumer<TNew>> ConvertConsumerUnsafe(THolder<IConsumer<TOld>> consumer) {
  152. return MapConsumer<TOld, TNew>(std::move(consumer), [](TNew x) -> TOld { return static_cast<TOld>(x); });
  153. }
  154. template <typename T>
  155. inline THolder<IConsumer<T>> ConvertConsumerUnsafe(THolder<IConsumer<T>> consumer) {
  156. return consumer;
  157. }
  158. /// @}
  159. /**
  160. * Convert consumer of objects into a consumer of compatible objects.
  161. *
  162. * Note: each conversion adds one level of indirection so avoid them if possible.
  163. */
  164. template <typename TNew, typename TOld, std::enable_if_t<std::is_convertible<TNew, TOld>::value>* = nullptr>
  165. inline THolder<IConsumer<TNew>> ConvertConsumer(THolder<IConsumer<TOld>> consumer) {
  166. return ConvertConsumerUnsafe<TNew, TOld>(std::move(consumer));
  167. }
  168. /**
  169. * Create a consumer which holds a non-owning pointer to the given consumer
  170. * and passes all messages to the latter.
  171. */
  172. template <typename T, typename C>
  173. THolder<NPrivate::TNonOwningConsumer<T, C>> MakeNonOwningConsumer(C consumer) {
  174. return MakeHolder<NPrivate::TNonOwningConsumer<T, C>>(consumer);
  175. }
  176. ////////////////////////////////////////////////////////////////////////////////////////////////////
  177. /**
  178. * Logging options.
  179. */
  180. struct TLoggingOptions final {
  181. public:
  182. /// Logging level for messages generated during compilation.
  183. ELogPriority LogLevel_; // TODO: rename to LogLevel
  184. /// Where to write log messages.
  185. IOutputStream* LogDestination;
  186. public:
  187. TLoggingOptions();
  188. /**
  189. * Set a new logging level.
  190. *
  191. * @return reference to self, to allow method chaining.
  192. */
  193. TLoggingOptions& SetLogLevel(ELogPriority);
  194. /**
  195. * Set a new logging destination.
  196. *
  197. * @return reference to self, to allow method chaining.
  198. */
  199. TLoggingOptions& SetLogDestination(IOutputStream*);
  200. };
  201. /**
  202. * General options for program factory.
  203. */
  204. struct TProgramFactoryOptions final {
  205. public:
  206. /// Path to a directory with compiled UDFs. Leave empty to disable loading external UDFs.
  207. TString UdfsDir_; // TODO: rename to UDFDir
  208. /// List of available external resources, e.g. files, UDFs, libraries.
  209. TVector<NUserData::TUserData> UserData_; // TODO: rename to UserData
  210. /// LLVM settings. Assign "OFF" to disable LLVM, empty string for default settings.
  211. TString LLVMSettings;
  212. /// Block engine settings. Assign "force" to unconditionally enable
  213. /// it, "disable" for turn it off and "auto" to left the final
  214. /// decision to the platform heuristics.
  215. TString BlockEngineSettings;
  216. /// Output stream to dump the compiled and optimized expressions.
  217. IOutputStream* ExprOutputStream;
  218. /// Provider for generic counters which can be used to export statistics from UDFs.
  219. NKikimr::NUdf::ICountersProvider* CountersProvider;
  220. /// YT Type V3 flags for Skiff/Yson serialization.
  221. ui64 NativeYtTypeFlags;
  222. /// Seed for deterministic time provider
  223. TMaybe<ui64> DeterministicTimeProviderSeed;
  224. /// Use special system columns to support tables naming (supports non empty ``TablePath()``/``TableName()``)
  225. bool UseSystemColumns;
  226. /// Reuse allocated workers
  227. bool UseWorkerPool;
  228. /// Use Antlr4 parser (for migration)
  229. bool UseAntlr4;
  230. public:
  231. TProgramFactoryOptions();
  232. public:
  233. /**
  234. * Set a new path to a directory with UDFs.
  235. *
  236. * @return reference to self, to allow method chaining.
  237. */
  238. TProgramFactoryOptions& SetUDFsDir(TStringBuf);
  239. /**
  240. * Add a new library to the UserData list.
  241. *
  242. * @param disposition where the resource resides, e.g. on filesystem, in memory, etc.
  243. * NB: URL disposition is not supported.
  244. * @param name name of the resource.
  245. * @param content depending on disposition, either path to the resource or its content.
  246. * @return reference to self, to allow method chaining.
  247. */
  248. TProgramFactoryOptions& AddLibrary(NUserData::EDisposition disposition, TStringBuf name, TStringBuf content);
  249. /**
  250. * Add a new file to the UserData list.
  251. *
  252. * @param disposition where the resource resides, e.g. on filesystem, in memory, etc.
  253. * NB: URL disposition is not supported.
  254. * @param name name of the resource.
  255. * @param content depending on disposition, either path to the resource or its content.
  256. * @return reference to self, to allow method chaining.
  257. */
  258. TProgramFactoryOptions& AddFile(NUserData::EDisposition disposition, TStringBuf name, TStringBuf content);
  259. /**
  260. * Add a new UDF to the UserData list.
  261. *
  262. * @param disposition where the resource resides, e.g. on filesystem, in memory, etc.
  263. * NB: URL disposition is not supported.
  264. * @param name name of the resource.
  265. * @param content depending on disposition, either path to the resource or its content.
  266. * @return reference to self, to allow method chaining.
  267. */
  268. TProgramFactoryOptions& AddUDF(NUserData::EDisposition disposition, TStringBuf name, TStringBuf content);
  269. /**
  270. * Set new LLVM settings.
  271. *
  272. * @return reference to self, to allow method chaining.
  273. */
  274. TProgramFactoryOptions& SetLLVMSettings(TStringBuf llvm_settings);
  275. /**
  276. * Set new block engine settings.
  277. *
  278. * @return reference to self, to allow method chaining.
  279. */
  280. TProgramFactoryOptions& SetBlockEngineSettings(TStringBuf blockEngineSettings);
  281. /**
  282. * Set the stream to dump the compiled and optimized expressions.
  283. *
  284. * @return reference to self, to allow method chaining.
  285. */
  286. TProgramFactoryOptions& SetExprOutputStream(IOutputStream* exprOutputStream);
  287. /**
  288. * Set new counters provider. Passed pointer should stay alive for as long as the processor factory
  289. * stays alive.
  290. *
  291. * @return reference to self, to allow method chaining.
  292. */
  293. TProgramFactoryOptions& SetCountersProvider(NKikimr::NUdf::ICountersProvider* countersProvider);
  294. /**
  295. * Set new YT Type V3 mode. Deprecated method. Use SetNativeYtTypeFlags instead
  296. *
  297. * @return reference to self, to allow method chaining.
  298. */
  299. TProgramFactoryOptions& SetUseNativeYtTypes(bool useNativeTypes);
  300. /**
  301. * Set YT Type V3 flags.
  302. *
  303. * @return reference to self, to allow method chaining.
  304. */
  305. TProgramFactoryOptions& SetNativeYtTypeFlags(ui64 nativeTypeFlags);
  306. /**
  307. * Set seed for deterministic time provider.
  308. *
  309. * @return reference to self, to allow method chaining.
  310. */
  311. TProgramFactoryOptions& SetDeterministicTimeProviderSeed(TMaybe<ui64> seed);
  312. /**
  313. * Set new flag whether to allow using system columns or not.
  314. *
  315. * @return reference to self, to allow method chaining.
  316. */
  317. TProgramFactoryOptions& SetUseSystemColumns(bool useSystemColumns);
  318. /**
  319. * Set new flag whether to allow reusing workers or not.
  320. *
  321. * @return reference to self, to allow method chaining.
  322. */
  323. TProgramFactoryOptions& SetUseWorkerPool(bool useWorkerPool);
  324. };
  325. ////////////////////////////////////////////////////////////////////////////////////////////////////
  /**
   * Kind of the program text handed to the factory, e.g. SQL or an s-expression.
   *
   * NOTE(review): the inline comments next to the enumerators look like names consumed by
   * an enum serialization generator; they are kept verbatim -- confirm before editing them.
   */
  enum class ETranslationMode {
      SQL /* "SQL" */,
      SExpr /* "s-expression" */,
      Mkql /* "mkql" */,
      PG /* PostgreSQL */
  };
  335. /**
  336. * A facility for compiling sql and s-expressions and making programs from them.
  337. */
  338. class IProgramFactory: public TThrRefBase {
  339. protected:
  340. virtual IPullStreamWorkerFactoryPtr MakePullStreamWorkerFactory(const TInputSpecBase&, const TOutputSpecBase&, TString, ETranslationMode, ui16) = 0;
  341. virtual IPullListWorkerFactoryPtr MakePullListWorkerFactory(const TInputSpecBase&, const TOutputSpecBase&, TString, ETranslationMode, ui16) = 0;
  342. virtual IPushStreamWorkerFactoryPtr MakePushStreamWorkerFactory(const TInputSpecBase&, const TOutputSpecBase&, TString, ETranslationMode, ui16) = 0;
  343. public:
  344. /**
  345. * Add new udf module. It's not specified whether adding new modules will affect existing programs
  346. * (theoretical answer is 'no').
  347. */
  348. virtual void AddUdfModule(const TStringBuf&, NKikimr::NUdf::TUniquePtr<NKikimr::NUdf::IUdfModule>&&) = 0;
  349. // TODO: support setting udf modules via factory options.
  350. /**
  351. * Set new counters provider, override one that was specified via factory options. Note that existing
  352. * programs will still reference the previous provider.
  353. */
  354. virtual void SetCountersProvider(NKikimr::NUdf::ICountersProvider*) = 0;
  355. // TODO: support setting providers via factory options.
  356. template <typename TInputSpec, typename TOutputSpec>
  357. THolder<TPullStreamProgram<TInputSpec, TOutputSpec>> MakePullStreamProgram(
  358. TInputSpec inputSpec, TOutputSpec outputSpec, TString query, ETranslationMode mode = ETranslationMode::SQL, ui16 syntaxVersion = 1
  359. ) {
  360. auto workerFactory = MakePullStreamWorkerFactory(inputSpec, outputSpec, std::move(query), mode, syntaxVersion);
  361. return MakeHolder<TPullStreamProgram<TInputSpec, TOutputSpec>>(std::move(inputSpec), std::move(outputSpec), workerFactory);
  362. }
  363. template <typename TInputSpec, typename TOutputSpec>
  364. THolder<TPullListProgram<TInputSpec, TOutputSpec>> MakePullListProgram(
  365. TInputSpec inputSpec, TOutputSpec outputSpec, TString query, ETranslationMode mode = ETranslationMode::SQL, ui16 syntaxVersion = 1
  366. ) {
  367. auto workerFactory = MakePullListWorkerFactory(inputSpec, outputSpec, std::move(query), mode, syntaxVersion);
  368. return MakeHolder<TPullListProgram<TInputSpec, TOutputSpec>>(std::move(inputSpec), std::move(outputSpec), workerFactory);
  369. }
  370. template <typename TInputSpec, typename TOutputSpec>
  371. THolder<TPushStreamProgram<TInputSpec, TOutputSpec>> MakePushStreamProgram(
  372. TInputSpec inputSpec, TOutputSpec outputSpec, TString query, ETranslationMode mode = ETranslationMode::SQL, ui16 syntaxVersion = 1
  373. ) {
  374. auto workerFactory = MakePushStreamWorkerFactory(inputSpec, outputSpec, std::move(query), mode, syntaxVersion);
  375. return MakeHolder<TPushStreamProgram<TInputSpec, TOutputSpec>>(std::move(inputSpec), std::move(outputSpec), workerFactory);
  376. }
  377. };
  378. ////////////////////////////////////////////////////////////////////////////////////////////////////
  379. /**
  380. * A facility for creating workers. Despite being a part of a public API, worker factory is not used directly.
  381. */
  382. class IWorkerFactory: public std::enable_shared_from_this<IWorkerFactory> {
  383. public:
  384. virtual ~IWorkerFactory() = default;
  385. /**
  386. * Get input column names for specified input that are actually used in the query.
  387. */
  388. virtual const THashSet<TString>& GetUsedColumns(ui32) const = 0;
  389. /**
  390. * Overload for single-input programs.
  391. */
  392. virtual const THashSet<TString>& GetUsedColumns() const = 0;
  393. /**
  394. * Make input type schema for specified input as deduced by program optimizer. This schema is equivalent
  395. * to one provided by input spec up to the order of the fields in structures.
  396. */
  397. virtual NYT::TNode MakeInputSchema(ui32) const = 0;
  398. /**
  399. * Overload for single-input programs.
  400. */
  401. virtual NYT::TNode MakeInputSchema() const = 0;
  402. /**
  403. * Make output type schema as deduced by program optimizer. If output spec provides its own schema, than
  404. * this schema is equivalent to one provided by output spec up to the order of the fields in structures.
  405. */
  406. /// @{
  407. /**
  408. * Overload for single-table output programs (i.e. output type is struct).
  409. */
  410. virtual NYT::TNode MakeOutputSchema() const = 0;
  411. /**
  412. * Overload for multi-table output programs (i.e. output type is variant over tuple).
  413. */
  414. virtual NYT::TNode MakeOutputSchema(ui32) const = 0;
  415. /**
  416. * Overload for multi-table output programs (i.e. output type is variant over struct).
  417. */
  418. virtual NYT::TNode MakeOutputSchema(TStringBuf) const = 0;
  419. /// @}
  420. /**
  421. * Make full output schema. For single-output programs returns struct type, for multi-output programs
  422. * returns variant type.
  423. *
  424. * Warning: calling this function may result in extended memory usage for large number of output tables.
  425. */
  426. virtual NYT::TNode MakeFullOutputSchema() const = 0;
  427. /**
  428. * Get compilation issues
  429. */
  430. virtual TIssues GetIssues() const = 0;
  431. /**
  432. * Get precompiled mkql program
  433. */
  434. virtual TString GetCompiledProgram() = 0;
  435. /**
  436. * Return a worker to the factory for possible reuse
  437. */
  438. virtual void ReturnWorker(IWorker* worker) = 0;
  439. };
  /**
   * Deleter policy which calls Release() on the object instead of deleting it.
   */
  class TReleaseWorker {
  public:
      template <class TWorker>
      static inline void Destroy(TWorker* worker) noexcept {
          worker->Release();
      }
  };
  447. template <class T>
  448. using TWorkerHolder = THolder<T, TReleaseWorker>;
  449. /**
  450. * Factory for generating pull stream workers.
  451. */
  452. class IPullStreamWorkerFactory: public IWorkerFactory {
  453. public:
  454. /**
  455. * Create a new pull stream worker.
  456. */
  457. virtual TWorkerHolder<IPullStreamWorker> MakeWorker() = 0;
  458. };
  459. /**
  460. * Factory for generating pull list workers.
  461. */
  462. class IPullListWorkerFactory: public IWorkerFactory {
  463. public:
  464. /**
  465. * Create a new pull list worker.
  466. */
  467. virtual TWorkerHolder<IPullListWorker> MakeWorker() = 0;
  468. };
  469. /**
  470. * Factory for generating push stream workers.
  471. */
  472. class IPushStreamWorkerFactory: public IWorkerFactory {
  473. public:
  474. /**
  475. * Create a new push stream worker.
  476. */
  477. virtual TWorkerHolder<IPushStreamWorker> MakeWorker() = 0;
  478. };
  479. ////////////////////////////////////////////////////////////////////////////////////////////////////
  480. /**
  481. * Worker is a central part of any program instance. It contains current computation state
  482. * (called computation graph) and objects required to work with it, including an allocator for unboxed values.
  483. *
  484. * Usually, users do not interact with workers directly. They use program instance entry points such as streams
  485. * and consumers instead. The only case when one would have to to interact with workers is when implementing
  486. * custom io-specification.
  487. */
  488. class IWorker {
  489. protected:
  490. friend class TReleaseWorker;
  491. /**
  492. * Cleanup the worker and return to a worker factory for reuse
  493. */
  494. virtual void Release() = 0;
  495. public:
  496. virtual ~IWorker() = default;
  497. public:
  498. /**
  499. * Number of inputs for this program.
  500. */
  501. virtual ui32 GetInputsCount() const = 0;
  502. /**
  503. * MiniKQL input struct type of specified input for this program. Type is equivalent to the deduced input
  504. * schema (see IWorker::MakeInputSchema())
  505. *
  506. * If ``original`` is set to ``true``, returns type without virtual system columns.
  507. */
  508. virtual const NKikimr::NMiniKQL::TStructType* GetInputType(ui32, bool original = false) const = 0;
  509. /**
  510. * Overload for single-input programs.
  511. */
  512. virtual const NKikimr::NMiniKQL::TStructType* GetInputType(bool original = false) const = 0;
  513. /**
  514. * MiniKQL input struct type of the specified input for this program.
  515. * The returned type is the actual type of the specified input node.
  516. */
  517. virtual const NKikimr::NMiniKQL::TStructType* GetRawInputType(ui32) const = 0;
  518. /**
  519. * Overload for single-input programs.
  520. */
  521. virtual const NKikimr::NMiniKQL::TStructType* GetRawInputType() const = 0;
  522. /**
  523. * MiniKQL output struct type for this program. The returned type is equivalent to the deduced output
  524. * schema (see IWorker::MakeFullOutputSchema()).
  525. */
  526. virtual const NKikimr::NMiniKQL::TType* GetOutputType() const = 0;
  527. /**
  528. * MiniKQL output struct type for this program. The returned type is
  529. * the actual type of the root node.
  530. */
  531. virtual const NKikimr::NMiniKQL::TType* GetRawOutputType() const = 0;
  532. /**
  533. * Make input type schema for specified input as deduced by program optimizer. This schema is equivalent
  534. * to one provided by input spec up to the order of the fields in structures.
  535. */
  536. virtual NYT::TNode MakeInputSchema(ui32) const = 0;
  537. /**
  538. * Overload for single-input programs.
  539. */
  540. virtual NYT::TNode MakeInputSchema() const = 0;
  541. /**
  542. * Make output type schema as deduced by program optimizer. If output spec provides its own schema, than
  543. * this schema is equivalent to one provided by output spec up to the order of the fields in structures.
  544. */
  545. /// @{
  546. /**
  547. * Overload for single-table output programs (i.e. output type is struct).
  548. */
  549. virtual NYT::TNode MakeOutputSchema() const = 0;
  550. /**
  551. * Overload for multi-table output programs (i.e. output type is variant over tuple).
  552. */
  553. virtual NYT::TNode MakeOutputSchema(ui32) const = 0;
  554. /**
  555. * Overload for multi-table output programs (i.e. output type is variant over struct).
  556. */
  557. virtual NYT::TNode MakeOutputSchema(TStringBuf) const = 0;
  558. /// @}
  559. /**
  560. * Generates full output schema. For single-output programs returns struct type, for multi-output programs
  561. * returns variant type.
  562. *
  563. * Warning: calling this function may result in extended memory usage for large number of output tables.
  564. */
  565. virtual NYT::TNode MakeFullOutputSchema() const = 0;
  566. /**
  567. * Get scoped alloc used in this worker.
  568. */
  569. virtual NKikimr::NMiniKQL::TScopedAlloc& GetScopedAlloc() = 0;
  570. /**
  571. * Get computation graph.
  572. */
  573. virtual NKikimr::NMiniKQL::IComputationGraph& GetGraph() = 0;
  574. /**
  575. * Get function registry for this worker.
  576. */
  577. virtual const NKikimr::NMiniKQL::IFunctionRegistry& GetFunctionRegistry() const = 0;
  578. /**
  579. * Get type environment for this worker.
  580. */
  581. virtual NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnvironment() = 0;
  582. /**
  583. * Get llvm settings for this worker.
  584. */
  585. virtual const TString& GetLLVMSettings() const = 0;
  586. /**
  587. * Get YT Type V3 flags
  588. */
  589. virtual ui64 GetNativeYtTypeFlags() const = 0;
  590. /**
  591. * Get time provider
  592. */
  593. virtual ITimeProvider* GetTimeProvider() const = 0;
  594. /**
  595. * Release all input data from worker state
  596. */
  597. virtual void Invalidate() = 0;
  598. };
  599. /**
  600. * Worker which operates in pull stream mode.
  601. */
  602. class IPullStreamWorker: public IWorker {
  603. public:
  604. /**
  605. * Set input computation graph node for specified input. The passed unboxed value should be a stream of
  606. * structs. It should be created via the allocator associated with this very worker.
  607. * This function can only be called once for each input.
  608. */
  609. virtual void SetInput(NKikimr::NUdf::TUnboxedValue&&, ui32) = 0;
  610. /**
  611. * Get the output computation graph node. The returned node will be a stream of structs or variants.
  612. * This function cannot be called before setting an input value.
  613. */
  614. virtual NKikimr::NUdf::TUnboxedValue& GetOutput() = 0;
  615. };
  616. /**
  617. * Worker which operates in pull list mode.
  618. */
  619. class IPullListWorker: public IWorker {
  620. public:
  621. /**
  622. * Set input computation graph node for specified input. The passed unboxed value should be a list of
  623. * structs. It should be created via the allocator associated with this very worker.
  624. * This function can only be called once for each index.
  625. */
  626. virtual void SetInput(NKikimr::NUdf::TUnboxedValue&&, ui32) = 0;
  627. /**
  628. * Get the output computation graph node. The returned node will be a list of structs or variants.
  629. * This function cannot be called before setting an input value.
  630. */
  631. virtual NKikimr::NUdf::TUnboxedValue& GetOutput() = 0;
  632. /**
  633. * Get iterator over the output list.
  634. */
  635. virtual NKikimr::NUdf::TUnboxedValue& GetOutputIterator() = 0;
  636. /**
  637. * Reset iterator to the beginning of the output list. After calling this function, GetOutputIterator()
  638. * will return a fresh iterator; all previously returned iterators will become invalid.
  639. */
  640. virtual void ResetOutputIterator() = 0;
  641. };
  642. /**
  643. * Worker which operates in push stream mode.
  644. */
  645. class IPushStreamWorker: public IWorker {
  646. public:
  647. /**
  648. * Set a consumer where the worker will relay its output. This function can only be called once.
  649. */
  650. virtual void SetConsumer(THolder<IConsumer<const NKikimr::NUdf::TUnboxedValue*>>) = 0;
  651. /**
  652. * Push new value to the graph, than feed all new output to the consumer. Values cannot be pushed before
  653. * assigning a consumer.
  654. */
  655. virtual void Push(NKikimr::NUdf::TUnboxedValue&&) = 0;
  656. /**
  657. * Send finish event and clear the computation graph. No new values will be accepted.
  658. */
  659. virtual void OnFinish() = 0;
  660. };
  661. ////////////////////////////////////////////////////////////////////////////////////////////////////
  662. /**
  663. * Input specifications describe format for program input. They carry information about input data schema
  664. * as well as the knowledge about how to convert input structures into unboxed values (data format which can be
  665. * processed by the YQL runtime).
  666. *
  667. * Input spec defines the arguments of the program's Apply method. For example, a program
  668. * with the protobuf input spec will accept a stream of protobuf messages while a program with the
  669. * yson spec will accept an input stream (binary or text one).
  670. *
  671. * See documentation for input and output spec traits for hints on how to implement a custom specs.
  672. */
  673. class TInputSpecBase {
  674. protected:
  675. mutable TVector<THashMap<TString, NYT::TNode>> AllVirtualColumns_;
  676. public:
  677. virtual ~TInputSpecBase() = default;
  678. public:
  679. /**
  680. * Get input data schemas in YQL format (NB: not a YT format). Each item of the returned vector must
  681. * describe a structure.
  682. *
  683. * Format of each item is approximately this one:
  684. *
  685. * @code
  686. * [
  687. * 'StructType',
  688. * [
  689. * ["Field1Name", ["DataType", "Int32"]],
  690. * ["Field2Name", ["DataType", "String"]],
  691. * ...
  692. * ]
  693. * ]
  694. * @endcode
  695. */
  696. virtual const TVector<NYT::TNode>& GetSchemas() const = 0;
  697. // TODO: make a neat schema builder
  698. /**
  699. * Get virtual columns for each input.
  700. *
  701. * Key of each mapping is column name, value is data schema in YQL format.
  702. */
  703. const TVector<THashMap<TString, NYT::TNode>>& GetAllVirtualColumns() const {
  704. if (AllVirtualColumns_.empty()) {
  705. AllVirtualColumns_ = TVector<THashMap<TString, NYT::TNode>>(GetSchemas().size());
  706. }
  707. return AllVirtualColumns_;
  708. }
  709. virtual bool ProvidesBlocks() const { return false; }
  710. };
  711. /**
  712. * Output specifications describe format for program output. Like input specifications, they cary knowledge
  713. * about program output type and how to convert unboxed values into that type.
  714. */
  715. class TOutputSpecBase {
  716. private:
  717. TMaybe<THashSet<TString>> OutputColumnsFilter_;
  718. public:
  719. virtual ~TOutputSpecBase() = default;
  720. public:
  721. /**
  722. * Get output data schema in YQL format (NB: not a YT format). The returned value must describe a structure
  723. * or a variant made of structures for fulti-table outputs (note: not all specs support multi-table output).
  724. *
  725. * See docs for the input spec's GetSchemas().
  726. *
  727. * Also TNode entity could be returned (NYT::TNode::CreateEntity()),
  728. * in which case output schema would be inferred from query and could be
  729. * obtained by Program::GetOutputSchema() call.
  730. */
  731. virtual const NYT::TNode& GetSchema() const = 0;
  732. /**
  733. * Get an output columns filter.
  734. *
  735. * Output columns filter is a set of column names that should be left in the output. All columns that are
  736. * not in this set will not be calculated. Depending on the output schema, they will be either removed
  737. * completely (for optional columns) or filled with defaults (for required columns).
  738. */
  739. const TMaybe<THashSet<TString>>& GetOutputColumnsFilter() const {
  740. return OutputColumnsFilter_;
  741. }
  742. /**
  743. * Set new output columns filter.
  744. */
  745. void SetOutputColumnsFilter(const TMaybe<THashSet<TString>>& outputColumnsFilter) {
  746. OutputColumnsFilter_ = outputColumnsFilter;
  747. }
  748. virtual bool AcceptsBlocks() const { return false; }
  749. };
  750. ////////////////////////////////////////////////////////////////////////////////////////////////////
  751. /**
  752. * Input spec traits provide information on how to process program input.
  753. *
  754. * Each input spec should create a template specialization for this class, in which it should provide several
  755. * static variables and functions.
  756. *
  757. * For example, a hypothetical example of implementing a JSON input spec would look like this:
  758. *
  759. * @code
  760. * class TJsonInputSpec: public TInputSpecBase {
  761. * // whatever magic you require for this spec
  762. * };
  763. *
  764. * template <>
  765. * class TInputSpecTraits<TJsonInputSpec> {
  766. * // write here four constants, one typedef and three static functions described below
  767. * };
  768. * @endcode
  769. *
  770. * @tparam T input spec type.
  771. */
  772. template <typename T>
  773. struct TInputSpecTraits {
  774. /// Safety flag which should be set to false in all template specializations of this class. Attempt to
  775. /// build a program using a spec with `IsPartial=true` will result in compilation error.
  776. static const constexpr bool IsPartial = true;
  777. /// Indicates whether this spec supports pull stream mode.
  778. static const constexpr bool SupportPullStreamMode = false;
  779. /// Indicates whether this spec supports pull list mode.
  780. static const constexpr bool SupportPullListMode = false;
  781. /// Indicates whether this spec supports push stream mode.
  782. static const constexpr bool SupportPushStreamMode = false;
  783. /// For push mode, indicates the return type of the builder's Process function.
  784. using TConsumerType = void;
  785. /// For pull stream mode, should take an input spec, a pull stream worker and whatever the user passed
  786. /// to the program's Apply function, create an unboxed values with a custom stream implementations
  787. /// and pass it to the worker's SetInput function for each input.
  788. template <typename ...A>
  789. static void PreparePullStreamWorker(const T&, IPullStreamWorker*, A&&...) {
  790. Y_UNREACHABLE();
  791. }
  792. /// For pull list mode, should take an input spec, a pull list worker and whatever the user passed
  793. /// to the program's Apply function, create an unboxed values with a custom list implementations
  794. /// and pass it to the worker's SetInput function for each input.
  795. template <typename ...A>
  796. static void PreparePullListWorker(const T&, IPullListWorker*, A&&...) {
  797. Y_UNREACHABLE();
  798. }
  799. /// For push stream mode, should take an input spec and a worker and create a consumer which will
  800. /// be returned to the user. The consumer should keep the worker alive until its own destruction.
  801. /// The return type of this function should exactly match the one defined in ConsumerType typedef.
  802. static TConsumerType MakeConsumer(const T&, TWorkerHolder<IPushStreamWorker>) {
  803. Y_UNREACHABLE();
  804. }
  805. };
  806. /**
  807. * Output spec traits provide information on how to process program output. Like with input specs, each output
  808. * spec requires an appropriate template specialization of this class.
  809. *
  810. * @tparam T output spec type.
  811. */
  812. template <typename T>
  813. struct TOutputSpecTraits {
  814. /// Safety flag which should be set to false in all template specializations of this class. Attempt to
  815. /// build a program using a spec with `IsPartial=false` will result in compilation error.
  816. static const constexpr bool IsPartial = true;
  817. /// Indicates whether this spec supports pull stream mode.
  818. static const constexpr bool SupportPullStreamMode = false;
  819. /// Indicates whether this spec supports pull list mode.
  820. static const constexpr bool SupportPullListMode = false;
  821. /// Indicates whether this spec supports push stream mode.
  822. static const constexpr bool SupportPushStreamMode = false;
  823. /// For pull stream mode, indicates the return type of the program's Apply function.
  824. using TPullStreamReturnType = void;
  825. /// For pull list mode, indicates the return type of the program's Apply function.
  826. using TPullListReturnType = void;
  827. /// For pull stream mode, should take an output spec and a worker and build a stream which will be returned
  828. /// to the user. The return type of this function must match the one specified in the PullStreamReturnType.
  829. static TPullStreamReturnType ConvertPullStreamWorkerToOutputType(const T&, TWorkerHolder<IPullStreamWorker>) {
  830. Y_UNREACHABLE();
  831. }
  832. /// For pull list mode, should take an output spec and a worker and build a list which will be returned
  833. /// to the user. The return type of this function must match the one specified in the PullListReturnType.
  834. static TPullListReturnType ConvertPullListWorkerToOutputType(const T&, TWorkerHolder<IPullListWorker>) {
  835. Y_UNREACHABLE();
  836. }
  837. /// For push stream mode, should take an output spec, a worker and whatever arguments the user passed
  838. /// to the program's Apply function, create a consumer for unboxed values and pass it to the worker's
  839. /// SetConsumer function.
  840. template <typename ...A>
  841. static void SetConsumerToWorker(const T&, IPushStreamWorker*, A&&...) {
  842. Y_UNREACHABLE();
  843. }
  844. };
  845. ////////////////////////////////////////////////////////////////////////////////////////////////////
  846. #define NOT_SPEC_MSG(spec_type) "passed class should be derived from " spec_type " spec base"
  847. #define PARTIAL_SPEC_MSG(spec_type) "this " spec_type " spec does not define its traits. Make sure you've passed " \
  848. "an " spec_type " spec and not some other object; also make sure you've included " \
  849. "all necessary headers. If you're developing a spec, make sure you have " \
  850. "a spec traits template specialization"
  851. #define UNSUPPORTED_MODE_MSG(spec_type, mode) "this " spec_type " spec does not support " mode " mode"
  852. class IProgram {
  853. public:
  854. virtual ~IProgram() = default;
  855. public:
  856. virtual const TInputSpecBase& GetInputSpecBase() const = 0;
  857. virtual const TOutputSpecBase& GetOutputSpecBase() const = 0;
  858. virtual const THashSet<TString>& GetUsedColumns(ui32) const = 0;
  859. virtual const THashSet<TString>& GetUsedColumns() const = 0;
  860. virtual NYT::TNode MakeInputSchema(ui32) const = 0;
  861. virtual NYT::TNode MakeInputSchema() const = 0;
  862. virtual NYT::TNode MakeOutputSchema() const = 0;
  863. virtual NYT::TNode MakeOutputSchema(ui32) const = 0;
  864. virtual NYT::TNode MakeOutputSchema(TStringBuf) const = 0;
  865. virtual NYT::TNode MakeFullOutputSchema() const = 0;
  866. virtual TIssues GetIssues() const = 0;
  867. virtual TString GetCompiledProgram() = 0;
  868. inline void MergeUsedColumns(THashSet<TString>& columns, ui32 inputIndex) {
  869. const auto& usedColumns = GetUsedColumns(inputIndex);
  870. columns.insert(usedColumns.begin(), usedColumns.end());
  871. }
  872. inline void MergeUsedColumns(THashSet<TString>& columns) {
  873. const auto& usedColumns = GetUsedColumns();
  874. columns.insert(usedColumns.begin(), usedColumns.end());
  875. }
  876. };
  877. template <typename TInputSpec, typename TOutputSpec, typename WorkerFactory>
  878. class TProgramCommon: public IProgram {
  879. static_assert(std::is_base_of<TInputSpecBase, TInputSpec>::value, NOT_SPEC_MSG("input"));
  880. static_assert(std::is_base_of<TOutputSpecBase, TOutputSpec>::value, NOT_SPEC_MSG("output"));
  881. protected:
  882. TInputSpec InputSpec_;
  883. TOutputSpec OutputSpec_;
  884. std::shared_ptr<WorkerFactory> WorkerFactory_;
  885. public:
  886. explicit TProgramCommon(
  887. TInputSpec inputSpec,
  888. TOutputSpec outputSpec,
  889. std::shared_ptr<WorkerFactory> workerFactory
  890. )
  891. : InputSpec_(inputSpec)
  892. , OutputSpec_(outputSpec)
  893. , WorkerFactory_(std::move(workerFactory))
  894. {
  895. }
  896. public:
  897. const TInputSpec& GetInputSpec() const {
  898. return InputSpec_;
  899. }
  900. const TOutputSpec& GetOutputSpec() const {
  901. return OutputSpec_;
  902. }
  903. const TInputSpecBase& GetInputSpecBase() const override {
  904. return InputSpec_;
  905. }
  906. const TOutputSpecBase& GetOutputSpecBase() const override {
  907. return OutputSpec_;
  908. }
  909. const THashSet<TString>& GetUsedColumns(ui32 inputIndex) const override {
  910. return WorkerFactory_->GetUsedColumns(inputIndex);
  911. }
  912. const THashSet<TString>& GetUsedColumns() const override {
  913. return WorkerFactory_->GetUsedColumns();
  914. }
  915. NYT::TNode MakeInputSchema(ui32 inputIndex) const override {
  916. return WorkerFactory_->MakeInputSchema(inputIndex);
  917. }
  918. NYT::TNode MakeInputSchema() const override {
  919. return WorkerFactory_->MakeInputSchema();
  920. }
  921. NYT::TNode MakeOutputSchema() const override {
  922. return WorkerFactory_->MakeOutputSchema();
  923. }
  924. NYT::TNode MakeOutputSchema(ui32 outputIndex) const override {
  925. return WorkerFactory_->MakeOutputSchema(outputIndex);
  926. }
  927. NYT::TNode MakeOutputSchema(TStringBuf outputName) const override {
  928. return WorkerFactory_->MakeOutputSchema(outputName);
  929. }
  930. NYT::TNode MakeFullOutputSchema() const override {
  931. return WorkerFactory_->MakeFullOutputSchema();
  932. }
  933. TIssues GetIssues() const override {
  934. return WorkerFactory_->GetIssues();
  935. }
  936. TString GetCompiledProgram() override {
  937. return WorkerFactory_->GetCompiledProgram();
  938. }
  939. };
  940. template <typename TInputSpec, typename TOutputSpec>
  941. class TPullStreamProgram final: public TProgramCommon<TInputSpec, TOutputSpec, IPullStreamWorkerFactory> {
  942. using TProgramCommon<TInputSpec, TOutputSpec, IPullStreamWorkerFactory>::WorkerFactory_;
  943. using TProgramCommon<TInputSpec, TOutputSpec, IPullStreamWorkerFactory>::InputSpec_;
  944. using TProgramCommon<TInputSpec, TOutputSpec, IPullStreamWorkerFactory>::OutputSpec_;
  945. public:
  946. using TProgramCommon<TInputSpec, TOutputSpec, IPullStreamWorkerFactory>::TProgramCommon;
  947. public:
  948. template <typename ...T>
  949. typename TOutputSpecTraits<TOutputSpec>::TPullStreamReturnType Apply(T&& ... t) {
  950. static_assert(!TInputSpecTraits<TInputSpec>::IsPartial, PARTIAL_SPEC_MSG("input"));
  951. static_assert(!TOutputSpecTraits<TOutputSpec>::IsPartial, PARTIAL_SPEC_MSG("output"));
  952. static_assert(TInputSpecTraits<TInputSpec>::SupportPullStreamMode, UNSUPPORTED_MODE_MSG("input", "pull stream"));
  953. static_assert(TOutputSpecTraits<TOutputSpec>::SupportPullStreamMode, UNSUPPORTED_MODE_MSG("output", "pull stream"));
  954. auto worker = WorkerFactory_->MakeWorker();
  955. TInputSpecTraits<TInputSpec>::PreparePullStreamWorker(InputSpec_, worker.Get(), std::forward<T>(t)...);
  956. return TOutputSpecTraits<TOutputSpec>::ConvertPullStreamWorkerToOutputType(OutputSpec_, std::move(worker));
  957. }
  958. };
  959. template <typename TInputSpec, typename TOutputSpec>
  960. class TPullListProgram final: public TProgramCommon<TInputSpec, TOutputSpec, IPullListWorkerFactory> {
  961. using TProgramCommon<TInputSpec, TOutputSpec, IPullListWorkerFactory>::WorkerFactory_;
  962. using TProgramCommon<TInputSpec, TOutputSpec, IPullListWorkerFactory>::InputSpec_;
  963. using TProgramCommon<TInputSpec, TOutputSpec, IPullListWorkerFactory>::OutputSpec_;
  964. public:
  965. using TProgramCommon<TInputSpec, TOutputSpec, IPullListWorkerFactory>::TProgramCommon;
  966. public:
  967. template <typename ...T>
  968. typename TOutputSpecTraits<TOutputSpec>::TPullListReturnType Apply(T&& ... t) {
  969. static_assert(!TInputSpecTraits<TInputSpec>::IsPartial, PARTIAL_SPEC_MSG("input"));
  970. static_assert(!TOutputSpecTraits<TOutputSpec>::IsPartial, PARTIAL_SPEC_MSG("output"));
  971. static_assert(TInputSpecTraits<TInputSpec>::SupportPullListMode, UNSUPPORTED_MODE_MSG("input", "pull list"));
  972. static_assert(TOutputSpecTraits<TOutputSpec>::SupportPullListMode, UNSUPPORTED_MODE_MSG("output", "pull list"));
  973. auto worker = WorkerFactory_->MakeWorker();
  974. TInputSpecTraits<TInputSpec>::PreparePullListWorker(InputSpec_, worker.Get(), std::forward<T>(t)...);
  975. return TOutputSpecTraits<TOutputSpec>::ConvertPullListWorkerToOutputType(OutputSpec_, std::move(worker));
  976. }
  977. };
  978. template <typename TInputSpec, typename TOutputSpec>
  979. class TPushStreamProgram final: public TProgramCommon<TInputSpec, TOutputSpec, IPushStreamWorkerFactory> {
  980. using TProgramCommon<TInputSpec, TOutputSpec, IPushStreamWorkerFactory>::WorkerFactory_;
  981. using TProgramCommon<TInputSpec, TOutputSpec, IPushStreamWorkerFactory>::InputSpec_;
  982. using TProgramCommon<TInputSpec, TOutputSpec, IPushStreamWorkerFactory>::OutputSpec_;
  983. public:
  984. using TProgramCommon<TInputSpec, TOutputSpec, IPushStreamWorkerFactory>::TProgramCommon;
  985. public:
  986. template <typename ...T>
  987. typename TInputSpecTraits<TInputSpec>::TConsumerType Apply(T&& ... t) {
  988. static_assert(!TInputSpecTraits<TInputSpec>::IsPartial, PARTIAL_SPEC_MSG("input"));
  989. static_assert(!TOutputSpecTraits<TOutputSpec>::IsPartial, PARTIAL_SPEC_MSG("output"));
  990. static_assert(TInputSpecTraits<TInputSpec>::SupportPushStreamMode, UNSUPPORTED_MODE_MSG("input", "push stream"));
  991. static_assert(TOutputSpecTraits<TOutputSpec>::SupportPushStreamMode, UNSUPPORTED_MODE_MSG("output", "push stream"));
  992. auto worker = WorkerFactory_->MakeWorker();
  993. TOutputSpecTraits<TOutputSpec>::SetConsumerToWorker(OutputSpec_, worker.Get(), std::forward<T>(t)...);
  994. return TInputSpecTraits<TInputSpec>::MakeConsumer(InputSpec_, std::move(worker));
  995. }
  996. };
  997. #undef NOT_SPEC_MSG
  998. #undef PARTIAL_SPEC_MSG
  999. #undef UNSUPPORTED_MODE_MSG
  1000. ////////////////////////////////////////////////////////////////////////////////////////////////////
  1001. /**
  1002. * Configure global logging facilities. Affects all YQL modules.
  1003. */
  1004. void ConfigureLogging(const TLoggingOptions& = {});
  1005. /**
  1006. * Create a new program factory.
  1007. * Custom logging initialization could be preformed by a call to the ConfigureLogging method beforehand.
  1008. * If the ConfigureLogging method has not been called the default logging initialization will be performed.
  1009. */
  1010. IProgramFactoryPtr MakeProgramFactory(const TProgramFactoryOptions& = {});
  1011. }
  1012. }
  1013. Y_DECLARE_OUT_SPEC(inline, NYql::NPureCalc::TCompileError, stream, value) {
  1014. stream << value.AsStrBuf() << Endl << "Issues:" << Endl << value.GetIssues() << Endl << Endl << "Yql:" << Endl <<value.GetYql();
  1015. }