interface.h 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185
  1. #pragma once
  2. #include "fwd.h"
  3. #include "wrappers.h"
  4. #include <yql/essentials/core/user_data/yql_user_data.h>
  5. #include <yql/essentials/public/udf/udf_value.h>
  6. #include <yql/essentials/public/udf/udf_counter.h>
  7. #include <yql/essentials/public/udf/udf_registrator.h>
  8. #include <yql/essentials/public/issue/yql_issue.h>
  9. #include <library/cpp/yson/node/node.h>
  10. #include <library/cpp/logger/priority.h>
  11. #include <util/generic/ptr.h>
  12. #include <util/generic/maybe.h>
  13. #include <util/generic/hash_set.h>
  14. #include <util/generic/string.h>
  15. #include <util/stream/output.h>
  // Forward declarations of types that this header only refers to through pointers and references.
  class ITimeProvider;

  namespace NKikimr::NMiniKQL {
      class TScopedAlloc;
      class IComputationGraph;
      class IFunctionRegistry;
      class TTypeEnvironment;
      class TType;
      class TStructType;
  } // namespace NKikimr::NMiniKQL
  27. namespace NYql {
  28. namespace NPureCalc {
  29. /**
  30. * SQL or s-expression translation error.
  31. */
  32. class TCompileError: public yexception {
  33. private:
  34. TString Yql_;
  35. TString Issues_;
  36. public:
  37. // TODO: maybe accept an actual list of issues here?
  38. // See https://a.yandex-team.ru/arc/review/439403/details#comment-778237
  39. TCompileError(TString yql, TString issues)
  40. : Yql_(std::move(yql))
  41. , Issues_(std::move(issues))
  42. {
  43. }
  44. public:
  45. /**
  46. * Get the sql query which caused the error (if there is one available).
  47. */
  48. const TString& GetYql() const {
  49. return Yql_;
  50. }
  51. /**
  52. * Get detailed description for all errors and warnings that happened during sql translation.
  53. */
  54. const TString& GetIssues() const {
  55. return Issues_;
  56. }
  57. };
  58. ////////////////////////////////////////////////////////////////////////////////////////////////////
  /**
   * Abstract pull-style source of objects of type T.
   */
  template <typename T>
  class IStream {
  public:
      virtual ~IStream() = default;

      /**
       * Pop the next value from the stream and return it. A finished stream should return
       * some sentinel object.
       *
       * Ownership note: depending on the return type, the caller may not become the owner of
       * the returned object, i.e. the stream may manage it itself. In that case the object's
       * lifetime may be bound to the stream: it may be destroyed on the next Fetch() call or
       * when the stream is destroyed, whichever happens first.
       */
      virtual T Fetch() = 0;
  };
  77. /**
  78. * Create a new stream which applies the given functor to the elements of the original stream.
  79. */
  80. template <typename TOld, typename TNew, typename TFunctor>
  81. inline THolder<IStream<TNew>> MapStream(THolder<IStream<TOld>> stream, TFunctor functor) {
  82. return THolder(new NPrivate::TMappingStream<TNew, TOld, TFunctor>(std::move(stream), std::move(functor)));
  83. };
  84. /**
  85. * Convert stream of objects into a stream of potentially incompatible objects.
  86. *
  87. * This conversion applies static cast to the output of the original stream. Use with caution!
  88. */
  89. /// @{
  90. template <
  91. typename TNew, typename TOld,
  92. std::enable_if_t<!std::is_same<TNew, TOld>::value>* = nullptr>
  93. inline THolder<IStream<TNew>> ConvertStreamUnsafe(THolder<IStream<TOld>> stream) {
  94. return MapStream<TOld, TNew>(std::move(stream), [](TOld x) -> TNew { return static_cast<TNew>(x); });
  95. }
  96. template <typename T>
  97. inline THolder<IStream<T>> ConvertStreamUnsafe(THolder<IStream<T>> stream) {
  98. return stream;
  99. }
  100. /// @}
  101. /**
  102. * Convert stream of objects into a stream of compatible objects.
  103. *
  104. * Note: each conversion adds one level of indirection so avoid them if possible.
  105. */
  106. template <typename TNew, typename TOld, std::enable_if_t<std::is_convertible<TOld, TNew>::value>* = nullptr>
  107. inline THolder<IStream<TNew>> ConvertStream(THolder<IStream<TOld>> stream) {
  108. return ConvertStreamUnsafe<TNew, TOld>(std::move(stream));
  109. }
  110. ////////////////////////////////////////////////////////////////////////////////////////////////////
  /**
   * Abstract push-style sink for objects of type T.
   */
  template <typename T>
  class IConsumer {
  public:
      virtual ~IConsumer() = default;

      /**
       * Hand a single object over to the consumer.
       *
       * Depending on the argument type, the consumer may not take ownership of the object;
       * the caller then stays responsible for the object's lifetime after this call.
       *
       * The object may be destroyed as soon as this function returns. Implementations should
       * therefore not keep a pointer to it (or the object itself) without taking all necessary
       * precautions to ensure that what they keep stays valid after returning.
       */
      virtual void OnObject(T) = 0;

      /**
       * Close the consumer and run finalization logic. Calling OnObject after this function
       * is an error.
       */
      virtual void OnFinish() = 0;
  };
  135. /**
  136. * Create a new consumer which applies the given functor to objects before .
  137. */
  138. template <typename TOld, typename TNew, typename TFunctor>
  139. inline THolder<IConsumer<TNew>> MapConsumer(THolder<IConsumer<TOld>> stream, TFunctor functor) {
  140. return THolder(new NPrivate::TMappingConsumer<TNew, TOld, TFunctor>(std::move(stream), std::move(functor)));
  141. };
  142. /**
  143. * Convert consumer of objects into a consumer of potentially incompatible objects.
  144. *
  145. * This conversion applies static cast to the input value. Use with caution.
  146. */
  147. /// @{
  148. template <
  149. typename TNew, typename TOld,
  150. std::enable_if_t<!std::is_same<TNew, TOld>::value>* = nullptr>
  151. inline THolder<IConsumer<TNew>> ConvertConsumerUnsafe(THolder<IConsumer<TOld>> consumer) {
  152. return MapConsumer<TOld, TNew>(std::move(consumer), [](TNew x) -> TOld { return static_cast<TOld>(x); });
  153. }
  154. template <typename T>
  155. inline THolder<IConsumer<T>> ConvertConsumerUnsafe(THolder<IConsumer<T>> consumer) {
  156. return consumer;
  157. }
  158. /// @}
  159. /**
  160. * Convert consumer of objects into a consumer of compatible objects.
  161. *
  162. * Note: each conversion adds one level of indirection so avoid them if possible.
  163. */
  164. template <typename TNew, typename TOld, std::enable_if_t<std::is_convertible<TNew, TOld>::value>* = nullptr>
  165. inline THolder<IConsumer<TNew>> ConvertConsumer(THolder<IConsumer<TOld>> consumer) {
  166. return ConvertConsumerUnsafe<TNew, TOld>(std::move(consumer));
  167. }
  168. /**
  169. * Create a consumer which holds a non-owning pointer to the given consumer
  170. * and passes all messages to the latter.
  171. */
  172. template <typename T, typename C>
  173. THolder<NPrivate::TNonOwningConsumer<T, C>> MakeNonOwningConsumer(C consumer) {
  174. return MakeHolder<NPrivate::TNonOwningConsumer<T, C>>(consumer);
  175. }
  176. ////////////////////////////////////////////////////////////////////////////////////////////////////
  177. /**
  178. * Logging options.
  179. */
  180. struct TLoggingOptions final {
  181. public:
  182. /// Logging level for messages generated during compilation.
  183. ELogPriority LogLevel_; // TODO: rename to LogLevel
  184. /// Where to write log messages.
  185. IOutputStream* LogDestination;
  186. public:
  187. TLoggingOptions();
  188. /**
  189. * Set a new logging level.
  190. *
  191. * @return reference to self, to allow method chaining.
  192. */
  193. TLoggingOptions& SetLogLevel(ELogPriority);
  194. /**
  195. * Set a new logging destination.
  196. *
  197. * @return reference to self, to allow method chaining.
  198. */
  199. TLoggingOptions& SetLogDestination(IOutputStream*);
  200. };
  201. /**
  202. * General options for program factory.
  203. */
  204. struct TProgramFactoryOptions final {
  205. public:
  206. /// Path to a directory with compiled UDFs. Leave empty to disable loading external UDFs.
  207. TString UdfsDir_; // TODO: rename to UDFDir
  208. /// List of available external resources, e.g. files, UDFs, libraries.
  209. TVector<NUserData::TUserData> UserData_; // TODO: rename to UserData
  210. /// LLVM settings. Assign "OFF" to disable LLVM, empty string for default settings.
  211. TString LLVMSettings;
  212. /// Block engine settings. Assign "force" to unconditionally enable
  213. /// it, "disable" for turn it off and "auto" to left the final
  214. /// decision to the platform heuristics.
  215. TString BlockEngineSettings;
  216. /// Output stream to dump the compiled and optimized expressions.
  217. IOutputStream* ExprOutputStream;
  218. /// Provider for generic counters which can be used to export statistics from UDFs.
  219. NKikimr::NUdf::ICountersProvider* CountersProvider;
  220. /// YT Type V3 flags for Skiff/Yson serialization.
  221. ui64 NativeYtTypeFlags;
  222. /// Seed for deterministic time provider
  223. TMaybe<ui64> DeterministicTimeProviderSeed;
  224. /// Use special system columns to support tables naming (supports non empty ``TablePath()``/``TableName()``)
  225. bool UseSystemColumns;
  226. /// Reuse allocated workers
  227. bool UseWorkerPool;
  228. public:
  229. TProgramFactoryOptions();
  230. public:
  231. /**
  232. * Set a new path to a directory with UDFs.
  233. *
  234. * @return reference to self, to allow method chaining.
  235. */
  236. TProgramFactoryOptions& SetUDFsDir(TStringBuf);
  237. /**
  238. * Add a new library to the UserData list.
  239. *
  240. * @param disposition where the resource resides, e.g. on filesystem, in memory, etc.
  241. * NB: URL disposition is not supported.
  242. * @param name name of the resource.
  243. * @param content depending on disposition, either path to the resource or its content.
  244. * @return reference to self, to allow method chaining.
  245. */
  246. TProgramFactoryOptions& AddLibrary(NUserData::EDisposition disposition, TStringBuf name, TStringBuf content);
  247. /**
  248. * Add a new file to the UserData list.
  249. *
  250. * @param disposition where the resource resides, e.g. on filesystem, in memory, etc.
  251. * NB: URL disposition is not supported.
  252. * @param name name of the resource.
  253. * @param content depending on disposition, either path to the resource or its content.
  254. * @return reference to self, to allow method chaining.
  255. */
  256. TProgramFactoryOptions& AddFile(NUserData::EDisposition disposition, TStringBuf name, TStringBuf content);
  257. /**
  258. * Add a new UDF to the UserData list.
  259. *
  260. * @param disposition where the resource resides, e.g. on filesystem, in memory, etc.
  261. * NB: URL disposition is not supported.
  262. * @param name name of the resource.
  263. * @param content depending on disposition, either path to the resource or its content.
  264. * @return reference to self, to allow method chaining.
  265. */
  266. TProgramFactoryOptions& AddUDF(NUserData::EDisposition disposition, TStringBuf name, TStringBuf content);
  267. /**
  268. * Set new LLVM settings.
  269. *
  270. * @return reference to self, to allow method chaining.
  271. */
  272. TProgramFactoryOptions& SetLLVMSettings(TStringBuf llvm_settings);
  273. /**
  274. * Set new block engine settings.
  275. *
  276. * @return reference to self, to allow method chaining.
  277. */
  278. TProgramFactoryOptions& SetBlockEngineSettings(TStringBuf blockEngineSettings);
  279. /**
  280. * Set the stream to dump the compiled and optimized expressions.
  281. *
  282. * @return reference to self, to allow method chaining.
  283. */
  284. TProgramFactoryOptions& SetExprOutputStream(IOutputStream* exprOutputStream);
  285. /**
  286. * Set new counters provider. Passed pointer should stay alive for as long as the processor factory
  287. * stays alive.
  288. *
  289. * @return reference to self, to allow method chaining.
  290. */
  291. TProgramFactoryOptions& SetCountersProvider(NKikimr::NUdf::ICountersProvider* countersProvider);
  292. /**
  293. * Set new YT Type V3 mode. Deprecated method. Use SetNativeYtTypeFlags instead
  294. *
  295. * @return reference to self, to allow method chaining.
  296. */
  297. TProgramFactoryOptions& SetUseNativeYtTypes(bool useNativeTypes);
  298. /**
  299. * Set YT Type V3 flags.
  300. *
  301. * @return reference to self, to allow method chaining.
  302. */
  303. TProgramFactoryOptions& SetNativeYtTypeFlags(ui64 nativeTypeFlags);
  304. /**
  305. * Set seed for deterministic time provider.
  306. *
  307. * @return reference to self, to allow method chaining.
  308. */
  309. TProgramFactoryOptions& SetDeterministicTimeProviderSeed(TMaybe<ui64> seed);
  310. /**
  311. * Set new flag whether to allow using system columns or not.
  312. *
  313. * @return reference to self, to allow method chaining.
  314. */
  315. TProgramFactoryOptions& SetUseSystemColumns(bool useSystemColumns);
  316. /**
  317. * Set new flag whether to allow reusing workers or not.
  318. *
  319. * @return reference to self, to allow method chaining.
  320. */
  321. TProgramFactoryOptions& SetUseWorkerPool(bool useWorkerPool);
  322. };
  323. ////////////////////////////////////////////////////////////////////////////////////////////////////
  /**
   * Kind of program text handed to the program factory.
   *
   * NOTE(review): the per-enumerator comments are reproduced verbatim; they look like
   * string representations consumed by enum-serialization tooling - confirm before editing them.
   */
  enum class ETranslationMode {
      SQL /* "SQL" */,
      SExpr /* "s-expression" */,
      Mkql /* "mkql" */,
      PG /* PostgreSQL */,
  };
  333. /**
  334. * A facility for compiling sql and s-expressions and making programs from them.
  335. */
  336. class IProgramFactory: public TThrRefBase {
  337. protected:
  338. virtual IPullStreamWorkerFactoryPtr MakePullStreamWorkerFactory(const TInputSpecBase&, const TOutputSpecBase&, TString, ETranslationMode, ui16) = 0;
  339. virtual IPullListWorkerFactoryPtr MakePullListWorkerFactory(const TInputSpecBase&, const TOutputSpecBase&, TString, ETranslationMode, ui16) = 0;
  340. virtual IPushStreamWorkerFactoryPtr MakePushStreamWorkerFactory(const TInputSpecBase&, const TOutputSpecBase&, TString, ETranslationMode, ui16) = 0;
  341. public:
  342. /**
  343. * Add new udf module. It's not specified whether adding new modules will affect existing programs
  344. * (theoretical answer is 'no').
  345. */
  346. virtual void AddUdfModule(const TStringBuf&, NKikimr::NUdf::TUniquePtr<NKikimr::NUdf::IUdfModule>&&) = 0;
  347. // TODO: support setting udf modules via factory options.
  348. /**
  349. * Set new counters provider, override one that was specified via factory options. Note that existing
  350. * programs will still reference the previous provider.
  351. */
  352. virtual void SetCountersProvider(NKikimr::NUdf::ICountersProvider*) = 0;
  353. // TODO: support setting providers via factory options.
  354. template <typename TInputSpec, typename TOutputSpec>
  355. THolder<TPullStreamProgram<TInputSpec, TOutputSpec>> MakePullStreamProgram(
  356. TInputSpec inputSpec, TOutputSpec outputSpec, TString query, ETranslationMode mode = ETranslationMode::SQL, ui16 syntaxVersion = 1
  357. ) {
  358. auto workerFactory = MakePullStreamWorkerFactory(inputSpec, outputSpec, std::move(query), mode, syntaxVersion);
  359. return MakeHolder<TPullStreamProgram<TInputSpec, TOutputSpec>>(std::move(inputSpec), std::move(outputSpec), workerFactory);
  360. }
  361. template <typename TInputSpec, typename TOutputSpec>
  362. THolder<TPullListProgram<TInputSpec, TOutputSpec>> MakePullListProgram(
  363. TInputSpec inputSpec, TOutputSpec outputSpec, TString query, ETranslationMode mode = ETranslationMode::SQL, ui16 syntaxVersion = 1
  364. ) {
  365. auto workerFactory = MakePullListWorkerFactory(inputSpec, outputSpec, std::move(query), mode, syntaxVersion);
  366. return MakeHolder<TPullListProgram<TInputSpec, TOutputSpec>>(std::move(inputSpec), std::move(outputSpec), workerFactory);
  367. }
  368. template <typename TInputSpec, typename TOutputSpec>
  369. THolder<TPushStreamProgram<TInputSpec, TOutputSpec>> MakePushStreamProgram(
  370. TInputSpec inputSpec, TOutputSpec outputSpec, TString query, ETranslationMode mode = ETranslationMode::SQL, ui16 syntaxVersion = 1
  371. ) {
  372. auto workerFactory = MakePushStreamWorkerFactory(inputSpec, outputSpec, std::move(query), mode, syntaxVersion);
  373. return MakeHolder<TPushStreamProgram<TInputSpec, TOutputSpec>>(std::move(inputSpec), std::move(outputSpec), workerFactory);
  374. }
  375. };
  376. ////////////////////////////////////////////////////////////////////////////////////////////////////
  377. /**
  378. * A facility for creating workers. Despite being a part of a public API, worker factory is not used directly.
  379. */
  380. class IWorkerFactory: public std::enable_shared_from_this<IWorkerFactory> {
  381. public:
  382. virtual ~IWorkerFactory() = default;
  383. /**
  384. * Get input column names for specified input that are actually used in the query.
  385. */
  386. virtual const THashSet<TString>& GetUsedColumns(ui32) const = 0;
  387. /**
  388. * Overload for single-input programs.
  389. */
  390. virtual const THashSet<TString>& GetUsedColumns() const = 0;
  391. /**
  392. * Make input type schema for specified input as deduced by program optimizer. This schema is equivalent
  393. * to one provided by input spec up to the order of the fields in structures.
  394. */
  395. virtual NYT::TNode MakeInputSchema(ui32) const = 0;
  396. /**
  397. * Overload for single-input programs.
  398. */
  399. virtual NYT::TNode MakeInputSchema() const = 0;
  400. /**
  401. * Make output type schema as deduced by program optimizer. If output spec provides its own schema, than
  402. * this schema is equivalent to one provided by output spec up to the order of the fields in structures.
  403. */
  404. /// @{
  405. /**
  406. * Overload for single-table output programs (i.e. output type is struct).
  407. */
  408. virtual NYT::TNode MakeOutputSchema() const = 0;
  409. /**
  410. * Overload for multi-table output programs (i.e. output type is variant over tuple).
  411. */
  412. virtual NYT::TNode MakeOutputSchema(ui32) const = 0;
  413. /**
  414. * Overload for multi-table output programs (i.e. output type is variant over struct).
  415. */
  416. virtual NYT::TNode MakeOutputSchema(TStringBuf) const = 0;
  417. /// @}
  418. /**
  419. * Make full output schema. For single-output programs returns struct type, for multi-output programs
  420. * returns variant type.
  421. *
  422. * Warning: calling this function may result in extended memory usage for large number of output tables.
  423. */
  424. virtual NYT::TNode MakeFullOutputSchema() const = 0;
  425. /**
  426. * Get compilation issues
  427. */
  428. virtual TIssues GetIssues() const = 0;
  429. /**
  430. * Get precompiled mkql program
  431. */
  432. virtual TString GetCompiledProgram() = 0;
  433. /**
  434. * Return a worker to the factory for possible reuse
  435. */
  436. virtual void ReturnWorker(IWorker* worker) = 0;
  437. };
  /**
   * Deleter policy: instead of deleting the object it calls Release() on it.
   */
  class TReleaseWorker {
  public:
      template <class T>
      static inline void Destroy(T* object) noexcept {
          object->Release();
      }
  };
  445. template <class T>
  446. using TWorkerHolder = THolder<T, TReleaseWorker>;
  447. /**
  448. * Factory for generating pull stream workers.
  449. */
  450. class IPullStreamWorkerFactory: public IWorkerFactory {
  451. public:
  452. /**
  453. * Create a new pull stream worker.
  454. */
  455. virtual TWorkerHolder<IPullStreamWorker> MakeWorker() = 0;
  456. };
  457. /**
  458. * Factory for generating pull list workers.
  459. */
  460. class IPullListWorkerFactory: public IWorkerFactory {
  461. public:
  462. /**
  463. * Create a new pull list worker.
  464. */
  465. virtual TWorkerHolder<IPullListWorker> MakeWorker() = 0;
  466. };
  467. /**
  468. * Factory for generating push stream workers.
  469. */
  470. class IPushStreamWorkerFactory: public IWorkerFactory {
  471. public:
  472. /**
  473. * Create a new push stream worker.
  474. */
  475. virtual TWorkerHolder<IPushStreamWorker> MakeWorker() = 0;
  476. };
  477. ////////////////////////////////////////////////////////////////////////////////////////////////////
  478. /**
  479. * Worker is a central part of any program instance. It contains current computation state
  480. * (called computation graph) and objects required to work with it, including an allocator for unboxed values.
  481. *
  482. * Usually, users do not interact with workers directly. They use program instance entry points such as streams
  483. * and consumers instead. The only case when one would have to to interact with workers is when implementing
  484. * custom io-specification.
  485. */
  486. class IWorker {
  487. protected:
  488. friend class TReleaseWorker;
  489. /**
  490. * Cleanup the worker and return to a worker factory for reuse
  491. */
  492. virtual void Release() = 0;
  493. public:
  494. virtual ~IWorker() = default;
  495. public:
  496. /**
  497. * Number of inputs for this program.
  498. */
  499. virtual ui32 GetInputsCount() const = 0;
  500. /**
  501. * MiniKQL input struct type of specified input for this program. Type is equivalent to the deduced input
  502. * schema (see IWorker::MakeInputSchema())
  503. *
  504. * If ``original`` is set to ``true``, returns type without virtual system columns.
  505. */
  506. virtual const NKikimr::NMiniKQL::TStructType* GetInputType(ui32, bool original = false) const = 0;
  507. /**
  508. * Overload for single-input programs.
  509. */
  510. virtual const NKikimr::NMiniKQL::TStructType* GetInputType(bool original = false) const = 0;
  511. /**
  512. * MiniKQL input struct type of the specified input for this program.
  513. * The returned type is the actual type of the specified input node.
  514. */
  515. virtual const NKikimr::NMiniKQL::TStructType* GetRawInputType(ui32) const = 0;
  516. /**
  517. * Overload for single-input programs.
  518. */
  519. virtual const NKikimr::NMiniKQL::TStructType* GetRawInputType() const = 0;
  520. /**
  521. * MiniKQL output struct type for this program. The returned type is equivalent to the deduced output
  522. * schema (see IWorker::MakeFullOutputSchema()).
  523. */
  524. virtual const NKikimr::NMiniKQL::TType* GetOutputType() const = 0;
  525. /**
  526. * MiniKQL output struct type for this program. The returned type is
  527. * the actual type of the root node.
  528. */
  529. virtual const NKikimr::NMiniKQL::TType* GetRawOutputType() const = 0;
  530. /**
  531. * Make input type schema for specified input as deduced by program optimizer. This schema is equivalent
  532. * to one provided by input spec up to the order of the fields in structures.
  533. */
  534. virtual NYT::TNode MakeInputSchema(ui32) const = 0;
  535. /**
  536. * Overload for single-input programs.
  537. */
  538. virtual NYT::TNode MakeInputSchema() const = 0;
  539. /**
  540. * Make output type schema as deduced by program optimizer. If output spec provides its own schema, than
  541. * this schema is equivalent to one provided by output spec up to the order of the fields in structures.
  542. */
  543. /// @{
  544. /**
  545. * Overload for single-table output programs (i.e. output type is struct).
  546. */
  547. virtual NYT::TNode MakeOutputSchema() const = 0;
  548. /**
  549. * Overload for multi-table output programs (i.e. output type is variant over tuple).
  550. */
  551. virtual NYT::TNode MakeOutputSchema(ui32) const = 0;
  552. /**
  553. * Overload for multi-table output programs (i.e. output type is variant over struct).
  554. */
  555. virtual NYT::TNode MakeOutputSchema(TStringBuf) const = 0;
  556. /// @}
  557. /**
  558. * Generates full output schema. For single-output programs returns struct type, for multi-output programs
  559. * returns variant type.
  560. *
  561. * Warning: calling this function may result in extended memory usage for large number of output tables.
  562. */
  563. virtual NYT::TNode MakeFullOutputSchema() const = 0;
  564. /**
  565. * Get scoped alloc used in this worker.
  566. */
  567. virtual NKikimr::NMiniKQL::TScopedAlloc& GetScopedAlloc() = 0;
  568. /**
  569. * Get computation graph.
  570. */
  571. virtual NKikimr::NMiniKQL::IComputationGraph& GetGraph() = 0;
  572. /**
  573. * Get function registry for this worker.
  574. */
  575. virtual const NKikimr::NMiniKQL::IFunctionRegistry& GetFunctionRegistry() const = 0;
  576. /**
  577. * Get type environment for this worker.
  578. */
  579. virtual NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnvironment() = 0;
  580. /**
  581. * Get llvm settings for this worker.
  582. */
  583. virtual const TString& GetLLVMSettings() const = 0;
  584. /**
  585. * Get YT Type V3 flags
  586. */
  587. virtual ui64 GetNativeYtTypeFlags() const = 0;
  588. /**
  589. * Get time provider
  590. */
  591. virtual ITimeProvider* GetTimeProvider() const = 0;
  592. /**
  593. * Release all input data from worker state
  594. */
  595. virtual void Invalidate() = 0;
  596. };
  597. /**
  598. * Worker which operates in pull stream mode.
  599. */
  600. class IPullStreamWorker: public IWorker {
  601. public:
  602. /**
  603. * Set input computation graph node for specified input. The passed unboxed value should be a stream of
  604. * structs. It should be created via the allocator associated with this very worker.
  605. * This function can only be called once for each input.
  606. */
  607. virtual void SetInput(NKikimr::NUdf::TUnboxedValue&&, ui32) = 0;
  608. /**
  609. * Get the output computation graph node. The returned node will be a stream of structs or variants.
  610. * This function cannot be called before setting an input value.
  611. */
  612. virtual NKikimr::NUdf::TUnboxedValue& GetOutput() = 0;
  613. };
  614. /**
  615. * Worker which operates in pull list mode.
  616. */
  617. class IPullListWorker: public IWorker {
  618. public:
  619. /**
  620. * Set input computation graph node for specified input. The passed unboxed value should be a list of
  621. * structs. It should be created via the allocator associated with this very worker.
  622. * This function can only be called once for each index.
  623. */
  624. virtual void SetInput(NKikimr::NUdf::TUnboxedValue&&, ui32) = 0;
  625. /**
  626. * Get the output computation graph node. The returned node will be a list of structs or variants.
  627. * This function cannot be called before setting an input value.
  628. */
  629. virtual NKikimr::NUdf::TUnboxedValue& GetOutput() = 0;
  630. /**
  631. * Get iterator over the output list.
  632. */
  633. virtual NKikimr::NUdf::TUnboxedValue& GetOutputIterator() = 0;
  634. /**
  635. * Reset iterator to the beginning of the output list. After calling this function, GetOutputIterator()
  636. * will return a fresh iterator; all previously returned iterators will become invalid.
  637. */
  638. virtual void ResetOutputIterator() = 0;
  639. };
  640. /**
  641. * Worker which operates in push stream mode.
  642. */
  643. class IPushStreamWorker: public IWorker {
  644. public:
  645. /**
  646. * Set a consumer where the worker will relay its output. This function can only be called once.
  647. */
  648. virtual void SetConsumer(THolder<IConsumer<const NKikimr::NUdf::TUnboxedValue*>>) = 0;
  649. /**
  650. * Push new value to the graph, than feed all new output to the consumer. Values cannot be pushed before
  651. * assigning a consumer.
  652. */
  653. virtual void Push(NKikimr::NUdf::TUnboxedValue&&) = 0;
  654. /**
  655. * Send finish event and clear the computation graph. No new values will be accepted.
  656. */
  657. virtual void OnFinish() = 0;
  658. };
  659. ////////////////////////////////////////////////////////////////////////////////////////////////////
  660. /**
  661. * Input specifications describe format for program input. They carry information about input data schema
  662. * as well as the knowledge about how to convert input structures into unboxed values (data format which can be
  663. * processed by the YQL runtime).
  664. *
  665. * Input spec defines the arguments of the program's Apply method. For example, a program
  666. * with the protobuf input spec will accept a stream of protobuf messages while a program with the
  667. * yson spec will accept an input stream (binary or text one).
  668. *
  669. * See documentation for input and output spec traits for hints on how to implement a custom specs.
  670. */
  671. class TInputSpecBase {
  672. protected:
  673. mutable TVector<THashMap<TString, NYT::TNode>> AllVirtualColumns_;
  674. public:
  675. virtual ~TInputSpecBase() = default;
  676. public:
  677. /**
  678. * Get input data schemas in YQL format (NB: not a YT format). Each item of the returned vector must
  679. * describe a structure.
  680. *
  681. * Format of each item is approximately this one:
  682. *
  683. * @code
  684. * [
  685. * 'StructType',
  686. * [
  687. * ["Field1Name", ["DataType", "Int32"]],
  688. * ["Field2Name", ["DataType", "String"]],
  689. * ...
  690. * ]
  691. * ]
  692. * @endcode
  693. */
  694. virtual const TVector<NYT::TNode>& GetSchemas() const = 0;
  695. // TODO: make a neat schema builder
  696. /**
  697. * Get virtual columns for each input.
  698. *
  699. * Key of each mapping is column name, value is data schema in YQL format.
  700. */
  701. const TVector<THashMap<TString, NYT::TNode>>& GetAllVirtualColumns() const {
  702. if (AllVirtualColumns_.empty()) {
  703. AllVirtualColumns_ = TVector<THashMap<TString, NYT::TNode>>(GetSchemas().size());
  704. }
  705. return AllVirtualColumns_;
  706. }
  707. virtual bool ProvidesBlocks() const { return false; }
  708. };
  709. /**
  710. * Output specifications describe format for program output. Like input specifications, they cary knowledge
  711. * about program output type and how to convert unboxed values into that type.
  712. */
  713. class TOutputSpecBase {
  714. private:
  715. TMaybe<THashSet<TString>> OutputColumnsFilter_;
  716. public:
  717. virtual ~TOutputSpecBase() = default;
  718. public:
  719. /**
  720. * Get output data schema in YQL format (NB: not a YT format). The returned value must describe a structure
  721. * or a variant made of structures for fulti-table outputs (note: not all specs support multi-table output).
  722. *
  723. * See docs for the input spec's GetSchemas().
  724. *
  725. * Also TNode entity could be returned (NYT::TNode::CreateEntity()),
  726. * in which case output schema would be inferred from query and could be
  727. * obtained by Program::GetOutputSchema() call.
  728. */
  729. virtual const NYT::TNode& GetSchema() const = 0;
  730. /**
  731. * Get an output columns filter.
  732. *
  733. * Output columns filter is a set of column names that should be left in the output. All columns that are
  734. * not in this set will not be calculated. Depending on the output schema, they will be either removed
  735. * completely (for optional columns) or filled with defaults (for required columns).
  736. */
  737. const TMaybe<THashSet<TString>>& GetOutputColumnsFilter() const {
  738. return OutputColumnsFilter_;
  739. }
  740. /**
  741. * Set new output columns filter.
  742. */
  743. void SetOutputColumnsFilter(const TMaybe<THashSet<TString>>& outputColumnsFilter) {
  744. OutputColumnsFilter_ = outputColumnsFilter;
  745. }
  746. virtual bool AcceptsBlocks() const { return false; }
  747. };
  748. ////////////////////////////////////////////////////////////////////////////////////////////////////
  749. /**
  750. * Input spec traits provide information on how to process program input.
  751. *
  752. * Each input spec should create a template specialization for this class, in which it should provide several
  753. * static variables and functions.
  754. *
  755. * For example, a hypothetical example of implementing a JSON input spec would look like this:
  756. *
  757. * @code
  758. * class TJsonInputSpec: public TInputSpecBase {
  759. * // whatever magic you require for this spec
  760. * };
  761. *
  762. * template <>
  763. * class TInputSpecTraits<TJsonInputSpec> {
  764. * // write here four constants, one typedef and three static functions described below
  765. * };
  766. * @endcode
  767. *
  768. * @tparam T input spec type.
  769. */
  770. template <typename T>
  771. struct TInputSpecTraits {
  772. /// Safety flag which should be set to false in all template specializations of this class. Attempt to
  773. /// build a program using a spec with `IsPartial=true` will result in compilation error.
  774. static const constexpr bool IsPartial = true;
  775. /// Indicates whether this spec supports pull stream mode.
  776. static const constexpr bool SupportPullStreamMode = false;
  777. /// Indicates whether this spec supports pull list mode.
  778. static const constexpr bool SupportPullListMode = false;
  779. /// Indicates whether this spec supports push stream mode.
  780. static const constexpr bool SupportPushStreamMode = false;
  781. /// For push mode, indicates the return type of the builder's Process function.
  782. using TConsumerType = void;
  783. /// For pull stream mode, should take an input spec, a pull stream worker and whatever the user passed
  784. /// to the program's Apply function, create an unboxed values with a custom stream implementations
  785. /// and pass it to the worker's SetInput function for each input.
  786. template <typename ...A>
  787. static void PreparePullStreamWorker(const T&, IPullStreamWorker*, A&&...) {
  788. Y_UNREACHABLE();
  789. }
  790. /// For pull list mode, should take an input spec, a pull list worker and whatever the user passed
  791. /// to the program's Apply function, create an unboxed values with a custom list implementations
  792. /// and pass it to the worker's SetInput function for each input.
  793. template <typename ...A>
  794. static void PreparePullListWorker(const T&, IPullListWorker*, A&&...) {
  795. Y_UNREACHABLE();
  796. }
  797. /// For push stream mode, should take an input spec and a worker and create a consumer which will
  798. /// be returned to the user. The consumer should keep the worker alive until its own destruction.
  799. /// The return type of this function should exactly match the one defined in ConsumerType typedef.
  800. static TConsumerType MakeConsumer(const T&, TWorkerHolder<IPushStreamWorker>) {
  801. Y_UNREACHABLE();
  802. }
  803. };
  804. /**
  805. * Output spec traits provide information on how to process program output. Like with input specs, each output
  806. * spec requires an appropriate template specialization of this class.
  807. *
  808. * @tparam T output spec type.
  809. */
  810. template <typename T>
  811. struct TOutputSpecTraits {
  812. /// Safety flag which should be set to false in all template specializations of this class. Attempt to
  813. /// build a program using a spec with `IsPartial=false` will result in compilation error.
  814. static const constexpr bool IsPartial = true;
  815. /// Indicates whether this spec supports pull stream mode.
  816. static const constexpr bool SupportPullStreamMode = false;
  817. /// Indicates whether this spec supports pull list mode.
  818. static const constexpr bool SupportPullListMode = false;
  819. /// Indicates whether this spec supports push stream mode.
  820. static const constexpr bool SupportPushStreamMode = false;
  821. /// For pull stream mode, indicates the return type of the program's Apply function.
  822. using TPullStreamReturnType = void;
  823. /// For pull list mode, indicates the return type of the program's Apply function.
  824. using TPullListReturnType = void;
  825. /// For pull stream mode, should take an output spec and a worker and build a stream which will be returned
  826. /// to the user. The return type of this function must match the one specified in the PullStreamReturnType.
  827. static TPullStreamReturnType ConvertPullStreamWorkerToOutputType(const T&, TWorkerHolder<IPullStreamWorker>) {
  828. Y_UNREACHABLE();
  829. }
  830. /// For pull list mode, should take an output spec and a worker and build a list which will be returned
  831. /// to the user. The return type of this function must match the one specified in the PullListReturnType.
  832. static TPullListReturnType ConvertPullListWorkerToOutputType(const T&, TWorkerHolder<IPullListWorker>) {
  833. Y_UNREACHABLE();
  834. }
  835. /// For push stream mode, should take an output spec, a worker and whatever arguments the user passed
  836. /// to the program's Apply function, create a consumer for unboxed values and pass it to the worker's
  837. /// SetConsumer function.
  838. template <typename ...A>
  839. static void SetConsumerToWorker(const T&, IPushStreamWorker*, A&&...) {
  840. Y_UNREACHABLE();
  841. }
  842. };
  843. ////////////////////////////////////////////////////////////////////////////////////////////////////
  // Message builders for the static_asserts in the program classes below. Each macro expands to
  // adjacent string literals which the compiler concatenates, so the arguments must themselves be
  // string literals. The macros are #undef'ed once the program classes have been defined.
  #define NOT_SPEC_MSG(spec_type) \
      "passed class should be derived from " spec_type " spec base"

  #define PARTIAL_SPEC_MSG(spec_type) \
      "this " spec_type " spec does not define its traits. Make sure you've passed " \
      "an " spec_type " spec and not some other object; also make sure you've included " \
      "all necessary headers. If you're developing a spec, make sure you have " \
      "a spec traits template specialization"

  #define UNSUPPORTED_MODE_MSG(spec_type, mode) \
      "this " spec_type " spec does not support " mode " mode"
  850. class IProgram {
  851. public:
  852. virtual ~IProgram() = default;
  853. public:
  854. virtual const TInputSpecBase& GetInputSpecBase() const = 0;
  855. virtual const TOutputSpecBase& GetOutputSpecBase() const = 0;
  856. virtual const THashSet<TString>& GetUsedColumns(ui32) const = 0;
  857. virtual const THashSet<TString>& GetUsedColumns() const = 0;
  858. virtual NYT::TNode MakeInputSchema(ui32) const = 0;
  859. virtual NYT::TNode MakeInputSchema() const = 0;
  860. virtual NYT::TNode MakeOutputSchema() const = 0;
  861. virtual NYT::TNode MakeOutputSchema(ui32) const = 0;
  862. virtual NYT::TNode MakeOutputSchema(TStringBuf) const = 0;
  863. virtual NYT::TNode MakeFullOutputSchema() const = 0;
  864. virtual TIssues GetIssues() const = 0;
  865. virtual TString GetCompiledProgram() = 0;
  866. inline void MergeUsedColumns(THashSet<TString>& columns, ui32 inputIndex) {
  867. const auto& usedColumns = GetUsedColumns(inputIndex);
  868. columns.insert(usedColumns.begin(), usedColumns.end());
  869. }
  870. inline void MergeUsedColumns(THashSet<TString>& columns) {
  871. const auto& usedColumns = GetUsedColumns();
  872. columns.insert(usedColumns.begin(), usedColumns.end());
  873. }
  874. };
  875. template <typename TInputSpec, typename TOutputSpec, typename WorkerFactory>
  876. class TProgramCommon: public IProgram {
  877. static_assert(std::is_base_of<TInputSpecBase, TInputSpec>::value, NOT_SPEC_MSG("input"));
  878. static_assert(std::is_base_of<TOutputSpecBase, TOutputSpec>::value, NOT_SPEC_MSG("output"));
  879. protected:
  880. TInputSpec InputSpec_;
  881. TOutputSpec OutputSpec_;
  882. std::shared_ptr<WorkerFactory> WorkerFactory_;
  883. public:
  884. explicit TProgramCommon(
  885. TInputSpec inputSpec,
  886. TOutputSpec outputSpec,
  887. std::shared_ptr<WorkerFactory> workerFactory
  888. )
  889. : InputSpec_(inputSpec)
  890. , OutputSpec_(outputSpec)
  891. , WorkerFactory_(std::move(workerFactory))
  892. {
  893. }
  894. public:
  895. const TInputSpec& GetInputSpec() const {
  896. return InputSpec_;
  897. }
  898. const TOutputSpec& GetOutputSpec() const {
  899. return OutputSpec_;
  900. }
  901. const TInputSpecBase& GetInputSpecBase() const override {
  902. return InputSpec_;
  903. }
  904. const TOutputSpecBase& GetOutputSpecBase() const override {
  905. return OutputSpec_;
  906. }
  907. const THashSet<TString>& GetUsedColumns(ui32 inputIndex) const override {
  908. return WorkerFactory_->GetUsedColumns(inputIndex);
  909. }
  910. const THashSet<TString>& GetUsedColumns() const override {
  911. return WorkerFactory_->GetUsedColumns();
  912. }
  913. NYT::TNode MakeInputSchema(ui32 inputIndex) const override {
  914. return WorkerFactory_->MakeInputSchema(inputIndex);
  915. }
  916. NYT::TNode MakeInputSchema() const override {
  917. return WorkerFactory_->MakeInputSchema();
  918. }
  919. NYT::TNode MakeOutputSchema() const override {
  920. return WorkerFactory_->MakeOutputSchema();
  921. }
  922. NYT::TNode MakeOutputSchema(ui32 outputIndex) const override {
  923. return WorkerFactory_->MakeOutputSchema(outputIndex);
  924. }
  925. NYT::TNode MakeOutputSchema(TStringBuf outputName) const override {
  926. return WorkerFactory_->MakeOutputSchema(outputName);
  927. }
  928. NYT::TNode MakeFullOutputSchema() const override {
  929. return WorkerFactory_->MakeFullOutputSchema();
  930. }
  931. TIssues GetIssues() const override {
  932. return WorkerFactory_->GetIssues();
  933. }
  934. TString GetCompiledProgram() override {
  935. return WorkerFactory_->GetCompiledProgram();
  936. }
  937. };
  938. template <typename TInputSpec, typename TOutputSpec>
  939. class TPullStreamProgram final: public TProgramCommon<TInputSpec, TOutputSpec, IPullStreamWorkerFactory> {
  940. using TProgramCommon<TInputSpec, TOutputSpec, IPullStreamWorkerFactory>::WorkerFactory_;
  941. using TProgramCommon<TInputSpec, TOutputSpec, IPullStreamWorkerFactory>::InputSpec_;
  942. using TProgramCommon<TInputSpec, TOutputSpec, IPullStreamWorkerFactory>::OutputSpec_;
  943. public:
  944. using TProgramCommon<TInputSpec, TOutputSpec, IPullStreamWorkerFactory>::TProgramCommon;
  945. public:
  946. template <typename ...T>
  947. typename TOutputSpecTraits<TOutputSpec>::TPullStreamReturnType Apply(T&& ... t) {
  948. static_assert(!TInputSpecTraits<TInputSpec>::IsPartial, PARTIAL_SPEC_MSG("input"));
  949. static_assert(!TOutputSpecTraits<TOutputSpec>::IsPartial, PARTIAL_SPEC_MSG("output"));
  950. static_assert(TInputSpecTraits<TInputSpec>::SupportPullStreamMode, UNSUPPORTED_MODE_MSG("input", "pull stream"));
  951. static_assert(TOutputSpecTraits<TOutputSpec>::SupportPullStreamMode, UNSUPPORTED_MODE_MSG("output", "pull stream"));
  952. auto worker = WorkerFactory_->MakeWorker();
  953. TInputSpecTraits<TInputSpec>::PreparePullStreamWorker(InputSpec_, worker.Get(), std::forward<T>(t)...);
  954. return TOutputSpecTraits<TOutputSpec>::ConvertPullStreamWorkerToOutputType(OutputSpec_, std::move(worker));
  955. }
  956. };
  957. template <typename TInputSpec, typename TOutputSpec>
  958. class TPullListProgram final: public TProgramCommon<TInputSpec, TOutputSpec, IPullListWorkerFactory> {
  959. using TProgramCommon<TInputSpec, TOutputSpec, IPullListWorkerFactory>::WorkerFactory_;
  960. using TProgramCommon<TInputSpec, TOutputSpec, IPullListWorkerFactory>::InputSpec_;
  961. using TProgramCommon<TInputSpec, TOutputSpec, IPullListWorkerFactory>::OutputSpec_;
  962. public:
  963. using TProgramCommon<TInputSpec, TOutputSpec, IPullListWorkerFactory>::TProgramCommon;
  964. public:
  965. template <typename ...T>
  966. typename TOutputSpecTraits<TOutputSpec>::TPullListReturnType Apply(T&& ... t) {
  967. static_assert(!TInputSpecTraits<TInputSpec>::IsPartial, PARTIAL_SPEC_MSG("input"));
  968. static_assert(!TOutputSpecTraits<TOutputSpec>::IsPartial, PARTIAL_SPEC_MSG("output"));
  969. static_assert(TInputSpecTraits<TInputSpec>::SupportPullListMode, UNSUPPORTED_MODE_MSG("input", "pull list"));
  970. static_assert(TOutputSpecTraits<TOutputSpec>::SupportPullListMode, UNSUPPORTED_MODE_MSG("output", "pull list"));
  971. auto worker = WorkerFactory_->MakeWorker();
  972. TInputSpecTraits<TInputSpec>::PreparePullListWorker(InputSpec_, worker.Get(), std::forward<T>(t)...);
  973. return TOutputSpecTraits<TOutputSpec>::ConvertPullListWorkerToOutputType(OutputSpec_, std::move(worker));
  974. }
  975. };
  976. template <typename TInputSpec, typename TOutputSpec>
  977. class TPushStreamProgram final: public TProgramCommon<TInputSpec, TOutputSpec, IPushStreamWorkerFactory> {
  978. using TProgramCommon<TInputSpec, TOutputSpec, IPushStreamWorkerFactory>::WorkerFactory_;
  979. using TProgramCommon<TInputSpec, TOutputSpec, IPushStreamWorkerFactory>::InputSpec_;
  980. using TProgramCommon<TInputSpec, TOutputSpec, IPushStreamWorkerFactory>::OutputSpec_;
  981. public:
  982. using TProgramCommon<TInputSpec, TOutputSpec, IPushStreamWorkerFactory>::TProgramCommon;
  983. public:
  984. template <typename ...T>
  985. typename TInputSpecTraits<TInputSpec>::TConsumerType Apply(T&& ... t) {
  986. static_assert(!TInputSpecTraits<TInputSpec>::IsPartial, PARTIAL_SPEC_MSG("input"));
  987. static_assert(!TOutputSpecTraits<TOutputSpec>::IsPartial, PARTIAL_SPEC_MSG("output"));
  988. static_assert(TInputSpecTraits<TInputSpec>::SupportPushStreamMode, UNSUPPORTED_MODE_MSG("input", "push stream"));
  989. static_assert(TOutputSpecTraits<TOutputSpec>::SupportPushStreamMode, UNSUPPORTED_MODE_MSG("output", "push stream"));
  990. auto worker = WorkerFactory_->MakeWorker();
  991. TOutputSpecTraits<TOutputSpec>::SetConsumerToWorker(OutputSpec_, worker.Get(), std::forward<T>(t)...);
  992. return TInputSpecTraits<TInputSpec>::MakeConsumer(InputSpec_, std::move(worker));
  993. }
  994. };
  995. #undef NOT_SPEC_MSG
  996. #undef PARTIAL_SPEC_MSG
  997. #undef UNSUPPORTED_MODE_MSG
  998. ////////////////////////////////////////////////////////////////////////////////////////////////////
  999. /**
  1000. * Configure global logging facilities. Affects all YQL modules.
  1001. */
  1002. void ConfigureLogging(const TLoggingOptions& = {});
  1003. /**
  1004. * Create a new program factory.
  1005. * Custom logging initialization could be performed by a call to the ConfigureLogging method beforehand.
  1006. * If the ConfigureLogging method has not been called the default logging initialization will be performed.
  1007. */
  1008. IProgramFactoryPtr MakeProgramFactory(const TProgramFactoryOptions& = {});
  1009. }
  1010. }
  1011. Y_DECLARE_OUT_SPEC(inline, NYql::NPureCalc::TCompileError, stream, value) {
  1012. stream << value.AsStrBuf() << Endl << "Issues:" << Endl << value.GetIssues() << Endl << Endl << "Yql:" << Endl <<value.GetYql();
  1013. }