Browse Source

YQ-727 S3 coro read by blocks without mkql (draft).

ref:81453a6a985169e8765081caa84eeaf19b30bbfb
a-romanov 2 years ago
parent
commit
935177fefd

+ 16 - 16
CMakeLists.linux.txt

@@ -898,6 +898,22 @@ add_subdirectory(ydb/library/yql/providers/clickhouse/actors)
 add_subdirectory(ydb/library/yql/providers/pq/async_io)
 add_subdirectory(ydb/library/yql/providers/pq/gateway/native)
 add_subdirectory(ydb/library/yql/providers/s3/actors)
+add_subdirectory(contrib/libs/poco/Util)
+add_subdirectory(contrib/libs/expat)
+add_subdirectory(contrib/libs/poco/Foundation)
+add_subdirectory(contrib/libs/poco/JSON)
+add_subdirectory(contrib/libs/poco/XML)
+add_subdirectory(ydb/library/yql/udfs/common/clickhouse/client)
+add_subdirectory(ydb/library/yql/public/udf/support)
+add_subdirectory(contrib/restricted/boost/libs/program_options)
+add_subdirectory(contrib/restricted/cityhash-1.0.2)
+add_subdirectory(contrib/libs/pdqsort)
+add_subdirectory(contrib/restricted/dragonbox)
+add_subdirectory(contrib/libs/poco/Net)
+add_subdirectory(contrib/libs/poco/NetSSL_OpenSSL)
+add_subdirectory(contrib/libs/poco/Crypto)
+add_subdirectory(contrib/libs/apache/avro)
+add_subdirectory(contrib/restricted/boost/libs/iostreams)
 add_subdirectory(ydb/library/yql/providers/solomon/gateway)
 add_subdirectory(ydb/library/yql/providers/solomon/provider)
 add_subdirectory(ydb/library/yql/providers/solomon/expr_nodes)
@@ -919,22 +935,6 @@ add_subdirectory(ydb/core/yq/libs/audit/mock)
 add_subdirectory(ydb/core/yq/libs/audit/events)
 add_subdirectory(ydb/library/folder_service/mock)
 add_subdirectory(ydb/library/keys)
-add_subdirectory(ydb/library/yql/udfs/common/clickhouse/client)
-add_subdirectory(ydb/library/yql/public/udf/support)
-add_subdirectory(contrib/restricted/boost/libs/program_options)
-add_subdirectory(contrib/restricted/cityhash-1.0.2)
-add_subdirectory(contrib/libs/pdqsort)
-add_subdirectory(contrib/restricted/dragonbox)
-add_subdirectory(contrib/libs/poco/Util)
-add_subdirectory(contrib/libs/expat)
-add_subdirectory(contrib/libs/poco/Foundation)
-add_subdirectory(contrib/libs/poco/JSON)
-add_subdirectory(contrib/libs/poco/XML)
-add_subdirectory(contrib/libs/poco/Net)
-add_subdirectory(contrib/libs/poco/NetSSL_OpenSSL)
-add_subdirectory(contrib/libs/poco/Crypto)
-add_subdirectory(contrib/libs/apache/avro)
-add_subdirectory(contrib/restricted/boost/libs/iostreams)
 add_subdirectory(ydb/library/yql/udfs/common/datetime)
 add_subdirectory(library/cpp/timezone_conversion)
 add_subdirectory(ydb/library/yql/udfs/common/datetime2)

+ 1 - 0
ydb/core/driver_lib/run/ut/CMakeLists.darwin.txt

@@ -17,6 +17,7 @@ target_link_libraries(ydb-core-driver_lib-run-ut PUBLIC
   library-cpp-cpuid_check
   cpp-testing-unittest_main
   run
+  yql-sql-pg_dummy
 )
 target_link_options(ydb-core-driver_lib-run-ut PRIVATE
   -Wl,-no_deduplicate

+ 1 - 0
ydb/core/driver_lib/run/ut/CMakeLists.linux.txt

@@ -18,6 +18,7 @@ target_link_libraries(ydb-core-driver_lib-run-ut PUBLIC
   library-cpp-cpuid_check
   cpp-testing-unittest_main
   run
+  yql-sql-pg_dummy
 )
 target_link_options(ydb-core-driver_lib-run-ut PRIVATE
   -ldl

+ 5 - 3
ydb/library/yql/providers/common/mkql/parser.cpp

@@ -107,7 +107,7 @@ TRuntimeNode BuildParseCall(
     TRuntimeNode input,
     const std::string_view& format,
     const std::string_view& compression,
-    TType* inputItemType,
+    TType* inputType,
     TType* outputItemType,
     NCommon::TMkqlBuildContext& ctx)
 {
@@ -140,8 +140,10 @@ TRuntimeNode BuildParseCall(
                 return ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("Yson2.ConvertTo", {}, userType), {dom});
             });
     } else {
-        const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({inputItemType}), ctx.ProgramBuilder.NewStructType({}), outputItemType});
-        input = ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.ParseFormat", {}, userType, format), {input}));
+        const auto userType = ctx.ProgramBuilder.NewTupleType({ctx.ProgramBuilder.NewTupleType({inputType}), ctx.ProgramBuilder.NewStructType({}), outputItemType});
+        input = TType::EKind::Resource == static_cast<TStreamType*>(inputType)->GetItemType()->GetKind() ?
+            ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.ParseBlocks", {}, userType), {input})):
+            ctx.ProgramBuilder.ToFlow(ctx.ProgramBuilder.Apply(ctx.ProgramBuilder.Udf("ClickHouseClient.ParseFormat", {}, userType, format), {input}));
     }
 
     return ctx.ProgramBuilder.ExpandMap(input,

+ 27 - 0
ydb/library/yql/providers/s3/actors/CMakeLists.darwin.txt

@@ -0,0 +1,27 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-s3-actors)
+target_compile_options(providers-s3-actors PRIVATE
+  -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(providers-s3-actors PUBLIC
+  contrib-libs-cxxsupp
+  yutil
+  yql-minikql-computation
+  common-token_accessor-client
+  common-schema-mkql
+  yql-public-types
+  dq-actors-compute
+  providers-common-http_gateway
+  providers-s3-proto
+)
+target_sources(providers-s3-actors PRIVATE
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
+)

+ 36 - 0
ydb/library/yql/providers/s3/actors/CMakeLists.linux.txt

@@ -0,0 +1,36 @@
+
+# This file was gererated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(providers-s3-actors)
+target_compile_options(providers-s3-actors PRIVATE
+  -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_include_directories(providers-s3-actors PRIVATE
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/base
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/base/pcg-random
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/udfs/common/clickhouse/client/src
+)
+target_link_libraries(providers-s3-actors PUBLIC
+  contrib-libs-cxxsupp
+  yutil
+  contrib-libs-fmt
+  libs-poco-Util
+  yql-minikql-computation
+  common-token_accessor-client
+  common-schema-mkql
+  yql-public-types
+  dq-actors-compute
+  providers-common-http_gateway
+  providers-s3-proto
+  clickhouse_client_udf
+)
+target_sources(providers-s3-actors PRIVATE
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
+)

+ 5 - 20
ydb/library/yql/providers/s3/actors/CMakeLists.txt

@@ -6,23 +6,8 @@
 # original buildsystem will not be accepted.
 
 
-
-add_library(providers-s3-actors)
-target_compile_options(providers-s3-actors PRIVATE
-  -DUSE_CURRENT_UDF_ABI_VERSION
-)
-target_link_libraries(providers-s3-actors PUBLIC
-  contrib-libs-cxxsupp
-  yutil
-  yql-minikql-computation
-  common-token_accessor-client
-  common-schema-mkql
-  yql-public-types
-  dq-actors-compute
-  providers-common-http_gateway
-  providers-s3-proto
-)
-target_sources(providers-s3-actors PRIVATE
-  ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp
-  ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp
-)
+if (APPLE)
+  include(CMakeLists.darwin.txt)
+elseif (UNIX)
+  include(CMakeLists.linux.txt)
+endif()

+ 182 - 123
ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

@@ -1,3 +1,23 @@
+#ifdef __linux__
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeEnum.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypesNumber.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeDate.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeFactory.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeArray.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeNothing.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeTuple.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeNullable.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeString.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/DataTypes/DataTypeUUID.h>
+
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/Core/Block.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/Core/ColumnsWithTypeAndName.h>
+
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/Formats/FormatFactory.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/Formats/InputStreamFromInputFormat.h>
+#endif
+
 #include "yql_s3_read_actor.h"
 
 #include <ydb/library/yql/minikql/mkql_string_util.h>
@@ -38,6 +58,7 @@ struct TEvPrivate {
         EvReadFinished,
         EvReadError,
         EvRetry,
+        EvNextBlock,
 
         EvEnd
     };
@@ -68,6 +89,11 @@ struct TEvPrivate {
         explicit TEvRetryEventFunc(std::function<void()> functor) : Functor(std::move(functor)) {}
         const std::function<void()> Functor;
     };
+
+    struct TEvNextBlock : public NActors::TEventLocal<TEvNextBlock, EvNextBlock> {
+        explicit TEvNextBlock(NDB::Block& block) { Block.swap(block); }
+        NDB::Block Block;
+    };
 };
 
 using TPath = std::tuple<TString, size_t>;
@@ -242,67 +268,44 @@ private:
     const std::shared_ptr<NS3::TRetryConfig> RetryConfig;
 };
 
-using namespace NKikimr::NMiniKQL;
+struct TReadSpec {
+    using TPtr = std::shared_ptr<TReadSpec>;
 
-struct TOutput {
-    TUnboxedValueDeque Data;
-    using TPtr = std::shared_ptr<TOutput>;
+    NDB::ColumnsWithTypeAndName Columns;
+    NDB::FormatSettings Settings;
+    TString Format;
 };
 
 class TS3ReadCoroImpl : public TActorCoroImpl {
 private:
-    class TCoroStreamWrapper: public TMutableComputationNode<TCoroStreamWrapper> {
-    using TBaseComputation = TMutableComputationNode<TCoroStreamWrapper>;
+    class TReadBufferFromStream : public NDB::ReadBuffer {
     public:
-        class TStreamValue : public TComputationValue<TStreamValue> {
-        public:
-            using TBase = TComputationValue<TStreamValue>;
-
-            TStreamValue(TMemoryUsageInfo* memInfo, TS3ReadCoroImpl* coro)
-                : TBase(memInfo), Coro(coro)
-            {}
-
-        private:
-            NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& value) final {
-                return Coro->Next(value) ? NUdf::EFetchStatus::Ok : NUdf::EFetchStatus::Finish;
-            }
-
-        private:
-            TS3ReadCoroImpl *const Coro;
-        };
-
-        TCoroStreamWrapper(TComputationMutables& mutables, TS3ReadCoroImpl* coro)
-            : TBaseComputation(mutables), Coro(coro)
+        TReadBufferFromStream(TS3ReadCoroImpl* coro)
+            : NDB::ReadBuffer(nullptr, 0ULL), Coro(coro), Value(TString())
         {}
+    private:
+        bool nextImpl() final {
+            if (Coro->Next(Value)) {
+                working_buffer = NDB::BufferBase::Buffer(const_cast<char*>(Value.data()), const_cast<char*>(Value.data()) + Value.size());
+                return true;
+            }
 
-        NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
-            return ctx.HolderFactory.Create<TStreamValue>(Coro);
+            return false;
         }
 
-        static IComputationNode* Make(TCallable&, const TComputationNodeFactoryContext& ctx, TS3ReadCoroImpl* coro) {
-            return new TCoroStreamWrapper(ctx.Mutables, coro);
-        }
-    private:
-        void RegisterDependencies() const final {}
         TS3ReadCoroImpl *const Coro;
+        IHTTPGateway::TContent Value;
     };
-
 public:
-    TS3ReadCoroImpl(const TTypeEnvironment& typeEnv, const IFunctionRegistry& functionRegistry, ui64 inputIndex, const NActors::TActorId& computeActorId, ui64, TString format, TString rowType, TOutput::TPtr outputs)
-        : TActorCoroImpl(512_KB), TypeEnv(typeEnv), FunctionRegistry(functionRegistry), InputIndex(inputIndex), Format(std::move(format)), RowType(std::move(rowType)), ComputeActorId(computeActorId), Outputs(std::move(outputs))
+    TS3ReadCoroImpl(ui64 inputIndex, const NActors::TActorId& sourceActorId, const NActors::TActorId& computeActorId, const TReadSpec::TPtr& readSpec)
+        : TActorCoroImpl(256_KB), InputIndex(inputIndex), ReadSpec(readSpec), SourceActorId(sourceActorId), ComputeActorId(computeActorId)
     {}
 
-    bool Next(NUdf::TUnboxedValue& value) {
+    bool Next(IHTTPGateway::TContent& value) {
         if (Finished)
             return false;
 
-        TAllocState *const allocState = TlsAllocState;
-        PgReleaseThreadContext(allocState->MainContext);
-        TlsAllocState = nullptr;
         const auto ev = WaitForSpecificEvent<TEvPrivate::TEvReadResult, TEvPrivate::TEvReadError, TEvPrivate::TEvReadFinished>();
-        TlsAllocState = allocState;
-        PgAcquireThreadContext(allocState->MainContext);
-
         switch (const auto etype = ev->GetTypeRewrite()) {
             case TEvPrivate::TEvReadFinished::EventType:
                 Finished = true;
@@ -311,7 +314,7 @@ public:
                 Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, ev->Get<TEvPrivate::TEvReadError>()->Error, true));
                 return false;
             case TEvPrivate::TEvReadResult::EventType:
-                value = MakeString(NUdf::TStringRef(std::string_view(ev->Get<TEvPrivate::TEvReadResult>()->Result)));
+                value = std::move(ev->Get<TEvPrivate::TEvReadResult>()->Result);
                 Send(ComputeActorId, new IDqSourceActor::TEvNewSourceDataArrived(InputIndex));
                 return true;
             default:
@@ -320,49 +323,13 @@ public:
     }
 private:
     void Run() final try {
-        TOutput::TPtr outputs;
-        // reset member to decrement ref, important for UV lifetime
-        Outputs.swap(outputs);
-
-        const auto randStub = CreateDeterministicRandomProvider(1);
-        const auto timeStub = CreateDeterministicTimeProvider(10000000);
-
-        Y_VERIFY(!TlsAllocState);
-        TlsAllocState = &TypeEnv.GetAllocator().Ref();
-        PgAcquireThreadContext(TypeEnv.GetAllocator().Ref().MainContext);
-        Y_DEFER{
-            PgReleaseThreadContext(TypeEnv.GetAllocator().Ref().MainContext);
-            TlsAllocState = nullptr;
-        };
-
-        const auto pb = std::make_unique<TProgramBuilder>(TypeEnv, FunctionRegistry);
-
-        TCallableBuilder callableBuilder(TypeEnv, "CoroStream", pb->NewStreamType(pb->NewDataType(NUdf::EDataSlot::String)));
-
-        const auto factory = [this](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
-            return callable.GetType()->GetName() == "CoroStream" ?
-                TCoroStreamWrapper::Make(callable, ctx, this) : GetBuiltinFactory()(callable, ctx);
-        };
-
-        TRuntimeNode stream(callableBuilder.Build(), false);
-
-        const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(RowType), *pb,  Cerr);
-        const auto userType = pb->NewTupleType({pb->NewTupleType({pb->NewDataType(NUdf::EDataSlot::String)}), pb->NewStructType({}), outputItemType});
-        const auto root = pb->Apply(pb->Udf("ClickHouseClient.ParseSource", {}, userType, Format), {stream});
+        TReadBufferFromStream buffer(this);
+        NDB::InputStreamFromInputFormat stream(NDB::FormatFactory::instance().getInputFormat(ReadSpec->Format, buffer, NDB::Block(ReadSpec->Columns), nullptr, 1_MB, ReadSpec->Settings));
 
-        TExploringNodeVisitor explorer;
-        explorer.Walk(root.GetNode(), TypeEnv);
-        TComputationPatternOpts opts(TypeEnv.GetAllocator().Ref(), TypeEnv, factory, &FunctionRegistry, NUdf::EValidateMode::None, NUdf::EValidatePolicy::Exception,  "OFF", EGraphPerProcess::Single);
-        const auto pattern = MakeComputationPattern(explorer, root, {}, opts);
-        const auto graph = pattern->Clone(opts.ToComputationOptions(*randStub, *timeStub));
-        const TBindTerminator bind(graph->GetTerminator());
+        while (auto block = stream.read())
+            Send(SourceActorId, new TEvPrivate::TEvNextBlock(block));
 
-        const auto output = graph->GetValue();
-        for (NUdf::TUnboxedValue v; NUdf::EFetchStatus::Ok == output.Fetch(v);)
-            outputs->Data.emplace_back(std::move(v));
-
-        outputs = nullptr;
-        Send(ComputeActorId, new IDqSourceActor::TEvNewSourceDataArrived(InputIndex));
+        Send(SourceActorId, new TEvPrivate::TEvReadFinished);
     } catch (const std::exception& err) {
         Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, TIssues{TIssue(err.what())}, true));
         return;
@@ -372,12 +339,11 @@ private:
         Send(ComputeActorId, new IDqSourceActor::TEvSourceError(InputIndex, TIssues{TIssue("Unexpected event")}, true));
     }
 private:
-    const TTypeEnvironment& TypeEnv;
-    const IFunctionRegistry& FunctionRegistry;
     const ui64 InputIndex;
+    const TReadSpec::TPtr ReadSpec;
     const TString Format, RowType, Compression;
+    const NActors::TActorId SourceActorId;
     const NActors::TActorId ComputeActorId;
-    TOutput::TPtr Outputs;
     bool Finished = false;
 };
 
@@ -409,11 +375,9 @@ public:
         const IHTTPGateway::THeaders& headers,
         const TString& path,
         const std::size_t expectedSize,
-        const std::shared_ptr<NS3::TRetryConfig>& retryConfig,
-        TOutput::TPtr outputs)
+        const std::shared_ptr<NS3::TRetryConfig>& retryConfig)
         : TActorCoro(THolder<TActorCoroImpl>(impl.Release()))
         , RetryStuff(std::make_shared<TRetryStuff>(std::move(gateway), url + path, headers, retryConfig, expectedSize))
-        , Outputs(std::move(outputs))
     {}
 private:
     static void OnNewData(TActorSystem* actorSystem, const TActorId& self, const TActorId& parent, const TRetryStuff::TPtr& retryStuff, IHTTPGateway::TContent&& data) {
@@ -449,72 +413,79 @@ private:
     }
 
     const TRetryStuff::TPtr RetryStuff;
-    TOutput::TPtr Outputs;
 };
 
 class TS3StreamReadActor : public TActorBootstrapped<TS3StreamReadActor>, public IDqSourceActor {
 public:
     TS3StreamReadActor(
-        const TTypeEnvironment& typeEnv,
-        const IFunctionRegistry& functionRegistry,
         ui64 inputIndex,
         IHTTPGateway::TPtr gateway,
         const TString& url,
         const TString& token,
         TPathList&& paths,
-        TString format,
-        TString rowType,
+        const TReadSpec::TPtr& readSpec,
         const NActors::TActorId& computeActorId,
         const std::shared_ptr<NS3::TRetryConfig>& retryConfig
-    )   : TypeEnv(typeEnv)
-        , FunctionRegistry(functionRegistry)
-        , Gateway(std::move(gateway))
+    )   : Gateway(std::move(gateway))
         , InputIndex(inputIndex)
         , ComputeActorId(computeActorId)
         , Url(url)
         , Headers(MakeHeader(token))
         , Paths(std::move(paths))
-        , Format(format)
-        , RowType(rowType)
+        , ReadSpec(readSpec)
         , RetryConfig(retryConfig)
-        , Outputs(std::make_shared<TOutput>())
+        , Count(Paths.size())
     {}
 
     void Bootstrap() {
         Become(&TS3StreamReadActor::StateFunc);
         for (const auto& path : Paths) {
-            auto impl = MakeHolder<TS3ReadCoroImpl>(TypeEnv, FunctionRegistry, InputIndex, ComputeActorId, Paths.size(), Format, RowType, Outputs);
-            RegisterWithSameMailbox(MakeHolder<TS3ReadCoroActor>(std::move(impl), Gateway, Url, Headers, std::get<TString>(path), std::get<std::size_t>(path), RetryConfig, Outputs).Release());
+            auto impl = MakeHolder<TS3ReadCoroImpl>(InputIndex, SelfId(), ComputeActorId, ReadSpec);
+            RegisterWithSameMailbox(MakeHolder<TS3ReadCoroActor>(std::move(impl), Gateway, Url, Headers, std::get<TString>(path), std::get<std::size_t>(path), RetryConfig).Release());
         }
     }
 
     static constexpr char ActorName[] = "S3_READ_ACTOR";
 
 private:
+    class TBoxedBlock : public NUdf::TBoxedValueBase {
+    public:
+        TBoxedBlock(NDB::Block& block) {
+            Block.swap(block);
+        }
+    private:
+        NUdf::TStringRef GetResourceTag() const final {
+            return NUdf::TStringRef::Of("ClickHouseClient.Block");
+        }
+
+        void* GetResource() final {
+            return &Block;
+        }
+
+        NDB::Block Block;
+    };
+
     void SaveState(const NDqProto::TCheckpoint&, NDqProto::TSourceState&) final {}
     void LoadState(const NDqProto::TSourceState&) final {}
     void CommitState(const NDqProto::TCheckpoint&) final {}
     ui64 GetInputIndex() const final { return InputIndex; }
 
-    i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& buffer, bool& finished, i64) final {
-        if (!Outputs)
-            return 0LL;
-
-        i64 total = Outputs->Data.size();
-        std::move(Outputs->Data.begin(), Outputs->Data.end(), std::back_inserter(buffer));
-        Outputs->Data.clear();
-
-        if (Outputs.unique()) {
-            finished = true;
-            Outputs.reset();
-        }
-
+    i64 GetSourceData(NKikimr::NMiniKQL::TUnboxedValueVector& output, bool& finished, i64 free) final {
+        i64 total = 0LL;
+        if (!Blocks.empty()) do {
+            const i64 s = Blocks.front().bytes();
+            free -= s;
+            total += s;
+            output.emplace_back(NUdf::TUnboxedValuePod(new TBoxedBlock(Blocks.front())));
+            Blocks.pop_front();
+        } while (!Blocks.empty() && free > 0LL && Blocks.front().bytes() <= size_t(free));
+
+        finished = Blocks.empty() && !Count;
         return total;
     }
 
     // IActor & IDqSourceActor
     void PassAway() override { // Is called from Compute Actor
-        Outputs = nullptr;
         TActorBootstrapped<TS3StreamReadActor>::PassAway();
     }
 
@@ -524,31 +495,106 @@ private:
 
     STRICT_STFUNC(StateFunc,
         hFunc(TEvPrivate::TEvRetryEventFunc, HandleRetry);
+        hFunc(TEvPrivate::TEvNextBlock, HandleNextBlock);
+        cFunc(TEvPrivate::EvReadFinished, HandleReadFinished);
     )
 
     void HandleRetry(TEvPrivate::TEvRetryEventFunc::TPtr& retry) {
         return retry->Get()->Functor();
     }
 
-    const TTypeEnvironment& TypeEnv;
-    const IFunctionRegistry& FunctionRegistry;
+    void HandleNextBlock(TEvPrivate::TEvNextBlock::TPtr& next) {
+        Blocks.emplace_back();
+        Blocks.back().swap(next->Get()->Block);
+        Send(ComputeActorId, new IDqSourceActor::TEvNewSourceDataArrived(InputIndex));
+    }
+
+    void HandleReadFinished() {
+        Y_VERIFY(Count);
+        --Count;
+    }
 
     const IHTTPGateway::TPtr Gateway;
 
     const ui64 InputIndex;
     const NActors::TActorId ComputeActorId;
 
+
     const TString Url;
     const IHTTPGateway::THeaders Headers;
     const TPathList Paths;
-    const TString Format, RowType, Compression;
+    const TReadSpec::TPtr ReadSpec;
     const std::shared_ptr<NS3::TRetryConfig> RetryConfig;
-
-    TOutput::TPtr Outputs;
+    std::deque<NDB::Block> Blocks;
+    ui32 Count;
 };
 
+using namespace NKikimr::NMiniKQL;
+
+NDB::DataTypePtr MetaToClickHouse(const TType* type) {
+    switch (type->GetKind()) {
+        case TType::EKind::EmptyList:
+            return std::make_shared<NDB::DataTypeArray>(std::make_shared<NDB::DataTypeNothing>());
+        case TType::EKind::Optional:
+            return makeNullable(MetaToClickHouse(static_cast<const TOptionalType*>(type)->GetItemType()));
+        case TType::EKind::List:
+            return std::make_shared<NDB::DataTypeArray>(MetaToClickHouse(static_cast<const TListType*>(type)->GetItemType()));
+        case TType::EKind::Tuple: {
+            const auto tupleType = static_cast<const TTupleType*>(type);
+            NDB::DataTypes elems;
+            elems.reserve(tupleType->GetElementsCount());
+            for (auto i = 0U; i < tupleType->GetElementsCount(); ++i)
+                elems.emplace_back(MetaToClickHouse(tupleType->GetElementType(i)));
+            return std::make_shared<NDB::DataTypeTuple>(elems);
+        }
+        case TType::EKind::Data: {
+            const auto dataType = static_cast<const TDataType*>(type);
+            switch (const auto slot = *dataType->GetDataSlot()) {
+            case NUdf::EDataSlot::Int8:
+                return std::make_shared<NDB::DataTypeInt8>();
+            case NUdf::EDataSlot::Bool:
+            case NUdf::EDataSlot::Uint8:
+                return std::make_shared<NDB::DataTypeUInt8>();
+            case NUdf::EDataSlot::Int16:
+                return std::make_shared<NDB::DataTypeInt16>();
+            case NUdf::EDataSlot::Uint16:
+                return std::make_shared<NDB::DataTypeUInt16>();
+            case NUdf::EDataSlot::Int32:
+                return std::make_shared<NDB::DataTypeInt32>();
+            case NUdf::EDataSlot::Uint32:
+                return std::make_shared<NDB::DataTypeUInt32>();
+            case NUdf::EDataSlot::Int64:
+                return std::make_shared<NDB::DataTypeInt64>();
+            case NUdf::EDataSlot::Uint64:
+                return std::make_shared<NDB::DataTypeUInt64>();
+            case NUdf::EDataSlot::Float:
+                return std::make_shared<NDB::DataTypeFloat32>();
+            case NUdf::EDataSlot::Double:
+                return std::make_shared<NDB::DataTypeFloat64>();
+            case NUdf::EDataSlot::String:
+                return std::make_shared<NDB::DataTypeString>();
+            case NUdf::EDataSlot::Date:
+            case NUdf::EDataSlot::TzDate:
+                return std::make_shared<NDB::DataTypeDate>();
+            case NUdf::EDataSlot::Datetime:
+            case NUdf::EDataSlot::TzDatetime:
+                return std::make_shared<NDB::DataTypeDateTime>();
+            case NUdf::EDataSlot::Uuid:
+                return std::make_shared<NDB::DataTypeUUID>();
+            default:
+                break;
+            }
+        }
+        default:
+            break;
+    }
+    return nullptr;
+}
+
 } // namespace
 
+using namespace NKikimr::NMiniKQL;
+
 std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor(
     const TTypeEnvironment& typeEnv,
     const IFunctionRegistry& functionRegistry,
@@ -584,7 +630,20 @@ std::pair<NYql::NDq::IDqSourceActor*, IActor*> CreateS3ReadActor(
     const auto authToken = credentialsProviderFactory->CreateProvider()->GetAuthInfo();
 
     if (params.HasFormat() && params.HasRowType()) {
-        const auto actor = new TS3StreamReadActor(typeEnv, functionRegistry, inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), params.GetFormat(), params.GetRowType(), computeActorId, retryConfig);
+        const auto pb = std::make_unique<TProgramBuilder>(typeEnv, functionRegistry);
+        const auto outputItemType = NCommon::ParseTypeFromYson(TStringBuf(params.GetRowType()), *pb,  Cerr);
+        const auto structType = static_cast<TStructType*>(outputItemType);
+
+        const auto readSpec = std::make_shared<TReadSpec>();
+        readSpec->Columns.resize(structType->GetMembersCount());
+        for (ui32 i = 0U; i < structType->GetMembersCount(); ++i) {
+            auto& colsumn = readSpec->Columns[i];
+            colsumn.type = MetaToClickHouse(structType->GetMemberType(i));
+            colsumn.name = structType->GetMemberName(i);
+        }
+        readSpec->Format = params.GetFormat();
+
+        const auto actor = new TS3StreamReadActor(inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), readSpec, computeActorId, retryConfig);
         return {actor, actor};
     } else {
         const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), params.GetUrl(), authToken, std::move(paths), computeActorId, retryConfig);

+ 11 - 0
ydb/library/yql/providers/s3/actors/yql_s3_source_factory.cpp

@@ -1,7 +1,10 @@
 #include "yql_s3_source_factory.h"
+#ifdef __linux__
 #include "yql_s3_read_actor.h"
 
 #include <ydb/library/yql/dq/actors/compute/dq_compute_actor_sources.h>
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/Formats/registerFormats.h>
+#endif
 
 namespace NYql::NDq {
 
@@ -10,10 +13,18 @@ void RegisterS3ReadActorFactory(
         ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
         IHTTPGateway::TPtr gateway,
         const std::shared_ptr<NYql::NS3::TRetryConfig>& retryConfig) {
+#ifdef __linux__
+    NDB::registerFormats();
     factory.Register<NS3::TSource>("S3Source",
         [credentialsFactory, gateway, retryConfig](NS3::TSource&& settings, IDqSourceActorFactory::TArguments&& args) {
                 return CreateS3ReadActor(args.TypeEnv, *args.HolderFactory.GetFunctionRegistry(), gateway, std::move(settings), args.InputIndex, args.SecureParams, args.TaskParams, args.ComputeActorId, credentialsFactory, retryConfig);
         });
+#else
+    Y_UNUSED(factory);
+    Y_UNUSED(credentialsFactory);
+    Y_UNUSED(gateway);
+    Y_UNUSED(retryConfig);
+#endif
 }
 
 }

+ 1 - 1
ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp

@@ -85,7 +85,7 @@ public:
             return TStatus::Error;
         }
 
-        input->SetTypeAnn(ctx.MakeType<TStreamExprType>(type));
+        input->SetTypeAnn(ctx.MakeType<TStreamExprType>(ctx.MakeType<TResourceExprType>("ClickHouseClient.Block")));
         return TStatus::Ok;
     }
 

Some files were not shown because too many files changed in this diff