spec.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. #pragma once
  2. #include <yql/essentials/public/purecalc/common/interface.h>
  3. #include <yql/essentials/public/purecalc/helpers/protobuf/schema_from_proto.h>
  4. #include <google/protobuf/message.h>
  5. #include <util/generic/maybe.h>
  6. namespace NYql {
  7. namespace NPureCalc {
  8. /**
  9. * Processing mode for working with raw protobuf message inputs.
  10. *
  11. * In this mode purecalc accepts pointers to abstract protobuf messages and processes them using the reflection
  12. * mechanism. All passed messages should have the same descriptor (the one you pass to the constructor
  13. * of the input spec).
  14. *
  15. * All working modes are implemented. In pull stream and pull list modes a program will accept a single object
  16. * stream of const protobuf messages. In push mode, a program will return a consumer of const protobuf messages.
  17. *
  18. * The program synopsis follows:
  19. *
  20. * @code
  21. * ... TPullStreamProgram::Apply(IStream<google::protobuf::Message*>);
  22. * ... TPullListProgram::Apply(IStream<google::protobuf::Message*>);
  23. * TConsumer<google::protobuf::Message*> TPushStreamProgram::Apply(...);
  24. * @endcode
  25. */
  26. class TProtobufRawInputSpec: public TInputSpecBase {
  27. private:
  28. const google::protobuf::Descriptor& Descriptor_;
  29. const TMaybe<TString> TimestampColumn_;
  30. const TProtoSchemaOptions SchemaOptions_;
  31. mutable TVector<NYT::TNode> SavedSchemas_;
  32. public:
  33. /**
  34. * Build input spec and associate the given message descriptor.
  35. */
  36. explicit TProtobufRawInputSpec(
  37. const google::protobuf::Descriptor& descriptor,
  38. const TMaybe<TString>& timestampColumn = Nothing(),
  39. const TProtoSchemaOptions& options = {}
  40. );
  41. public:
  42. const TVector<NYT::TNode>& GetSchemas() const override;
  43. /**
  44. * Get the descriptor associated with this spec.
  45. */
  46. const google::protobuf::Descriptor& GetDescriptor() const;
  47. const TMaybe<TString>& GetTimestampColumn() const;
  48. /*
  49. * Get options that customize input struct type building.
  50. */
  51. const TProtoSchemaOptions& GetSchemaOptions() const;
  52. };
  53. /**
  54. * Processing mode for working with raw protobuf message outputs.
  55. *
  56. * In this mode purecalc yields pointers to abstract protobuf messages. All generated messages share the same
  57. * descriptor so they can be safely converted into an appropriate message type.
  58. *
  59. * Note that one should not expect that the returned pointer will be valid forever; it can (and will) become
  60. * outdated once a new output is requested/pushed.
  61. *
  62. * All working modes are implemented. In pull stream and pull list modes a program will return an object
  63. * stream of non-const protobuf messages. In push mode, it will accept a single consumer of non-const
  64. * messages.
  65. *
  66. * The program synopsis follows:
  67. *
  68. * @code
  69. * IStream<google::protobuf::Message*> TPullStreamProgram::Apply(...);
  70. * IStream<google::protobuf::Message*> TPullListProgram::Apply(...);
  71. * ... TPushStreamProgram::Apply(TConsumer<google::protobuf::Message*>);
  72. * @endcode
  73. */
  74. class TProtobufRawOutputSpec: public TOutputSpecBase {
  75. private:
  76. const google::protobuf::Descriptor& Descriptor_;
  77. google::protobuf::MessageFactory* Factory_;
  78. TProtoSchemaOptions SchemaOptions_;
  79. google::protobuf::Arena* Arena_;
  80. mutable TMaybe<NYT::TNode> SavedSchema_;
  81. public:
  82. /**
  83. * Build output spec and associate the given message descriptor and maybe the given message factory.
  84. */
  85. explicit TProtobufRawOutputSpec(
  86. const google::protobuf::Descriptor& descriptor,
  87. google::protobuf::MessageFactory* = nullptr,
  88. const TProtoSchemaOptions& options = {},
  89. google::protobuf::Arena* arena = nullptr
  90. );
  91. public:
  92. const NYT::TNode& GetSchema() const override;
  93. /**
  94. * Get the descriptor associated with this spec.
  95. */
  96. const google::protobuf::Descriptor& GetDescriptor() const;
  97. /**
  98. * Set a new message factory which will be used to generate messages. Pass a null pointer to use the
  99. * default factory.
  100. */
  101. void SetFactory(google::protobuf::MessageFactory*);
  102. /**
  103. * Get the message factory which is currently associated with this spec.
  104. */
  105. google::protobuf::MessageFactory* GetFactory() const;
  106. /**
  107. * Set a new arena which will be used to generate messages. Pass a null pointer to create on the heap.
  108. */
  109. void SetArena(google::protobuf::Arena*);
  110. /**
  111. * Get the arena which is currently associated with this spec.
  112. */
  113. google::protobuf::Arena* GetArena() const;
  114. /**
  115. * Get options that customize output struct type building.
  116. */
  117. const TProtoSchemaOptions& GetSchemaOptions() const;
  118. };
  119. /**
  120. * Processing mode for working with raw protobuf messages and several outputs.
  121. *
  122. * The program synopsis follows:
  123. *
  124. * @code
  125. * IStream<std::pair<ui32, google::protobuf::Message*>> TPullStreamProgram::Apply(...);
  126. * IStream<std::pair<ui32, google::protobuf::Message*>> TPullListProgram::Apply(...);
  127. * ... TPushStreamProgram::Apply(TConsumer<std::pair<ui32, google::protobuf::Message*>>);
  128. * @endcode
  129. */
  130. class TProtobufRawMultiOutputSpec: public TOutputSpecBase {
  131. private:
  132. TVector<const google::protobuf::Descriptor*> Descriptors_;
  133. TVector<google::protobuf::MessageFactory*> Factories_;
  134. const TProtoSchemaOptions SchemaOptions_;
  135. TVector<google::protobuf::Arena*> Arenas_;
  136. mutable NYT::TNode SavedSchema_;
  137. public:
  138. TProtobufRawMultiOutputSpec(
  139. TVector<const google::protobuf::Descriptor*>,
  140. TMaybe<TVector<google::protobuf::MessageFactory*>> = {},
  141. const TProtoSchemaOptions& options = {},
  142. TMaybe<TVector<google::protobuf::Arena*>> arenas = {}
  143. );
  144. public:
  145. const NYT::TNode& GetSchema() const override;
  146. /**
  147. * Get the descriptor associated with given output.
  148. */
  149. const google::protobuf::Descriptor& GetDescriptor(ui32) const;
  150. /**
  151. * Set a new message factory for given output. It will be used to generate messages for this output.
  152. */
  153. void SetFactory(ui32, google::protobuf::MessageFactory*);
  154. /**
  155. * Get the message factory which is currently associated with given output.
  156. */
  157. google::protobuf::MessageFactory* GetFactory(ui32) const;
  158. /**
  159. * Set a new arena for given output. It will be used to generate messages for this output.
  160. */
  161. void SetArena(ui32, google::protobuf::Arena*);
  162. /**
  163. * Get the arena which is currently associated with given output.
  164. */
  165. google::protobuf::Arena* GetArena(ui32) const;
  166. /**
  167. * Get number of outputs for this spec.
  168. */
  169. ui32 GetOutputsNumber() const;
  170. /**
  171. * Get options that customize output struct type building.
  172. */
  173. const TProtoSchemaOptions& GetSchemaOptions() const;
  174. };
  175. template <>
  176. struct TInputSpecTraits<TProtobufRawInputSpec> {
  177. static const constexpr bool IsPartial = false;
  178. static const constexpr bool SupportPullStreamMode = true;
  179. static const constexpr bool SupportPullListMode = true;
  180. static const constexpr bool SupportPushStreamMode = true;
  181. using TConsumerType = THolder<IConsumer<google::protobuf::Message*>>;
  182. static void PreparePullStreamWorker(const TProtobufRawInputSpec&, IPullStreamWorker*, THolder<IStream<google::protobuf::Message*>>);
  183. static void PreparePullListWorker(const TProtobufRawInputSpec&, IPullListWorker*, THolder<IStream<google::protobuf::Message*>>);
  184. static TConsumerType MakeConsumer(const TProtobufRawInputSpec&, TWorkerHolder<IPushStreamWorker>);
  185. };
  186. template <>
  187. struct TOutputSpecTraits<TProtobufRawOutputSpec> {
  188. static const constexpr bool IsPartial = false;
  189. static const constexpr bool SupportPullStreamMode = true;
  190. static const constexpr bool SupportPullListMode = true;
  191. static const constexpr bool SupportPushStreamMode = true;
  192. using TOutputItemType = google::protobuf::Message*;
  193. using TPullStreamReturnType = THolder<IStream<TOutputItemType>>;
  194. using TPullListReturnType = THolder<IStream<TOutputItemType>>;
  195. static const constexpr TOutputItemType StreamSentinel = nullptr;
  196. static TPullStreamReturnType ConvertPullStreamWorkerToOutputType(const TProtobufRawOutputSpec&, TWorkerHolder<IPullStreamWorker>);
  197. static TPullListReturnType ConvertPullListWorkerToOutputType(const TProtobufRawOutputSpec&, TWorkerHolder<IPullListWorker>);
  198. static void SetConsumerToWorker(const TProtobufRawOutputSpec&, IPushStreamWorker*, THolder<IConsumer<TOutputItemType>>);
  199. };
  200. template <>
  201. struct TOutputSpecTraits<TProtobufRawMultiOutputSpec> {
  202. static const constexpr bool IsPartial = false;
  203. static const constexpr bool SupportPullStreamMode = true;
  204. static const constexpr bool SupportPullListMode = true;
  205. static const constexpr bool SupportPushStreamMode = true;
  206. using TOutputItemType = std::pair<ui32, google::protobuf::Message*>;
  207. using TPullStreamReturnType = THolder<IStream<TOutputItemType>>;
  208. using TPullListReturnType = THolder<IStream<TOutputItemType>>;
  209. static const constexpr TOutputItemType StreamSentinel = {0, nullptr};
  210. static TPullStreamReturnType ConvertPullStreamWorkerToOutputType(const TProtobufRawMultiOutputSpec&, TWorkerHolder<IPullStreamWorker>);
  211. static TPullListReturnType ConvertPullListWorkerToOutputType(const TProtobufRawMultiOutputSpec&, TWorkerHolder<IPullListWorker>);
  212. static void SetConsumerToWorker(const TProtobufRawMultiOutputSpec&, IPushStreamWorker*, THolder<IConsumer<TOutputItemType>>);
  213. };
  214. }
  215. }