spec.h 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. #pragma once
  2. #include "proto_variant.h"
  3. #include <yql/essentials/public/purecalc/io_specs/protobuf_raw/spec.h>
  4. namespace NYql {
  5. namespace NPureCalc {
  6. /**
  7. * Processing mode for working with non-raw protobuf messages.
  8. *
  9. * @tparam T message type.
  10. */
  11. template <typename T>
  12. class TProtobufInputSpec: public TProtobufRawInputSpec {
  13. static_assert(std::is_base_of<google::protobuf::Message, T>::value,
  14. "should be derived from google::protobuf::Message");
  15. public:
  16. TProtobufInputSpec(
  17. const TMaybe<TString>& timestampColumn = Nothing(),
  18. const TProtoSchemaOptions& options = {}
  19. )
  20. : TProtobufRawInputSpec(*T::descriptor(), timestampColumn, options)
  21. {
  22. }
  23. };
  24. /**
  25. * Processing mode for working with non-raw protobuf messages.
  26. *
  27. * @tparam T message type.
  28. */
  29. template <typename T>
  30. class TProtobufOutputSpec: public TProtobufRawOutputSpec {
  31. static_assert(std::is_base_of<google::protobuf::Message, T>::value,
  32. "should be derived from google::protobuf::Message");
  33. public:
  34. TProtobufOutputSpec(
  35. const TProtoSchemaOptions& options = {},
  36. google::protobuf::Arena* arena = nullptr
  37. )
  38. : TProtobufRawOutputSpec(*T::descriptor(), nullptr, options, arena)
  39. {
  40. }
  41. };
  42. /**
  43. * Processing mode for working with non-raw protobuf messages and several outputs.
  44. */
  45. template <typename... T>
  46. class TProtobufMultiOutputSpec: public TProtobufRawMultiOutputSpec {
  47. static_assert(
  48. std::conjunction_v<std::is_base_of<google::protobuf::Message, T>...>,
  49. "all types should be derived from google::protobuf::Message");
  50. public:
  51. TProtobufMultiOutputSpec(
  52. const TProtoSchemaOptions& options = {},
  53. TMaybe<TVector<google::protobuf::Arena*>> arenas = {}
  54. )
  55. : TProtobufRawMultiOutputSpec({T::descriptor()...}, Nothing(), options, std::move(arenas))
  56. {
  57. }
  58. };
  59. template <typename T>
  60. struct TInputSpecTraits<TProtobufInputSpec<T>> {
  61. static const constexpr bool IsPartial = false;
  62. static const constexpr bool SupportPullStreamMode = true;
  63. static const constexpr bool SupportPullListMode = true;
  64. static const constexpr bool SupportPushStreamMode = true;
  65. using TConsumerType = THolder<IConsumer<T*>>;
  66. static void PreparePullStreamWorker(const TProtobufInputSpec<T>& inputSpec, IPullStreamWorker* worker, THolder<IStream<T*>> stream) {
  67. auto raw = ConvertStream<google::protobuf::Message*>(std::move(stream));
  68. TInputSpecTraits<TProtobufRawInputSpec>::PreparePullStreamWorker(inputSpec, worker, std::move(raw));
  69. }
  70. static void PreparePullListWorker(const TProtobufInputSpec<T>& inputSpec, IPullListWorker* worker, THolder<IStream<T*>> stream) {
  71. auto raw = ConvertStream<google::protobuf::Message*>(std::move(stream));
  72. TInputSpecTraits<TProtobufRawInputSpec>::PreparePullListWorker(inputSpec, worker, std::move(raw));
  73. }
  74. static TConsumerType MakeConsumer(const TProtobufInputSpec<T>& inputSpec, TWorkerHolder<IPushStreamWorker> worker) {
  75. auto raw = TInputSpecTraits<TProtobufRawInputSpec>::MakeConsumer(inputSpec, std::move(worker));
  76. return ConvertConsumer<T*>(std::move(raw));
  77. }
  78. };
  79. template <typename T>
  80. struct TOutputSpecTraits<TProtobufOutputSpec<T>> {
  81. static const constexpr bool IsPartial = false;
  82. static const constexpr bool SupportPullStreamMode = true;
  83. static const constexpr bool SupportPullListMode = true;
  84. static const constexpr bool SupportPushStreamMode = true;
  85. using TOutputItemType = T*;
  86. using TPullStreamReturnType = THolder<IStream<TOutputItemType>>;
  87. using TPullListReturnType = THolder<IStream<TOutputItemType>>;
  88. static TPullStreamReturnType ConvertPullStreamWorkerToOutputType(const TProtobufOutputSpec<T>& outputSpec, TWorkerHolder<IPullStreamWorker> worker) {
  89. auto raw = TOutputSpecTraits<TProtobufRawOutputSpec>::ConvertPullStreamWorkerToOutputType(outputSpec, std::move(worker));
  90. return ConvertStreamUnsafe<TOutputItemType>(std::move(raw));
  91. }
  92. static TPullListReturnType ConvertPullListWorkerToOutputType(const TProtobufOutputSpec<T>& outputSpec, TWorkerHolder<IPullListWorker> worker) {
  93. auto raw = TOutputSpecTraits<TProtobufRawOutputSpec>::ConvertPullListWorkerToOutputType(outputSpec, std::move(worker));
  94. return ConvertStreamUnsafe<TOutputItemType>(std::move(raw));
  95. }
  96. static void SetConsumerToWorker(const TProtobufOutputSpec<T>& outputSpec, IPushStreamWorker* worker, THolder<IConsumer<T*>> consumer) {
  97. auto raw = ConvertConsumerUnsafe<google::protobuf::Message*>(std::move(consumer));
  98. TOutputSpecTraits<TProtobufRawOutputSpec>::SetConsumerToWorker(outputSpec, worker, std::move(raw));
  99. }
  100. };
  101. template <typename... T>
  102. struct TOutputSpecTraits<TProtobufMultiOutputSpec<T...>> {
  103. static const constexpr bool IsPartial = false;
  104. static const constexpr bool SupportPullStreamMode = true;
  105. static const constexpr bool SupportPullListMode = true;
  106. static const constexpr bool SupportPushStreamMode = true;
  107. using TOutputItemType = std::variant<T*...>;
  108. using TPullStreamReturnType = THolder<IStream<TOutputItemType>>;
  109. using TPullListReturnType = THolder<IStream<TOutputItemType>>;
  110. static TPullStreamReturnType ConvertPullStreamWorkerToOutputType(const TProtobufMultiOutputSpec<T...>& outputSpec, TWorkerHolder<IPullStreamWorker> worker) {
  111. auto raw = TOutputSpecTraits<TProtobufRawMultiOutputSpec>::ConvertPullStreamWorkerToOutputType(outputSpec, std::move(worker));
  112. return THolder(new NPrivate::TProtobufsMappingStream<T...>(std::move(raw)));
  113. }
  114. static TPullListReturnType ConvertPullListWorkerToOutputType(const TProtobufMultiOutputSpec<T...>& outputSpec, TWorkerHolder<IPullListWorker> worker) {
  115. auto raw = TOutputSpecTraits<TProtobufRawMultiOutputSpec>::ConvertPullListWorkerToOutputType(outputSpec, std::move(worker));
  116. return THolder(new NPrivate::TProtobufsMappingStream<T...>(std::move(raw)));
  117. }
  118. static void SetConsumerToWorker(const TProtobufMultiOutputSpec<T...>& outputSpec, IPushStreamWorker* worker, THolder<IConsumer<TOutputItemType>> consumer) {
  119. auto wrapper = MakeHolder<NPrivate::TProtobufsMappingConsumer<T...>>(std::move(consumer));
  120. TOutputSpecTraits<TProtobufRawMultiOutputSpec>::SetConsumerToWorker(outputSpec, worker, std::move(wrapper));
  121. }
  122. };
  123. }
  124. }