#pragma once #include "proto_variant.h" #include namespace NYql { namespace NPureCalc { /** * Processing mode for working with non-raw protobuf messages. * * @tparam T message type. */ template class TProtobufInputSpec: public TProtobufRawInputSpec { static_assert(std::is_base_of::value, "should be derived from google::protobuf::Message"); public: TProtobufInputSpec( const TMaybe& timestampColumn = Nothing(), const TProtoSchemaOptions& options = {} ) : TProtobufRawInputSpec(*T::descriptor(), timestampColumn, options) { } }; /** * Processing mode for working with non-raw protobuf messages. * * @tparam T message type. */ template class TProtobufOutputSpec: public TProtobufRawOutputSpec { static_assert(std::is_base_of::value, "should be derived from google::protobuf::Message"); public: TProtobufOutputSpec( const TProtoSchemaOptions& options = {}, google::protobuf::Arena* arena = nullptr ) : TProtobufRawOutputSpec(*T::descriptor(), nullptr, options, arena) { } }; /** * Processing mode for working with non-raw protobuf messages and several outputs. */ template class TProtobufMultiOutputSpec: public TProtobufRawMultiOutputSpec { static_assert( std::conjunction_v...>, "all types should be derived from google::protobuf::Message"); public: TProtobufMultiOutputSpec( const TProtoSchemaOptions& options = {}, TMaybe> arenas = {} ) : TProtobufRawMultiOutputSpec({T::descriptor()...}, Nothing(), options, std::move(arenas)) { } }; template struct TInputSpecTraits> { static const constexpr bool IsPartial = false; static const constexpr bool SupportPullStreamMode = true; static const constexpr bool SupportPullListMode = true; static const constexpr bool SupportPushStreamMode = true; using TConsumerType = THolder>; static void PreparePullStreamWorker(const TProtobufInputSpec& inputSpec, IPullStreamWorker* worker, THolder> stream) { auto raw = ConvertStream(std::move(stream)); TInputSpecTraits::PreparePullStreamWorker(inputSpec, worker, std::move(raw)); } static void PreparePullListWorker(const TProtobufInputSpec& inputSpec, IPullListWorker* worker, THolder> stream) { auto raw = ConvertStream(std::move(stream)); TInputSpecTraits::PreparePullListWorker(inputSpec, worker, std::move(raw)); } static TConsumerType MakeConsumer(const TProtobufInputSpec& inputSpec, TWorkerHolder worker) { auto raw = TInputSpecTraits::MakeConsumer(inputSpec, std::move(worker)); return ConvertConsumer(std::move(raw)); } }; template struct TOutputSpecTraits> { static const constexpr bool IsPartial = false; static const constexpr bool SupportPullStreamMode = true; static const constexpr bool SupportPullListMode = true; static const constexpr bool SupportPushStreamMode = true; using TOutputItemType = T*; using TPullStreamReturnType = THolder>; using TPullListReturnType = THolder>; static TPullStreamReturnType ConvertPullStreamWorkerToOutputType(const TProtobufOutputSpec& outputSpec, TWorkerHolder worker) { auto raw = TOutputSpecTraits::ConvertPullStreamWorkerToOutputType(outputSpec, std::move(worker)); return ConvertStreamUnsafe(std::move(raw)); } static TPullListReturnType ConvertPullListWorkerToOutputType(const TProtobufOutputSpec& outputSpec, TWorkerHolder worker) { auto raw = TOutputSpecTraits::ConvertPullListWorkerToOutputType(outputSpec, std::move(worker)); return ConvertStreamUnsafe(std::move(raw)); } static void SetConsumerToWorker(const TProtobufOutputSpec& outputSpec, IPushStreamWorker* worker, THolder> consumer) { auto raw = ConvertConsumerUnsafe(std::move(consumer)); TOutputSpecTraits::SetConsumerToWorker(outputSpec, worker, std::move(raw)); } }; template struct TOutputSpecTraits> { static const constexpr bool IsPartial = false; static const constexpr bool SupportPullStreamMode = true; static const constexpr bool SupportPullListMode = true; static const constexpr bool SupportPushStreamMode = true; using TOutputItemType = std::variant; using TPullStreamReturnType = THolder>; using TPullListReturnType = THolder>; static TPullStreamReturnType ConvertPullStreamWorkerToOutputType(const TProtobufMultiOutputSpec& outputSpec, TWorkerHolder worker) { auto raw = TOutputSpecTraits::ConvertPullStreamWorkerToOutputType(outputSpec, std::move(worker)); return THolder(new NPrivate::TProtobufsMappingStream(std::move(raw))); } static TPullListReturnType ConvertPullListWorkerToOutputType(const TProtobufMultiOutputSpec& outputSpec, TWorkerHolder worker) { auto raw = TOutputSpecTraits::ConvertPullListWorkerToOutputType(outputSpec, std::move(worker)); return THolder(new NPrivate::TProtobufsMappingStream(std::move(raw))); } static void SetConsumerToWorker(const TProtobufMultiOutputSpec& outputSpec, IPushStreamWorker* worker, THolder> consumer) { auto wrapper = MakeHolder>(std::move(consumer)); TOutputSpecTraits::SetConsumerToWorker(outputSpec, worker, std::move(wrapper)); } }; } }