interface.h 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180
  1. #pragma once
  2. #include "fwd.h"
  3. #include "wrappers.h"
  4. #include <yql/essentials/core/user_data/yql_user_data.h>
  5. #include <yql/essentials/public/udf/udf_value.h>
  6. #include <yql/essentials/public/udf/udf_counter.h>
  7. #include <yql/essentials/public/udf/udf_registrator.h>
  8. #include <yql/essentials/public/issue/yql_issue.h>
  9. #include <library/cpp/yson/node/node.h>
  10. #include <library/cpp/logger/priority.h>
  11. #include <util/generic/ptr.h>
  12. #include <util/generic/maybe.h>
  13. #include <util/generic/hash_set.h>
  14. #include <util/generic/string.h>
  15. #include <util/stream/output.h>
  16. class ITimeProvider;
  17. namespace NKikimr {
  18. namespace NMiniKQL {
  19. class TScopedAlloc;
  20. class IComputationGraph;
  21. class IFunctionRegistry;
  22. class TTypeEnvironment;
  23. class TType;
  24. class TStructType;
  25. }
  26. }
  27. namespace NYql {
  28. namespace NPureCalc {
  29. /**
  30. * SQL or s-expression translation error.
  31. */
  32. class TCompileError: public yexception {
  33. private:
  34. TString Yql_;
  35. TString Issues_;
  36. public:
  37. // TODO: maybe accept an actual list of issues here?
  38. // See https://a.yandex-team.ru/arc/review/439403/details#comment-778237
  39. TCompileError(TString yql, TString issues)
  40. : Yql_(std::move(yql))
  41. , Issues_(std::move(issues))
  42. {
  43. }
  44. public:
  45. /**
  46. * Get the sql query which caused the error (if there is one available).
  47. */
  48. const TString& GetYql() const {
  49. return Yql_;
  50. }
  51. /**
  52. * Get detailed description for all errors and warnings that happened during sql translation.
  53. */
  54. const TString& GetIssues() const {
  55. return Issues_;
  56. }
  57. };
  58. ////////////////////////////////////////////////////////////////////////////////////////////////////
  /**
   * A generic input stream of objects.
   *
   * @tparam TItem type returned by Fetch().
   */
  template <typename TItem>
  class IStream {
  public:
      virtual ~IStream() = default;

      /**
       * Pops and returns a next value in the stream. If the stream is finished, should return
       * some sentinel object.
       *
       * Depending on the return type, this function may not transfer object ownership to a user,
       * i.e. the stream may manage the returned object itself. In that case the returned object's
       * lifetime may be bound to the input stream lifetime: it may be destroyed upon the next call
       * to Fetch() or upon destroying the stream, whichever happens first.
       */
      virtual TItem Fetch() = 0;
  };
  77. /**
  78. * Create a new stream which applies the given functor to the elements of the original stream.
  79. */
  80. template <typename TOld, typename TNew, typename TFunctor>
  81. inline THolder<IStream<TNew>> MapStream(THolder<IStream<TOld>> stream, TFunctor functor) {
  82. return THolder(new NPrivate::TMappingStream<TNew, TOld, TFunctor>(std::move(stream), std::move(functor)));
  83. };
  84. /**
  85. * Convert stream of objects into a stream of potentially incompatible objects.
  86. *
  87. * This conversion applies static cast to the output of the original stream. Use with caution!
  88. */
  89. /// @{
  90. template <
  91. typename TNew, typename TOld,
  92. std::enable_if_t<!std::is_same<TNew, TOld>::value>* = nullptr>
  93. inline THolder<IStream<TNew>> ConvertStreamUnsafe(THolder<IStream<TOld>> stream) {
  94. return MapStream<TOld, TNew>(std::move(stream), [](TOld x) -> TNew { return static_cast<TNew>(x); });
  95. }
  96. template <typename T>
  97. inline THolder<IStream<T>> ConvertStreamUnsafe(THolder<IStream<T>> stream) {
  98. return stream;
  99. }
  100. /// @}
  101. /**
  102. * Convert stream of objects into a stream of compatible objects.
  103. *
  104. * Note: each conversion adds one level of indirection so avoid them if possible.
  105. */
  106. template <typename TNew, typename TOld, std::enable_if_t<std::is_convertible<TOld, TNew>::value>* = nullptr>
  107. inline THolder<IStream<TNew>> ConvertStream(THolder<IStream<TOld>> stream) {
  108. return ConvertStreamUnsafe<TNew, TOld>(std::move(stream));
  109. }
  110. ////////////////////////////////////////////////////////////////////////////////////////////////////
  /**
   * A generic push consumer.
   *
   * @tparam TItem type accepted by OnObject().
   */
  template <typename TItem>
  class IConsumer {
  public:
      virtual ~IConsumer() = default;

      /**
       * Feed an object to consumer.
       *
       * Depending on the argument type, the consumer may not take ownership of the passed object;
       * in that case it is the caller's responsibility to manage the object lifetime after
       * passing it to this method.
       *
       * The passed object can be destroyed after the consumer returns from this function. The
       * consumer should not store a pointer to the passed object, or the object itself, without
       * taking all necessary precautions to ensure that it stays valid after returning.
       */
      virtual void OnObject(TItem object) = 0;

      /**
       * Close the consumer and run finalization logic. Calling OnObject after calling this
       * function is an error.
       */
      virtual void OnFinish() = 0;
  };
  135. /**
  136. * Create a new consumer which applies the given functor to objects before .
  137. */
  138. template <typename TOld, typename TNew, typename TFunctor>
  139. inline THolder<IConsumer<TNew>> MapConsumer(THolder<IConsumer<TOld>> stream, TFunctor functor) {
  140. return THolder(new NPrivate::TMappingConsumer<TNew, TOld, TFunctor>(std::move(stream), std::move(functor)));
  141. };
  142. /**
  143. * Convert consumer of objects into a consumer of potentially incompatible objects.
  144. *
  145. * This conversion applies static cast to the input value. Use with caution.
  146. */
  147. /// @{
  148. template <
  149. typename TNew, typename TOld,
  150. std::enable_if_t<!std::is_same<TNew, TOld>::value>* = nullptr>
  151. inline THolder<IConsumer<TNew>> ConvertConsumerUnsafe(THolder<IConsumer<TOld>> consumer) {
  152. return MapConsumer<TOld, TNew>(std::move(consumer), [](TNew x) -> TOld { return static_cast<TOld>(x); });
  153. }
  154. template <typename T>
  155. inline THolder<IConsumer<T>> ConvertConsumerUnsafe(THolder<IConsumer<T>> consumer) {
  156. return consumer;
  157. }
  158. /// @}
  159. /**
  160. * Convert consumer of objects into a consumer of compatible objects.
  161. *
  162. * Note: each conversion adds one level of indirection so avoid them if possible.
  163. */
  164. template <typename TNew, typename TOld, std::enable_if_t<std::is_convertible<TNew, TOld>::value>* = nullptr>
  165. inline THolder<IConsumer<TNew>> ConvertConsumer(THolder<IConsumer<TOld>> consumer) {
  166. return ConvertConsumerUnsafe<TNew, TOld>(std::move(consumer));
  167. }
  168. /**
  169. * Create a consumer which holds a non-owning pointer to the given consumer
  170. * and passes all messages to the latter.
  171. */
  172. template <typename T, typename C>
  173. THolder<NPrivate::TNonOwningConsumer<T, C>> MakeNonOwningConsumer(C consumer) {
  174. return MakeHolder<NPrivate::TNonOwningConsumer<T, C>>(consumer);
  175. }
  176. ////////////////////////////////////////////////////////////////////////////////////////////////////
  177. /**
  178. * Logging options.
  179. */
  180. struct TLoggingOptions final {
  181. public:
  182. /// Logging level for messages generated during compilation.
  183. ELogPriority LogLevel_; // TODO: rename to LogLevel
  184. /// Where to write log messages.
  185. IOutputStream* LogDestination;
  186. public:
  187. TLoggingOptions();
  188. /**
  189. * Set a new logging level.
  190. *
  191. * @return reference to self, to allow method chaining.
  192. */
  193. TLoggingOptions& SetLogLevel(ELogPriority);
  194. /**
  195. * Set a new logging destination.
  196. *
  197. * @return reference to self, to allow method chaining.
  198. */
  199. TLoggingOptions& SetLogDestination(IOutputStream*);
  200. };
  201. /**
  202. * General options for program factory.
  203. */
  204. struct TProgramFactoryOptions final {
  205. public:
  206. /// Path to a directory with compiled UDFs. Leave empty to disable loading external UDFs.
  207. TString UdfsDir_; // TODO: rename to UDFDir
  208. /// List of available external resources, e.g. files, UDFs, libraries.
  209. TVector<NUserData::TUserData> UserData_; // TODO: rename to UserData
  210. /// LLVM settings. Assign "OFF" to disable LLVM, empty string for default settings.
  211. TString LLVMSettings;
  212. /// Block engine settings. Assign "force" to unconditionally enable
  213. /// it, "disable" for turn it off and "auto" to left the final
  214. /// decision to the platform heuristics.
  215. TString BlockEngineSettings;
  216. /// Output stream to dump the compiled and optimized expressions.
  217. IOutputStream* ExprOutputStream;
  218. /// Provider for generic counters which can be used to export statistics from UDFs.
  219. NKikimr::NUdf::ICountersProvider* CountersProvider;
  220. /// YT Type V3 flags for Skiff/Yson serialization.
  221. ui64 NativeYtTypeFlags;
  222. /// Seed for deterministic time provider
  223. TMaybe<ui64> DeterministicTimeProviderSeed;
  224. /// Use special system columns to support tables naming (supports non empty ``TablePath()``/``TableName()``)
  225. bool UseSystemColumns;
  226. /// Reuse allocated workers
  227. bool UseWorkerPool;
  228. public:
  229. TProgramFactoryOptions();
  230. public:
  231. /**
  232. * Set a new path to a directory with UDFs.
  233. *
  234. * @return reference to self, to allow method chaining.
  235. */
  236. TProgramFactoryOptions& SetUDFsDir(TStringBuf);
  237. /**
  238. * Add a new library to the UserData list.
  239. *
  240. * @param disposition where the resource resides, e.g. on filesystem, in memory, etc.
  241. * NB: URL disposition is not supported.
  242. * @param name name of the resource.
  243. * @param content depending on disposition, either path to the resource or its content.
  244. * @return reference to self, to allow method chaining.
  245. */
  246. TProgramFactoryOptions& AddLibrary(NUserData::EDisposition disposition, TStringBuf name, TStringBuf content);
  247. /**
  248. * Add a new file to the UserData list.
  249. *
  250. * @param disposition where the resource resides, e.g. on filesystem, in memory, etc.
  251. * NB: URL disposition is not supported.
  252. * @param name name of the resource.
  253. * @param content depending on disposition, either path to the resource or its content.
  254. * @return reference to self, to allow method chaining.
  255. */
  256. TProgramFactoryOptions& AddFile(NUserData::EDisposition disposition, TStringBuf name, TStringBuf content);
  257. /**
  258. * Add a new UDF to the UserData list.
  259. *
  260. * @param disposition where the resource resides, e.g. on filesystem, in memory, etc.
  261. * NB: URL disposition is not supported.
  262. * @param name name of the resource.
  263. * @param content depending on disposition, either path to the resource or its content.
  264. * @return reference to self, to allow method chaining.
  265. */
  266. TProgramFactoryOptions& AddUDF(NUserData::EDisposition disposition, TStringBuf name, TStringBuf content);
  267. /**
  268. * Set new LLVM settings.
  269. *
  270. * @return reference to self, to allow method chaining.
  271. */
  272. TProgramFactoryOptions& SetLLVMSettings(TStringBuf llvm_settings);
  273. /**
  274. * Set new block engine settings.
  275. *
  276. * @return reference to self, to allow method chaining.
  277. */
  278. TProgramFactoryOptions& SetBlockEngineSettings(TStringBuf blockEngineSettings);
  279. /**
  280. * Set the stream to dump the compiled and optimized expressions.
  281. *
  282. * @return reference to self, to allow method chaining.
  283. */
  284. TProgramFactoryOptions& SetExprOutputStream(IOutputStream* exprOutputStream);
  285. /**
  286. * Set new counters provider. Passed pointer should stay alive for as long as the processor factory
  287. * stays alive.
  288. *
  289. * @return reference to self, to allow method chaining.
  290. */
  291. TProgramFactoryOptions& SetCountersProvider(NKikimr::NUdf::ICountersProvider* countersProvider);
  292. /**
  293. * Set new YT Type V3 mode. Deprecated method. Use SetNativeYtTypeFlags instead
  294. *
  295. * @return reference to self, to allow method chaining.
  296. */
  297. TProgramFactoryOptions& SetUseNativeYtTypes(bool useNativeTypes);
  298. /**
  299. * Set YT Type V3 flags.
  300. *
  301. * @return reference to self, to allow method chaining.
  302. */
  303. TProgramFactoryOptions& SetNativeYtTypeFlags(ui64 nativeTypeFlags);
  304. /**
  305. * Set seed for deterministic time provider.
  306. *
  307. * @return reference to self, to allow method chaining.
  308. */
  309. TProgramFactoryOptions& SetDeterministicTimeProviderSeed(TMaybe<ui64> seed);
  310. /**
  311. * Set new flag whether to allow using system columns or not.
  312. *
  313. * @return reference to self, to allow method chaining.
  314. */
  315. TProgramFactoryOptions& SetUseSystemColumns(bool useSystemColumns);
  316. /**
  317. * Set new flag whether to allow reusing workers or not.
  318. *
  319. * @return reference to self, to allow method chaining.
  320. */
  321. TProgramFactoryOptions& SetUseWorkerPool(bool useWorkerPool);
  322. };
  323. ////////////////////////////////////////////////////////////////////////////////////////////////////
  /**
   * What exactly are we parsing. Judging by the enumerators: SQL, an s-expression,
   * an mkql program or PostgreSQL.
   *
   * NOTE(review): the trailing comments next to the enumerators look like textual names
   * consumed by an enum-serialization generator -- confirm before editing them.
   */
  enum class ETranslationMode {
      SQL /* "SQL" */,
      SExpr /* "s-expression" */,
      Mkql /* "mkql" */,
      PG /* PostgreSQL */
  };
  333. /**
  334. * A facility for compiling sql and s-expressions and making programs from them.
  335. */
  336. class IProgramFactory: public TThrRefBase {
  337. protected:
  338. virtual IPullStreamWorkerFactoryPtr MakePullStreamWorkerFactory(const TInputSpecBase&, const TOutputSpecBase&, TString, ETranslationMode, ui16) = 0;
  339. virtual IPullListWorkerFactoryPtr MakePullListWorkerFactory(const TInputSpecBase&, const TOutputSpecBase&, TString, ETranslationMode, ui16) = 0;
  340. virtual IPushStreamWorkerFactoryPtr MakePushStreamWorkerFactory(const TInputSpecBase&, const TOutputSpecBase&, TString, ETranslationMode, ui16) = 0;
  341. public:
  342. /**
  343. * Add new udf module. It's not specified whether adding new modules will affect existing programs
  344. * (theoretical answer is 'no').
  345. */
  346. virtual void AddUdfModule(const TStringBuf&, NKikimr::NUdf::TUniquePtr<NKikimr::NUdf::IUdfModule>&&) = 0;
  347. // TODO: support setting udf modules via factory options.
  348. /**
  349. * Set new counters provider, override one that was specified via factory options. Note that existing
  350. * programs will still reference the previous provider.
  351. */
  352. virtual void SetCountersProvider(NKikimr::NUdf::ICountersProvider*) = 0;
  353. // TODO: support setting providers via factory options.
  354. template <typename TInputSpec, typename TOutputSpec>
  355. THolder<TPullStreamProgram<TInputSpec, TOutputSpec>> MakePullStreamProgram(
  356. TInputSpec inputSpec, TOutputSpec outputSpec, TString query, ETranslationMode mode = ETranslationMode::SQL, ui16 syntaxVersion = 1
  357. ) {
  358. auto workerFactory = MakePullStreamWorkerFactory(inputSpec, outputSpec, std::move(query), mode, syntaxVersion);
  359. return MakeHolder<TPullStreamProgram<TInputSpec, TOutputSpec>>(std::move(inputSpec), std::move(outputSpec), workerFactory);
  360. }
  361. template <typename TInputSpec, typename TOutputSpec>
  362. THolder<TPullListProgram<TInputSpec, TOutputSpec>> MakePullListProgram(
  363. TInputSpec inputSpec, TOutputSpec outputSpec, TString query, ETranslationMode mode = ETranslationMode::SQL, ui16 syntaxVersion = 1
  364. ) {
  365. auto workerFactory = MakePullListWorkerFactory(inputSpec, outputSpec, std::move(query), mode, syntaxVersion);
  366. return MakeHolder<TPullListProgram<TInputSpec, TOutputSpec>>(std::move(inputSpec), std::move(outputSpec), workerFactory);
  367. }
  368. template <typename TInputSpec, typename TOutputSpec>
  369. THolder<TPushStreamProgram<TInputSpec, TOutputSpec>> MakePushStreamProgram(
  370. TInputSpec inputSpec, TOutputSpec outputSpec, TString query, ETranslationMode mode = ETranslationMode::SQL, ui16 syntaxVersion = 1
  371. ) {
  372. auto workerFactory = MakePushStreamWorkerFactory(inputSpec, outputSpec, std::move(query), mode, syntaxVersion);
  373. return MakeHolder<TPushStreamProgram<TInputSpec, TOutputSpec>>(std::move(inputSpec), std::move(outputSpec), workerFactory);
  374. }
  375. };
  376. ////////////////////////////////////////////////////////////////////////////////////////////////////
  377. /**
  378. * A facility for creating workers. Despite being a part of a public API, worker factory is not used directly.
  379. */
  380. class IWorkerFactory: public std::enable_shared_from_this<IWorkerFactory> {
  381. public:
  382. virtual ~IWorkerFactory() = default;
  383. /**
  384. * Get input column names for specified input that are actually used in the query.
  385. */
  386. virtual const THashSet<TString>& GetUsedColumns(ui32) const = 0;
  387. /**
  388. * Overload for single-input programs.
  389. */
  390. virtual const THashSet<TString>& GetUsedColumns() const = 0;
  391. /**
  392. * Make input type schema for specified input as deduced by program optimizer. This schema is equivalent
  393. * to one provided by input spec up to the order of the fields in structures.
  394. */
  395. virtual NYT::TNode MakeInputSchema(ui32) const = 0;
  396. /**
  397. * Overload for single-input programs.
  398. */
  399. virtual NYT::TNode MakeInputSchema() const = 0;
  400. /**
  401. * Make output type schema as deduced by program optimizer. If output spec provides its own schema, than
  402. * this schema is equivalent to one provided by output spec up to the order of the fields in structures.
  403. */
  404. /// @{
  405. /**
  406. * Overload for single-table output programs (i.e. output type is struct).
  407. */
  408. virtual NYT::TNode MakeOutputSchema() const = 0;
  409. /**
  410. * Overload for multi-table output programs (i.e. output type is variant over tuple).
  411. */
  412. virtual NYT::TNode MakeOutputSchema(ui32) const = 0;
  413. /**
  414. * Overload for multi-table output programs (i.e. output type is variant over struct).
  415. */
  416. virtual NYT::TNode MakeOutputSchema(TStringBuf) const = 0;
  417. /// @}
  418. /**
  419. * Make full output schema. For single-output programs returns struct type, for multi-output programs
  420. * returns variant type.
  421. *
  422. * Warning: calling this function may result in extended memory usage for large number of output tables.
  423. */
  424. virtual NYT::TNode MakeFullOutputSchema() const = 0;
  425. /**
  426. * Get compilation issues
  427. */
  428. virtual TIssues GetIssues() const = 0;
  429. /**
  430. * Get precompiled mkql program
  431. */
  432. virtual TString GetCompiledProgram() = 0;
  433. /**
  434. * Return a worker to the factory for possible reuse
  435. */
  436. virtual void ReturnWorker(IWorker* worker) = 0;
  437. };
  /**
   * Deleter policy which calls Release() on the object instead of deleting it.
   */
  class TReleaseWorker {
  public:
      template <class TWorker>
      static void Destroy(TWorker* worker) noexcept {
          worker->Release();
      }
  };
  445. template <class T>
  446. using TWorkerHolder = THolder<T, TReleaseWorker>;
  447. /**
  448. * Factory for generating pull stream workers.
  449. */
  450. class IPullStreamWorkerFactory: public IWorkerFactory {
  451. public:
  452. /**
  453. * Create a new pull stream worker.
  454. */
  455. virtual TWorkerHolder<IPullStreamWorker> MakeWorker() = 0;
  456. };
  457. /**
  458. * Factory for generating pull list workers.
  459. */
  460. class IPullListWorkerFactory: public IWorkerFactory {
  461. public:
  462. /**
  463. * Create a new pull list worker.
  464. */
  465. virtual TWorkerHolder<IPullListWorker> MakeWorker() = 0;
  466. };
  467. /**
  468. * Factory for generating push stream workers.
  469. */
  470. class IPushStreamWorkerFactory: public IWorkerFactory {
  471. public:
  472. /**
  473. * Create a new push stream worker.
  474. */
  475. virtual TWorkerHolder<IPushStreamWorker> MakeWorker() = 0;
  476. };
  477. ////////////////////////////////////////////////////////////////////////////////////////////////////
  478. /**
  479. * Worker is a central part of any program instance. It contains current computation state
  480. * (called computation graph) and objects required to work with it, including an allocator for unboxed values.
  481. *
  482. * Usually, users do not interact with workers directly. They use program instance entry points such as streams
  483. * and consumers instead. The only case when one would have to to interact with workers is when implementing
  484. * custom io-specification.
  485. */
  486. class IWorker {
  487. protected:
  488. friend class TReleaseWorker;
  489. /**
  490. * Cleanup the worker and return to a worker factory for reuse
  491. */
  492. virtual void Release() = 0;
  493. public:
  494. virtual ~IWorker() = default;
  495. public:
  496. /**
  497. * Number of inputs for this program.
  498. */
  499. virtual ui32 GetInputsCount() const = 0;
  500. /**
  501. * MiniKQL input struct type of specified input for this program. Type is equivalent to the deduced input
  502. * schema (see IWorker::MakeInputSchema())
  503. *
  504. * If ``original`` is set to ``true``, returns type without virtual system columns.
  505. */
  506. virtual const NKikimr::NMiniKQL::TStructType* GetInputType(ui32, bool original = false) const = 0;
  507. /**
  508. * Overload for single-input programs.
  509. */
  510. virtual const NKikimr::NMiniKQL::TStructType* GetInputType(bool original = false) const = 0;
  511. /**
  512. * MiniKQL input struct type of the specified input for this program.
  513. * The returned type is the actual type of the specified input node.
  514. */
  515. virtual const NKikimr::NMiniKQL::TStructType* GetRawInputType(ui32) const = 0;
  516. /**
  517. * Overload for single-input programs.
  518. */
  519. virtual const NKikimr::NMiniKQL::TStructType* GetRawInputType() const = 0;
  520. /**
  521. * MiniKQL output struct type for this program. The returned type is equivalent to the deduced output
  522. * schema (see IWorker::MakeFullOutputSchema()).
  523. */
  524. virtual const NKikimr::NMiniKQL::TType* GetOutputType() const = 0;
  525. /**
  526. * MiniKQL output struct type for this program. The returned type is
  527. * the actual type of the root node.
  528. */
  529. virtual const NKikimr::NMiniKQL::TType* GetRawOutputType() const = 0;
  530. /**
  531. * Make input type schema for specified input as deduced by program optimizer. This schema is equivalent
  532. * to one provided by input spec up to the order of the fields in structures.
  533. */
  534. virtual NYT::TNode MakeInputSchema(ui32) const = 0;
  535. /**
  536. * Overload for single-input programs.
  537. */
  538. virtual NYT::TNode MakeInputSchema() const = 0;
  539. /**
  540. * Make output type schema as deduced by program optimizer. If output spec provides its own schema, than
  541. * this schema is equivalent to one provided by output spec up to the order of the fields in structures.
  542. */
  543. /// @{
  544. /**
  545. * Overload for single-table output programs (i.e. output type is struct).
  546. */
  547. virtual NYT::TNode MakeOutputSchema() const = 0;
  548. /**
  549. * Overload for multi-table output programs (i.e. output type is variant over tuple).
  550. */
  551. virtual NYT::TNode MakeOutputSchema(ui32) const = 0;
  552. /**
  553. * Overload for multi-table output programs (i.e. output type is variant over struct).
  554. */
  555. virtual NYT::TNode MakeOutputSchema(TStringBuf) const = 0;
  556. /// @}
  557. /**
  558. * Generates full output schema. For single-output programs returns struct type, for multi-output programs
  559. * returns variant type.
  560. *
  561. * Warning: calling this function may result in extended memory usage for large number of output tables.
  562. */
  563. virtual NYT::TNode MakeFullOutputSchema() const = 0;
  564. /**
  565. * Get scoped alloc used in this worker.
  566. */
  567. virtual NKikimr::NMiniKQL::TScopedAlloc& GetScopedAlloc() = 0;
  568. /**
  569. * Get computation graph.
  570. */
  571. virtual NKikimr::NMiniKQL::IComputationGraph& GetGraph() = 0;
  572. /**
  573. * Get function registry for this worker.
  574. */
  575. virtual const NKikimr::NMiniKQL::IFunctionRegistry& GetFunctionRegistry() const = 0;
  576. /**
  577. * Get type environment for this worker.
  578. */
  579. virtual NKikimr::NMiniKQL::TTypeEnvironment& GetTypeEnvironment() = 0;
  580. /**
  581. * Get llvm settings for this worker.
  582. */
  583. virtual const TString& GetLLVMSettings() const = 0;
  584. /**
  585. * Get YT Type V3 flags
  586. */
  587. virtual ui64 GetNativeYtTypeFlags() const = 0;
  588. /**
  589. * Get time provider
  590. */
  591. virtual ITimeProvider* GetTimeProvider() const = 0;
  592. };
  593. /**
  594. * Worker which operates in pull stream mode.
  595. */
  596. class IPullStreamWorker: public IWorker {
  597. public:
  598. /**
  599. * Set input computation graph node for specified input. The passed unboxed value should be a stream of
  600. * structs. It should be created via the allocator associated with this very worker.
  601. * This function can only be called once for each input.
  602. */
  603. virtual void SetInput(NKikimr::NUdf::TUnboxedValue&&, ui32) = 0;
  604. /**
  605. * Get the output computation graph node. The returned node will be a stream of structs or variants.
  606. * This function cannot be called before setting an input value.
  607. */
  608. virtual NKikimr::NUdf::TUnboxedValue& GetOutput() = 0;
  609. };
  610. /**
  611. * Worker which operates in pull list mode.
  612. */
  613. class IPullListWorker: public IWorker {
  614. public:
  615. /**
  616. * Set input computation graph node for specified input. The passed unboxed value should be a list of
  617. * structs. It should be created via the allocator associated with this very worker.
  618. * This function can only be called once for each index.
  619. */
  620. virtual void SetInput(NKikimr::NUdf::TUnboxedValue&&, ui32) = 0;
  621. /**
  622. * Get the output computation graph node. The returned node will be a list of structs or variants.
  623. * This function cannot be called before setting an input value.
  624. */
  625. virtual NKikimr::NUdf::TUnboxedValue& GetOutput() = 0;
  626. /**
  627. * Get iterator over the output list.
  628. */
  629. virtual NKikimr::NUdf::TUnboxedValue& GetOutputIterator() = 0;
  630. /**
  631. * Reset iterator to the beginning of the output list. After calling this function, GetOutputIterator()
  632. * will return a fresh iterator; all previously returned iterators will become invalid.
  633. */
  634. virtual void ResetOutputIterator() = 0;
  635. };
  636. /**
  637. * Worker which operates in push stream mode.
  638. */
  639. class IPushStreamWorker: public IWorker {
  640. public:
  641. /**
  642. * Set a consumer where the worker will relay its output. This function can only be called once.
  643. */
  644. virtual void SetConsumer(THolder<IConsumer<const NKikimr::NUdf::TUnboxedValue*>>) = 0;
  645. /**
  646. * Push new value to the graph, than feed all new output to the consumer. Values cannot be pushed before
  647. * assigning a consumer.
  648. */
  649. virtual void Push(NKikimr::NUdf::TUnboxedValue&&) = 0;
  650. /**
  651. * Send finish event and clear the computation graph. No new values will be accepted.
  652. */
  653. virtual void OnFinish() = 0;
  654. };
  655. ////////////////////////////////////////////////////////////////////////////////////////////////////
  656. /**
  657. * Input specifications describe format for program input. They carry information about input data schema
  658. * as well as the knowledge about how to convert input structures into unboxed values (data format which can be
  659. * processed by the YQL runtime).
  660. *
  661. * Input spec defines the arguments of the program's Apply method. For example, a program
  662. * with the protobuf input spec will accept a stream of protobuf messages while a program with the
  663. * yson spec will accept an input stream (binary or text one).
  664. *
  665. * See documentation for input and output spec traits for hints on how to implement a custom specs.
  666. */
  667. class TInputSpecBase {
  668. protected:
  669. mutable TVector<THashMap<TString, NYT::TNode>> AllVirtualColumns_;
  670. public:
  671. virtual ~TInputSpecBase() = default;
  672. public:
  673. /**
  674. * Get input data schemas in YQL format (NB: not a YT format). Each item of the returned vector must
  675. * describe a structure.
  676. *
  677. * Format of each item is approximately this one:
  678. *
  679. * @code
  680. * [
  681. * 'StructType',
  682. * [
  683. * ["Field1Name", ["DataType", "Int32"]],
  684. * ["Field2Name", ["DataType", "String"]],
  685. * ...
  686. * ]
  687. * ]
  688. * @endcode
  689. */
  690. virtual const TVector<NYT::TNode>& GetSchemas() const = 0;
  691. // TODO: make a neat schema builder
  692. /**
  693. * Get virtual columns for each input.
  694. *
  695. * Key of each mapping is column name, value is data schema in YQL format.
  696. */
  697. const TVector<THashMap<TString, NYT::TNode>>& GetAllVirtualColumns() const {
  698. if (AllVirtualColumns_.empty()) {
  699. AllVirtualColumns_ = TVector<THashMap<TString, NYT::TNode>>(GetSchemas().size());
  700. }
  701. return AllVirtualColumns_;
  702. }
  703. virtual bool ProvidesBlocks() const { return false; }
  704. };
  705. /**
  706. * Output specifications describe format for program output. Like input specifications, they cary knowledge
  707. * about program output type and how to convert unboxed values into that type.
  708. */
  709. class TOutputSpecBase {
  710. private:
  711. TMaybe<THashSet<TString>> OutputColumnsFilter_;
  712. public:
  713. virtual ~TOutputSpecBase() = default;
  714. public:
  715. /**
  716. * Get output data schema in YQL format (NB: not a YT format). The returned value must describe a structure
  717. * or a variant made of structures for fulti-table outputs (note: not all specs support multi-table output).
  718. *
  719. * See docs for the input spec's GetSchemas().
  720. *
  721. * Also TNode entity could be returned (NYT::TNode::CreateEntity()),
  722. * in which case output schema would be inferred from query and could be
  723. * obtained by Program::GetOutputSchema() call.
  724. */
  725. virtual const NYT::TNode& GetSchema() const = 0;
  726. /**
  727. * Get an output columns filter.
  728. *
  729. * Output columns filter is a set of column names that should be left in the output. All columns that are
  730. * not in this set will not be calculated. Depending on the output schema, they will be either removed
  731. * completely (for optional columns) or filled with defaults (for required columns).
  732. */
  733. const TMaybe<THashSet<TString>>& GetOutputColumnsFilter() const {
  734. return OutputColumnsFilter_;
  735. }
  736. /**
  737. * Set new output columns filter.
  738. */
  739. void SetOutputColumnsFilter(const TMaybe<THashSet<TString>>& outputColumnsFilter) {
  740. OutputColumnsFilter_ = outputColumnsFilter;
  741. }
  742. virtual bool AcceptsBlocks() const { return false; }
  743. };
  744. ////////////////////////////////////////////////////////////////////////////////////////////////////
  745. /**
  746. * Input spec traits provide information on how to process program input.
  747. *
  748. * Each input spec should create a template specialization for this class, in which it should provide several
  749. * static variables and functions.
  750. *
  751. * For example, a hypothetical example of implementing a JSON input spec would look like this:
  752. *
  753. * @code
  754. * class TJsonInputSpec: public TInputSpecBase {
  755. * // whatever magic you require for this spec
  756. * };
  757. *
  758. * template <>
  759. * class TInputSpecTraits<TJsonInputSpec> {
  760. * // write here four constants, one typedef and three static functions described below
  761. * };
  762. * @endcode
  763. *
  764. * @tparam T input spec type.
  765. */
  766. template <typename T>
  767. struct TInputSpecTraits {
  768. /// Safety flag which should be set to false in all template specializations of this class. Attempt to
  769. /// build a program using a spec with `IsPartial=true` will result in compilation error.
  770. static const constexpr bool IsPartial = true;
  771. /// Indicates whether this spec supports pull stream mode.
  772. static const constexpr bool SupportPullStreamMode = false;
  773. /// Indicates whether this spec supports pull list mode.
  774. static const constexpr bool SupportPullListMode = false;
  775. /// Indicates whether this spec supports push stream mode.
  776. static const constexpr bool SupportPushStreamMode = false;
  777. /// For push mode, indicates the return type of the builder's Process function.
  778. using TConsumerType = void;
  779. /// For pull stream mode, should take an input spec, a pull stream worker and whatever the user passed
  780. /// to the program's Apply function, create an unboxed values with a custom stream implementations
  781. /// and pass it to the worker's SetInput function for each input.
  782. template <typename ...A>
  783. static void PreparePullStreamWorker(const T&, IPullStreamWorker*, A&&...) {
  784. Y_UNREACHABLE();
  785. }
  786. /// For pull list mode, should take an input spec, a pull list worker and whatever the user passed
  787. /// to the program's Apply function, create an unboxed values with a custom list implementations
  788. /// and pass it to the worker's SetInput function for each input.
  789. template <typename ...A>
  790. static void PreparePullListWorker(const T&, IPullListWorker*, A&&...) {
  791. Y_UNREACHABLE();
  792. }
  793. /// For push stream mode, should take an input spec and a worker and create a consumer which will
  794. /// be returned to the user. The consumer should keep the worker alive until its own destruction.
  795. /// The return type of this function should exactly match the one defined in ConsumerType typedef.
  796. static TConsumerType MakeConsumer(const T&, TWorkerHolder<IPushStreamWorker>) {
  797. Y_UNREACHABLE();
  798. }
  799. };
  800. /**
  801. * Output spec traits provide information on how to process program output. Like with input specs, each output
  802. * spec requires an appropriate template specialization of this class.
  803. *
  804. * @tparam T output spec type.
  805. */
  806. template <typename T>
  807. struct TOutputSpecTraits {
  808. /// Safety flag which should be set to false in all template specializations of this class. Attempt to
  809. /// build a program using a spec with `IsPartial=false` will result in compilation error.
  810. static const constexpr bool IsPartial = true;
  811. /// Indicates whether this spec supports pull stream mode.
  812. static const constexpr bool SupportPullStreamMode = false;
  813. /// Indicates whether this spec supports pull list mode.
  814. static const constexpr bool SupportPullListMode = false;
  815. /// Indicates whether this spec supports push stream mode.
  816. static const constexpr bool SupportPushStreamMode = false;
  817. /// For pull stream mode, indicates the return type of the program's Apply function.
  818. using TPullStreamReturnType = void;
  819. /// For pull list mode, indicates the return type of the program's Apply function.
  820. using TPullListReturnType = void;
  821. /// For pull stream mode, should take an output spec and a worker and build a stream which will be returned
  822. /// to the user. The return type of this function must match the one specified in the PullStreamReturnType.
  823. static TPullStreamReturnType ConvertPullStreamWorkerToOutputType(const T&, TWorkerHolder<IPullStreamWorker>) {
  824. Y_UNREACHABLE();
  825. }
  826. /// For pull list mode, should take an output spec and a worker and build a list which will be returned
  827. /// to the user. The return type of this function must match the one specified in the PullListReturnType.
  828. static TPullListReturnType ConvertPullListWorkerToOutputType(const T&, TWorkerHolder<IPullListWorker>) {
  829. Y_UNREACHABLE();
  830. }
  831. /// For push stream mode, should take an output spec, a worker and whatever arguments the user passed
  832. /// to the program's Apply function, create a consumer for unboxed values and pass it to the worker's
  833. /// SetConsumer function.
  834. template <typename ...A>
  835. static void SetConsumerToWorker(const T&, IPushStreamWorker*, A&&...) {
  836. Y_UNREACHABLE();
  837. }
  838. };
  839. ////////////////////////////////////////////////////////////////////////////////////////////////////
  // Messages for the static_asserts in the program classes. The arguments must be string literals,
  // because the text is assembled by adjacent-literal concatenation.
  #define NOT_SPEC_MSG(spec_type) \
      "passed class should be derived from " spec_type " spec base"
  #define PARTIAL_SPEC_MSG(spec_type) \
      "this " spec_type " spec does not define its traits. " \
      "Make sure you've passed an " spec_type " spec and not some other object; " \
      "also make sure you've included all necessary headers. " \
      "If you're developing a spec, make sure you have a spec traits template specialization"
  #define UNSUPPORTED_MODE_MSG(spec_type, mode) \
      "this " spec_type " spec does not support " mode " mode"
  846. class IProgram {
  847. public:
  848. virtual ~IProgram() = default;
  849. public:
  850. virtual const TInputSpecBase& GetInputSpecBase() const = 0;
  851. virtual const TOutputSpecBase& GetOutputSpecBase() const = 0;
  852. virtual const THashSet<TString>& GetUsedColumns(ui32) const = 0;
  853. virtual const THashSet<TString>& GetUsedColumns() const = 0;
  854. virtual NYT::TNode MakeInputSchema(ui32) const = 0;
  855. virtual NYT::TNode MakeInputSchema() const = 0;
  856. virtual NYT::TNode MakeOutputSchema() const = 0;
  857. virtual NYT::TNode MakeOutputSchema(ui32) const = 0;
  858. virtual NYT::TNode MakeOutputSchema(TStringBuf) const = 0;
  859. virtual NYT::TNode MakeFullOutputSchema() const = 0;
  860. virtual TIssues GetIssues() const = 0;
  861. virtual TString GetCompiledProgram() = 0;
  862. inline void MergeUsedColumns(THashSet<TString>& columns, ui32 inputIndex) {
  863. const auto& usedColumns = GetUsedColumns(inputIndex);
  864. columns.insert(usedColumns.begin(), usedColumns.end());
  865. }
  866. inline void MergeUsedColumns(THashSet<TString>& columns) {
  867. const auto& usedColumns = GetUsedColumns();
  868. columns.insert(usedColumns.begin(), usedColumns.end());
  869. }
  870. };
  871. template <typename TInputSpec, typename TOutputSpec, typename WorkerFactory>
  872. class TProgramCommon: public IProgram {
  873. static_assert(std::is_base_of<TInputSpecBase, TInputSpec>::value, NOT_SPEC_MSG("input"));
  874. static_assert(std::is_base_of<TOutputSpecBase, TOutputSpec>::value, NOT_SPEC_MSG("output"));
  875. protected:
  876. TInputSpec InputSpec_;
  877. TOutputSpec OutputSpec_;
  878. std::shared_ptr<WorkerFactory> WorkerFactory_;
  879. public:
  880. explicit TProgramCommon(
  881. TInputSpec inputSpec,
  882. TOutputSpec outputSpec,
  883. std::shared_ptr<WorkerFactory> workerFactory
  884. )
  885. : InputSpec_(inputSpec)
  886. , OutputSpec_(outputSpec)
  887. , WorkerFactory_(std::move(workerFactory))
  888. {
  889. }
  890. public:
  891. const TInputSpec& GetInputSpec() const {
  892. return InputSpec_;
  893. }
  894. const TOutputSpec& GetOutputSpec() const {
  895. return OutputSpec_;
  896. }
  897. const TInputSpecBase& GetInputSpecBase() const override {
  898. return InputSpec_;
  899. }
  900. const TOutputSpecBase& GetOutputSpecBase() const override {
  901. return OutputSpec_;
  902. }
  903. const THashSet<TString>& GetUsedColumns(ui32 inputIndex) const override {
  904. return WorkerFactory_->GetUsedColumns(inputIndex);
  905. }
  906. const THashSet<TString>& GetUsedColumns() const override {
  907. return WorkerFactory_->GetUsedColumns();
  908. }
  909. NYT::TNode MakeInputSchema(ui32 inputIndex) const override {
  910. return WorkerFactory_->MakeInputSchema(inputIndex);
  911. }
  912. NYT::TNode MakeInputSchema() const override {
  913. return WorkerFactory_->MakeInputSchema();
  914. }
  915. NYT::TNode MakeOutputSchema() const override {
  916. return WorkerFactory_->MakeOutputSchema();
  917. }
  918. NYT::TNode MakeOutputSchema(ui32 outputIndex) const override {
  919. return WorkerFactory_->MakeOutputSchema(outputIndex);
  920. }
  921. NYT::TNode MakeOutputSchema(TStringBuf outputName) const override {
  922. return WorkerFactory_->MakeOutputSchema(outputName);
  923. }
  924. NYT::TNode MakeFullOutputSchema() const override {
  925. return WorkerFactory_->MakeFullOutputSchema();
  926. }
  927. TIssues GetIssues() const override {
  928. return WorkerFactory_->GetIssues();
  929. }
  930. TString GetCompiledProgram() override {
  931. return WorkerFactory_->GetCompiledProgram();
  932. }
  933. };
  934. template <typename TInputSpec, typename TOutputSpec>
  935. class TPullStreamProgram final: public TProgramCommon<TInputSpec, TOutputSpec, IPullStreamWorkerFactory> {
  936. using TProgramCommon<TInputSpec, TOutputSpec, IPullStreamWorkerFactory>::WorkerFactory_;
  937. using TProgramCommon<TInputSpec, TOutputSpec, IPullStreamWorkerFactory>::InputSpec_;
  938. using TProgramCommon<TInputSpec, TOutputSpec, IPullStreamWorkerFactory>::OutputSpec_;
  939. public:
  940. using TProgramCommon<TInputSpec, TOutputSpec, IPullStreamWorkerFactory>::TProgramCommon;
  941. public:
  942. template <typename ...T>
  943. typename TOutputSpecTraits<TOutputSpec>::TPullStreamReturnType Apply(T&& ... t) {
  944. static_assert(!TInputSpecTraits<TInputSpec>::IsPartial, PARTIAL_SPEC_MSG("input"));
  945. static_assert(!TOutputSpecTraits<TOutputSpec>::IsPartial, PARTIAL_SPEC_MSG("output"));
  946. static_assert(TInputSpecTraits<TInputSpec>::SupportPullStreamMode, UNSUPPORTED_MODE_MSG("input", "pull stream"));
  947. static_assert(TOutputSpecTraits<TOutputSpec>::SupportPullStreamMode, UNSUPPORTED_MODE_MSG("output", "pull stream"));
  948. auto worker = WorkerFactory_->MakeWorker();
  949. TInputSpecTraits<TInputSpec>::PreparePullStreamWorker(InputSpec_, worker.Get(), std::forward<T>(t)...);
  950. return TOutputSpecTraits<TOutputSpec>::ConvertPullStreamWorkerToOutputType(OutputSpec_, std::move(worker));
  951. }
  952. };
  953. template <typename TInputSpec, typename TOutputSpec>
  954. class TPullListProgram final: public TProgramCommon<TInputSpec, TOutputSpec, IPullListWorkerFactory> {
  955. using TProgramCommon<TInputSpec, TOutputSpec, IPullListWorkerFactory>::WorkerFactory_;
  956. using TProgramCommon<TInputSpec, TOutputSpec, IPullListWorkerFactory>::InputSpec_;
  957. using TProgramCommon<TInputSpec, TOutputSpec, IPullListWorkerFactory>::OutputSpec_;
  958. public:
  959. using TProgramCommon<TInputSpec, TOutputSpec, IPullListWorkerFactory>::TProgramCommon;
  960. public:
  961. template <typename ...T>
  962. typename TOutputSpecTraits<TOutputSpec>::TPullListReturnType Apply(T&& ... t) {
  963. static_assert(!TInputSpecTraits<TInputSpec>::IsPartial, PARTIAL_SPEC_MSG("input"));
  964. static_assert(!TOutputSpecTraits<TOutputSpec>::IsPartial, PARTIAL_SPEC_MSG("output"));
  965. static_assert(TInputSpecTraits<TInputSpec>::SupportPullListMode, UNSUPPORTED_MODE_MSG("input", "pull list"));
  966. static_assert(TOutputSpecTraits<TOutputSpec>::SupportPullListMode, UNSUPPORTED_MODE_MSG("output", "pull list"));
  967. auto worker = WorkerFactory_->MakeWorker();
  968. TInputSpecTraits<TInputSpec>::PreparePullListWorker(InputSpec_, worker.Get(), std::forward<T>(t)...);
  969. return TOutputSpecTraits<TOutputSpec>::ConvertPullListWorkerToOutputType(OutputSpec_, std::move(worker));
  970. }
  971. };
  972. template <typename TInputSpec, typename TOutputSpec>
  973. class TPushStreamProgram final: public TProgramCommon<TInputSpec, TOutputSpec, IPushStreamWorkerFactory> {
  974. using TProgramCommon<TInputSpec, TOutputSpec, IPushStreamWorkerFactory>::WorkerFactory_;
  975. using TProgramCommon<TInputSpec, TOutputSpec, IPushStreamWorkerFactory>::InputSpec_;
  976. using TProgramCommon<TInputSpec, TOutputSpec, IPushStreamWorkerFactory>::OutputSpec_;
  977. public:
  978. using TProgramCommon<TInputSpec, TOutputSpec, IPushStreamWorkerFactory>::TProgramCommon;
  979. public:
  980. template <typename ...T>
  981. typename TInputSpecTraits<TInputSpec>::TConsumerType Apply(T&& ... t) {
  982. static_assert(!TInputSpecTraits<TInputSpec>::IsPartial, PARTIAL_SPEC_MSG("input"));
  983. static_assert(!TOutputSpecTraits<TOutputSpec>::IsPartial, PARTIAL_SPEC_MSG("output"));
  984. static_assert(TInputSpecTraits<TInputSpec>::SupportPushStreamMode, UNSUPPORTED_MODE_MSG("input", "push stream"));
  985. static_assert(TOutputSpecTraits<TOutputSpec>::SupportPushStreamMode, UNSUPPORTED_MODE_MSG("output", "push stream"));
  986. auto worker = WorkerFactory_->MakeWorker();
  987. TOutputSpecTraits<TOutputSpec>::SetConsumerToWorker(OutputSpec_, worker.Get(), std::forward<T>(t)...);
  988. return TInputSpecTraits<TInputSpec>::MakeConsumer(InputSpec_, std::move(worker));
  989. }
  990. };
  // Remove the diagnostic-message macros so that they are not visible past this point.
  #undef UNSUPPORTED_MODE_MSG
  #undef PARTIAL_SPEC_MSG
  #undef NOT_SPEC_MSG
  994. ////////////////////////////////////////////////////////////////////////////////////////////////////
  995. /**
  996. * Configure global logging facilities. Affects all YQL modules.
  997. */
  998. void ConfigureLogging(const TLoggingOptions& = {});
  999. /**
  1000. * Create a new program factory.
  1001. * Custom logging initialization could be preformed by a call to the ConfigureLogging method beforehand.
  1002. * If the ConfigureLogging method has not been called the default logging initialization will be performed.
  1003. */
  1004. IProgramFactoryPtr MakeProgramFactory(const TProgramFactoryOptions& = {});
  1005. }
  1006. }
  1007. Y_DECLARE_OUT_SPEC(inline, NYql::NPureCalc::TCompileError, stream, value) {
  1008. stream << value.AsStrBuf() << Endl << "Issues:" << Endl << value.GetIssues() << Endl << Endl << "Yql:" << Endl <<value.GetYql();
  1009. }