spec.h 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. #pragma once
  2. #include <yql/essentials/public/purecalc/common/interface.h>
  3. #include <arrow/compute/kernel.h>
  4. namespace NYql {
  5. namespace NPureCalc {
  6. /**
  7. * Processing mode for working with Apache Arrow batches inputs.
  8. *
  9. * In this mode purecalc accept pointers to abstract Arrow ExecBatches and
  10. * processes them. All Datums in batches should respect the given YT schema
  11. * (the one you pass to the constructor of the input spec).
  12. *
  13. * All working modes are implemented. In pull list and pull stream modes a
  14. * program would accept a pointer to a single stream object or vector of
  15. * pointers of stream objects of Arrow ExecBatch pointers. In push mode, a
  16. * program will return a consumer of pointers to Arrow ExecBatch.
  17. *
  18. * The program synopsis follows:
  19. *
  20. * @code
  21. * ... TPullListProgram::Apply(IStream<arrow::compute::ExecBatch*>*);
  22. * ... TPullListProgram::Apply(TVector<IStream<arrow::compute::ExecBatch*>*>);
  23. * ... TPullStreamProgram::Apply(IStream<arrow::compute::ExecBatch*>*);
  24. * ... TPullStreamProgram::Apply(TVector<IStream<arrow::compute::ExecBatch*>*>);
  25. * TConsumer<arrow::compute::ExecBatch*> TPushStreamProgram::Apply(...);
  26. * @endcode
  27. */
  28. class TArrowInputSpec: public TInputSpecBase {
  29. private:
  30. const TVector<NYT::TNode> Schemas_;
  31. public:
  32. explicit TArrowInputSpec(const TVector<NYT::TNode>& schemas);
  33. const TVector<NYT::TNode>& GetSchemas() const override;
  34. const NYT::TNode& GetSchema(ui32 index) const;
  35. bool ProvidesBlocks() const override { return true; }
  36. };
  37. /**
  38. * Processing mode for working with Apache Arrow batches outputs.
  39. *
  40. * In this mode purecalc yields pointers to abstract Arrow ExecBatches. All
  41. * Datums in generated batches respects the given YT schema.
  42. *
  43. * Note that one should not expect that the returned pointer will be valid
  44. * forever; in can (and will) become outdated once a new output is
  45. * requested/pushed.
  46. *
  47. * All working modes are implemented. In pull stream and pull list modes a
  48. * program will return a pointer to a stream of pointers to Arrow ExecBatches.
  49. * In push mode, it will accept a single consumer of pointers to Arrow ExecBatch.
  50. *
  51. * The program synopsis follows:
  52. *
  53. * @code
  54. * IStream<arrow::compute::ExecBatch*> TPullStreamProgram::Apply(...);
  55. * IStream<arrow::compute::ExecBatch*> TPullListProgram::Apply(...);
  56. * ... TPushStreamProgram::Apply(TConsumer<arrow::compute::ExecBatch*>);
  57. * @endcode
  58. */
  59. class TArrowOutputSpec: public TOutputSpecBase {
  60. private:
  61. const NYT::TNode Schema_;
  62. public:
  63. explicit TArrowOutputSpec(const NYT::TNode& schema);
  64. const NYT::TNode& GetSchema() const override;
  65. bool AcceptsBlocks() const override { return true; }
  66. };
  67. template <>
  68. struct TInputSpecTraits<TArrowInputSpec> {
  69. static const constexpr bool IsPartial = false;
  70. static const constexpr bool SupportPullListMode = true;
  71. static const constexpr bool SupportPullStreamMode = true;
  72. static const constexpr bool SupportPushStreamMode = true;
  73. using TInputItemType = arrow::compute::ExecBatch*;
  74. using IInputStream = IStream<TInputItemType>;
  75. using TConsumerType = THolder<IConsumer<TInputItemType>>;
  76. static void PreparePullListWorker(const TArrowInputSpec&, IPullListWorker*,
  77. IInputStream*);
  78. static void PreparePullListWorker(const TArrowInputSpec&, IPullListWorker*,
  79. THolder<IInputStream>);
  80. static void PreparePullListWorker(const TArrowInputSpec&, IPullListWorker*,
  81. const TVector<IInputStream*>&);
  82. static void PreparePullListWorker(const TArrowInputSpec&, IPullListWorker*,
  83. TVector<THolder<IInputStream>>&&);
  84. static void PreparePullStreamWorker(const TArrowInputSpec&, IPullStreamWorker*,
  85. IInputStream*);
  86. static void PreparePullStreamWorker(const TArrowInputSpec&, IPullStreamWorker*,
  87. THolder<IInputStream>);
  88. static void PreparePullStreamWorker(const TArrowInputSpec&, IPullStreamWorker*,
  89. const TVector<IInputStream*>&);
  90. static void PreparePullStreamWorker(const TArrowInputSpec&, IPullStreamWorker*,
  91. TVector<THolder<IInputStream>>&&);
  92. static TConsumerType MakeConsumer(const TArrowInputSpec&, TWorkerHolder<IPushStreamWorker>);
  93. };
  94. template <>
  95. struct TOutputSpecTraits<TArrowOutputSpec> {
  96. static const constexpr bool IsPartial = false;
  97. static const constexpr bool SupportPullListMode = true;
  98. static const constexpr bool SupportPullStreamMode = true;
  99. static const constexpr bool SupportPushStreamMode = true;
  100. using TOutputItemType = arrow::compute::ExecBatch*;
  101. using IOutputStream = IStream<TOutputItemType>;
  102. using TPullListReturnType = THolder<IOutputStream>;
  103. using TPullStreamReturnType = THolder<IOutputStream>;
  104. static const constexpr TOutputItemType StreamSentinel = nullptr;
  105. static TPullListReturnType ConvertPullListWorkerToOutputType(const TArrowOutputSpec&, TWorkerHolder<IPullListWorker>);
  106. static TPullStreamReturnType ConvertPullStreamWorkerToOutputType(const TArrowOutputSpec&, TWorkerHolder<IPullStreamWorker>);
  107. static void SetConsumerToWorker(const TArrowOutputSpec&, IPushStreamWorker*, THolder<IConsumer<TOutputItemType>>);
  108. };
  109. } // namespace NPureCalc
  110. } // namespace NYql