Browse Source

added limit parameter

bbiff 2 years ago
parent
commit
7eeb61811b

+ 122 - 0
util/generic/function_ref.h

@@ -0,0 +1,122 @@
+#pragma once
+
+#include <util/generic/function.h>
+#include <util/system/yassert.h>
+
+#include <functional>
+
+template <typename Signature>
+class TFunctionRef;
+
+template <typename Ret, typename... Args, bool IsNoexcept>
+class TFunctionRef<Ret(Args...) noexcept(IsNoexcept)> {
+public:
+    using TSignature = Ret(Args...) noexcept(IsNoexcept);
+
+private:
+    union TErasedCallable {
+        const void* Functor;
+        void (*Function)();
+    };
+    using TProxy = Ret (*)(TErasedCallable callable, Args...);
+
+    // Making this a lambda inside TFunctionRef ctor caused:
+    // "error: cannot compile this forwarded non-trivially copyable parameter yet"
+    // on clang-win-i686-release.
+    //
+    // Using correct noexcept specifiers here (noexcept(IsNoexcept)) caused miscompilation on clang:
+    // https://github.com/llvm/llvm-project/issues/55280.
+    template <typename Functor>
+    static Ret InvokeErasedFunctor(TErasedCallable callable, Args... args) {
+        auto& ref = *static_cast<const std::remove_reference_t<Functor>*>(callable.Functor);
+        return static_cast<Ret>(std::invoke(ref, std::forward<Args>(args)...));
+    }
+
+    template <typename Function>
+    static Ret InvokeErasedFunction(TErasedCallable callable, Args... args) {
+        auto* function = reinterpret_cast<Function*>(callable.Function);
+        return static_cast<Ret>(std::invoke(function, std::forward<Args>(args)...));
+    }
+
+    template <class F>
+    static constexpr bool IsInvocableUsing = std::conditional_t<
+        IsNoexcept,
+        std::is_nothrow_invocable_r<Ret, F, Args...>,
+        std::is_invocable_r<Ret, F, Args...>>::value;
+
+    // clang-format off
+    template <class Callable>
+    static constexpr bool IsSuitableFunctor =
+        IsInvocableUsing<Callable>
+        && !std::is_function_v<Callable>
+        && !std::is_same_v<std::remove_cvref_t<Callable>, TFunctionRef>;
+
+    template <class Callable>
+    static constexpr bool IsSuitableFunction =
+        IsInvocableUsing<Callable>
+        && std::is_function_v<Callable>;
+    // clang-format on
+
+public:
+    // Function ref should not be default constructible.
+    // While the function ref can have empty state (for example, Proxy_ == nullptr),
+    // It does not make sense in common usage cases.
+    TFunctionRef() = delete;
+
+    // Construct function ref from a functor.
+    template <typename Functor, typename = std::enable_if_t<IsSuitableFunctor<Functor>>>
+    TFunctionRef(Functor&& functor) noexcept
+        : Callable_{
+              .Functor = std::addressof(functor),
+          }
+        , Proxy_{InvokeErasedFunctor<Functor>}
+    {
+    }
+
+    // Construct function ref from a function pointer.
+    template <typename Function, typename = std::enable_if_t<IsSuitableFunction<Function>>>
+    TFunctionRef(Function* function) noexcept
+        : Callable_{
+              .Function = reinterpret_cast<void (*)()>(function),
+          }
+        , Proxy_{InvokeErasedFunction<Function>}
+    {
+    }
+
+    // Copy ctors & assignment.
+    // Just copy pointers.
+    TFunctionRef(const TFunctionRef& rhs) noexcept = default;
+    TFunctionRef& operator=(const TFunctionRef& rhs) noexcept = default;
+
+    Ret operator()(Args... args) const noexcept(IsNoexcept) {
+        return Proxy_(Callable_, std::forward<Args>(args)...);
+    }
+
+private:
+    TErasedCallable Callable_;
+    TProxy Proxy_ = nullptr;
+};
+
+namespace NPrivate {
+
+    template <typename Callable, typename Signature = typename TCallableTraits<Callable>::TSignature>
+    struct TIsNothrowInvocable;
+
+    template <typename Callable, typename Ret, typename... Args>
+    struct TIsNothrowInvocable<Callable, Ret(Args...)> {
+        static constexpr bool IsNoexcept = std::is_nothrow_invocable_r_v<Ret, Callable, Args...>;
+        using TSignature = Ret(Args...) noexcept(IsNoexcept);
+    };
+
+    template <typename Callable>
+    struct TCallableTraitsWithNoexcept {
+        using TSignature = typename TIsNothrowInvocable<Callable>::TSignature;
+    };
+
+} // namespace NPrivate
+
+template <typename Callable>
+TFunctionRef(Callable&&) -> TFunctionRef<typename NPrivate::TCallableTraitsWithNoexcept<Callable>::TSignature>;
+
+template <typename Function>
+TFunctionRef(Function*) -> TFunctionRef<Function>;

+ 150 - 0
util/generic/function_ref_ut.cpp

@@ -0,0 +1,150 @@
+#include "function_ref.h"
+
+#include <library/cpp/testing/unittest/registar.h>
+
+Y_UNIT_TEST_SUITE(TestFunctionRef) {
+    template <typename Signature>
+    struct TTestFunction;
+
+    template <typename Ret, typename... Args, bool IsNoexcept>
+    struct TTestFunction<Ret(Args...) noexcept(IsNoexcept)> {
+        Ret operator()(Args...) const noexcept(IsNoexcept) {
+            return {};
+        }
+    };
+
+    Y_UNIT_TEST(NonDefaultConstructible) {
+        static_assert(!std::is_default_constructible_v<TFunctionRef<void()>>);
+        static_assert(!std::is_default_constructible_v<TFunctionRef<void() noexcept>>);
+        static_assert(!std::is_default_constructible_v<TFunctionRef<int(double, void********* megaptr, TTestFunction<void(int)>)>>);
+    }
+
+    int F1(bool x) {
+        if (x)
+            throw 19;
+        return 42;
+    }
+
+    int F2(bool x) noexcept {
+        return 42 + x;
+    }
+
+    static const TTestFunction<int(bool)> C1;
+    static const TTestFunction<int(bool) noexcept> C2;
+
+    Y_UNIT_TEST(Noexcept) {
+        static_assert(std::is_constructible_v<TFunctionRef<int(bool)>, decltype(F1)>);
+        static_assert(std::is_constructible_v<TFunctionRef<int(bool)>, decltype(F2)>);
+        static_assert(!std::is_constructible_v<TFunctionRef<int(bool) noexcept>, decltype(F1)>);
+        static_assert(std::is_constructible_v<TFunctionRef<int(bool) noexcept>, decltype(F2)>);
+
+        static_assert(std::is_constructible_v<TFunctionRef<int(bool)>, decltype(C1)>);
+        static_assert(std::is_constructible_v<TFunctionRef<int(bool)>, decltype(C2)>);
+        static_assert(!std::is_constructible_v<TFunctionRef<int(bool) noexcept>, decltype(C1)>);
+        static_assert(std::is_constructible_v<TFunctionRef<int(bool) noexcept>, decltype(C2)>);
+    }
+
+    Y_UNIT_TEST(Deduction) {
+        TFunctionRef ref1(F1);
+        TFunctionRef ref2(F2);
+        TFunctionRef ref3(C1);
+        TFunctionRef ref4(C2);
+
+        static_assert(!std::is_nothrow_invocable_r_v<int, decltype(ref1), bool>);
+        static_assert(std::is_nothrow_invocable_r_v<int, decltype(ref2), bool>);
+        static_assert(std::is_same_v<decltype(ref1)::TSignature, int(bool)>);
+        static_assert(std::is_same_v<decltype(ref2)::TSignature, int(bool) noexcept>);
+    }
+
+    void WithCallback(TFunctionRef<double(double, int) noexcept>);
+
+    void Iterate(int from, int to, TFunctionRef<void(int)> callback) {
+        while (from < to) {
+            callback(from++);
+        }
+    }
+
+    void IterateNoexcept(int from, int to, TFunctionRef<void(int) noexcept> callback) {
+        while (from < to) {
+            callback(from++);
+        }
+    }
+
+    Y_UNIT_TEST(AsArgument) {
+        int sum = 0;
+        Iterate(0, 10, [&](int x) { sum += x; });
+        UNIT_ASSERT_EQUAL(sum, 45);
+
+        Iterate(0, 10, [&](int x) noexcept { sum += x; });
+        UNIT_ASSERT_EQUAL(sum, 90);
+
+        IterateNoexcept(0, 10, [&](int x) noexcept { sum += x; });
+        UNIT_ASSERT_EQUAL(sum, 135);
+
+        auto summer = [&](int x) { sum += x; };
+        Iterate(0, 10, summer);
+        Iterate(0, 10, summer);
+        Iterate(0, 10, summer);
+        UNIT_ASSERT_EQUAL(sum, 270);
+
+        TFunctionRef ref = summer;
+        Iterate(0, 10, ref);
+        UNIT_ASSERT_EQUAL(sum, 315);
+    }
+
+    int GlobalSum = 0;
+    void AddToGlobalSum(int x) {
+        GlobalSum += x;
+    }
+
+    Y_UNIT_TEST(FunctionPointer) {
+        GlobalSum = 0;
+        Iterate(0, 10, AddToGlobalSum);
+        UNIT_ASSERT_EQUAL(GlobalSum, 45);
+
+        TFunctionRef ref1 = AddToGlobalSum;
+        Iterate(0, 10, ref1);
+        UNIT_ASSERT_EQUAL(GlobalSum, 90);
+
+        TFunctionRef ref2{AddToGlobalSum};
+        Iterate(0, 10, ref2);
+        UNIT_ASSERT_EQUAL(GlobalSum, 135);
+    }
+
+    Y_UNIT_TEST(Reassign) {
+        TFunctionRef kek = [](double) { return 42; };
+        kek = [](double) { return 19; };
+        kek = [](int) { return 22.8; };
+    }
+
+    const char* Greet() {
+        return "Hello, world!";
+    }
+
+    Y_UNIT_TEST(ImplicitCasts) {
+        TFunctionRef<void(int)> ref = [](int x) { return x; };
+        ref = [](double x) { return x; };
+        ref = [](char x) { return x; };
+
+        TFunctionRef<int()> ref1 = [] { return 0.5; };
+        ref1 = [] { return 'a'; };
+        ref1 = [] { return 124u; };
+
+        TFunctionRef<TStringBuf()> ref2{Greet};
+    }
+
+    Y_UNIT_TEST(StatelessLambdaLifetime) {
+        TFunctionRef<int(int, int)> ref{[](int a, int b) { return a + b; }};
+        UNIT_ASSERT_EQUAL(ref(5, 5), 10);
+    }
+
+    Y_UNIT_TEST(ForwardArguments) {
+        char x = 'x';
+        TFunctionRef<void(std::unique_ptr<int>, char&)> ref = [](std::unique_ptr<int> ptr, char& ch) {
+            UNIT_ASSERT_EQUAL(*ptr, 5);
+            ch = 'a';
+        };
+        ref(std::make_unique<int>(5), x);
+        UNIT_ASSERT_EQUAL(x, 'a');
+    }
+}

+ 8 - 10
ydb/library/yql/providers/common/http_gateway/yql_http_gateway.cpp

@@ -31,7 +31,7 @@ public:
         PUT
     };
 
-    TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr&  counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false, const TCurlInitConfig& config = TCurlInitConfig())
+    TEasyCurl(const ::NMonitoring::TDynamicCounters::TCounterPtr&  counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadedBytes, TString url, IHTTPGateway::THeaders headers, EMethod method, size_t offset = 0ULL, bool withBody = false, ui64 expectedSize = 0, const TCurlInitConfig& config = TCurlInitConfig())
         : Offset(offset), Handle(curl_easy_init()), Counter(counter), DownloadedBytes(downloadedBytes), UploadedBytes(uploadedBytes)
     {
         switch (method) {
@@ -44,7 +44,6 @@ public:
                 curl_easy_setopt(Handle, CURLOPT_UPLOAD, 1L);
                 break;
         }
-
         curl_easy_setopt(Handle, CURLOPT_URL, url.c_str());
         curl_easy_setopt(Handle, CURLOPT_USERAGENT, "YQ HTTP gateway");
         curl_easy_setopt(Handle, CURLOPT_SSL_VERIFYPEER, 0L);
@@ -57,14 +56,14 @@ public:
                 std::bind(&curl_slist_append, std::placeholders::_1, std::bind(&TString::c_str, std::placeholders::_2)));
             curl_easy_setopt(Handle, CURLOPT_HTTPHEADER, Headers);
         }
-
-        if (Offset) {
-            curl_easy_setopt(Handle, CURLOPT_RANGE,  (ToString(Offset) += '-').c_str());
+        TStringBuilder byteRange;
+        byteRange << Offset << "-";
+        if (expectedSize) {
+            byteRange << Offset + expectedSize - 1;
         }
-
+        curl_easy_setopt(Handle, CURLOPT_RANGE, byteRange.c_str());
         curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERFUNCTION : CURLOPT_WRITEFUNCTION, &WriteMemoryCallback);
         curl_easy_setopt(Handle, EMethod::PUT == method ? CURLOPT_HEADERDATA :CURLOPT_WRITEDATA, static_cast<void*>(this));
-
         if (withBody) {
             curl_easy_setopt(Handle, CURLOPT_READFUNCTION, &ReadMemoryCallback);
             curl_easy_setopt(Handle, CURLOPT_READDATA, static_cast<void*>(this));
@@ -124,7 +123,7 @@ public:
     using TWeakPtr = std::weak_ptr<TEasyCurlBuffer>;
 
     TEasyCurlBuffer(const ::NMonitoring::TDynamicCounters::TCounterPtr&  counter, const ::NMonitoring::TDynamicCounters::TCounterPtr& downloadedBytes, const ::NMonitoring::TDynamicCounters::TCounterPtr& uploadededBytes, TString url, EMethod method, TString data, IHTTPGateway::THeaders headers, size_t offset, size_t expectedSize, IHTTPGateway::TOnResult callback, IRetryPolicy<long>::IRetryState::TPtr retryState, const TCurlInitConfig& config = TCurlInitConfig())
-        : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, !data.empty(), std::move(config)), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState))
+        : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, method, offset, !data.empty(), expectedSize, std::move(config)), ExpectedSize(expectedSize), Data(std::move(data)), Input(Data), Output(Buffer), RetryState(std::move(retryState))
     {
         Output.Reserve(ExpectedSize);
         Callbacks.emplace(std::move(callback));
@@ -220,7 +219,7 @@ public:
         IHTTPGateway::TOnNewDataPart onNewData,
         IHTTPGateway::TOnDownloadFinish onFinish,
         const TCurlInitConfig& config = TCurlInitConfig())
-        : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, false, std::move(config)), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
+        : TEasyCurl(counter, downloadedBytes, uploadededBytes, url, headers, EMethod::GET, offset, false, 0, std::move(config)), OnStart(std::move(onStart)), OnNewData(std::move(onNewData)), OnFinish(std::move(onFinish)), Counter(std::make_shared<std::atomic_size_t>(0ULL))
     {}
 
     static TPtr Make(
@@ -571,7 +570,6 @@ private:
         IRetryPolicy<long>::TPtr retryPolicy) final
     {
         Rps->Inc();
-
         if (expectedSize > MaxSimulatenousDownloadsSize) {
             TIssue error(TStringBuilder() << "Too big file for downloading: size " << expectedSize << ", but limit is " << MaxSimulatenousDownloadsSize);
             callback(TIssues{error});

+ 14 - 6
ydb/library/yql/providers/s3/actors/yql_s3_read_actor.cpp

@@ -128,7 +128,8 @@ public:
         TPathList&& paths,
         bool addPathIndex,
         ui64 startPathIndex,
-        const NActors::TActorId& computeActorId
+        const NActors::TActorId& computeActorId,
+        ui64 expectedSize
     )   : Gateway(std::move(gateway))
         , HolderFactory(holderFactory)
         , InputIndex(inputIndex)
@@ -139,6 +140,7 @@ public:
         , Paths(std::move(paths))
         , AddPathIndex(addPathIndex)
         , StartPathIndex(startPathIndex)
+        , ExpectedSize(expectedSize)
     {}
 
     void Bootstrap() {
@@ -146,7 +148,7 @@ public:
         for (size_t pathInd = 0; pathInd < Paths.size(); ++pathInd) {
             const TPath& path = Paths[pathInd];
             Gateway->Download(Url + std::get<TString>(path),
-                Headers, std::get<size_t>(path),
+                Headers, std::min(std::get<size_t>(path), ExpectedSize),
                 std::bind(&TS3ReadActor::OnDownloadFinished, ActorSystem, SelfId(), std::placeholders::_1, pathInd + StartPathIndex), {}, GetS3RetryPolicy());
         };
     }
@@ -247,6 +249,7 @@ private:
     const TPathList Paths;
     const bool AddPathIndex;
     const ui64 StartPathIndex;
+    const ui64 ExpectedSize;
 
     std::queue<std::tuple<IHTTPGateway::TContent, ui64>> Blocks;
 };
@@ -257,6 +260,7 @@ struct TReadSpec {
     NDB::ColumnsWithTypeAndName Columns;
     NDB::FormatSettings Settings;
     TString Format, Compression;
+    ui64 ExpectedSize = 0;
 };
 
 struct TRetryStuff {
@@ -778,9 +782,9 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
         const auto readSpec = std::make_shared<TReadSpec>();
         readSpec->Columns.resize(structType->GetMembersCount());
         for (ui32 i = 0U; i < structType->GetMembersCount(); ++i) {
-            auto& colsumn = readSpec->Columns[i];
-            colsumn.type = MetaToClickHouse(structType->GetMemberType(i), intervalUnit);
-            colsumn.name = structType->GetMemberName(i);
+            auto& column = readSpec->Columns[i];
+            column.type = MetaToClickHouse(structType->GetMemberType(i), intervalUnit);
+            column.name = structType->GetMemberName(i);
         }
         readSpec->Format = params.GetFormat();
 
@@ -808,8 +812,12 @@ std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*> CreateS3ReadActor(
                                                   std::move(paths), addPathIndex, startPathIndex, readSpec, computeActorId);
         return {actor, actor};
     } else {
+        ui64 expectedSize = std::numeric_limits<ui64>::max();
+        if (const auto it = settings.find("expectedSize"); settings.cend() != it)
+            expectedSize = FromString<ui64>(it->second);
+
         const auto actor = new TS3ReadActor(inputIndex, std::move(gateway), holderFactory, params.GetUrl(), authToken,
-                                            std::move(paths), addPathIndex, startPathIndex, computeActorId);
+                                            std::move(paths), addPathIndex, startPathIndex, computeActorId, expectedSize);
         return {actor, actor};
     }
 }

+ 4 - 1
ydb/library/yql/providers/s3/expr_nodes/yql_s3_expr_nodes.json

@@ -51,7 +51,10 @@
         {
             "Name": "TS3SourceSettings",
             "Base": "TS3SourceSettingsBase",
-            "Match": {"Type": "Callable", "Name": "S3SourceSettings"}
+            "Match": {"Type": "Callable", "Name": "S3SourceSettings"},
+            "Children": [
+                {"Index": 2, "Name": "ExpectedSize", "Type": "TCoAtom", "Optional": true}
+            ]
         },
         {
             "Name": "TS3ParseSettings",

+ 25 - 4
ydb/library/yql/providers/s3/provider/yql_s3_datasource_type_ann.cpp

@@ -95,7 +95,7 @@ public:
     }
 
     TStatus HandleS3SourceSettings(const TExprNode::TPtr& input, TExprContext& ctx) {
-        if (!EnsureArgsCount(*input, 2U, ctx)) {
+        if (!EnsureMinArgsCount(*input, 2U, ctx)) {
             return TStatus::Error;
         }
 
@@ -250,17 +250,20 @@ public:
             return TStatus::Error;
         }
 
+        auto format = input->Child(TS3Object::idx_Format)->Content();
+
         if (!EnsureAtom(*input->Child(TS3Object::idx_Format), ctx) ||
-            !NCommon::ValidateFormat(input->Child(TS3Object::idx_Format)->Content(), ctx))
+            !NCommon::ValidateFormat(format, ctx))
         {
             return TStatus::Error;
         }
 
+
         if (input->ChildrenSize() > TS3Object::idx_Settings) {
             bool haveProjection = false;
             bool havePartitionedBy = false;
             auto validator = [&](TStringBuf name, const TExprNode& setting, TExprContext& ctx) {
-                if ((name == "compression" || name == "projection" || name == "data.interval.unit") && setting.ChildrenSize() != 2) {
+                if ((name == "compression" || name == "projection" || name == "data.interval.unit" || name == "readmaxbytes") && setting.ChildrenSize() != 2) {
                     ctx.AddError(TIssue(ctx.GetPosition(setting.Pos()),
                         TStringBuilder() << "Expected single value setting for " << name << ", but got " << setting.ChildrenSize() - 1));
                     return false;
@@ -324,6 +327,24 @@ public:
                     return NCommon::ValidateIntervalUnit(unit, ctx);
                 }
 
+                if (name == "readmaxbytes") {
+                    auto& value = setting.Tail();
+                    if (format != "raw") {
+                        ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), "read_max_bytes can only be used with raw format"));
+                        return false;
+                    }
+                    if (!value.IsAtom()) {
+                        if (!EnsureStringOrUtf8Type(value, ctx)) {
+                            return false;
+                        }
+                        if (!value.IsCallable({"String", "Utf8"})) {
+                            ctx.AddError(TIssue(ctx.GetPosition(value.Pos()), "read_max_bytes must be literal value"));
+                            return false;
+                        }
+                    }
+                    return true;
+                }
+
                 YQL_ENSURE(name == "projection");
                 haveProjection = true;
                 if (!EnsureAtom(setting.Tail(), ctx)) {
@@ -338,7 +359,7 @@ public:
                 return true;
             };
             if (!EnsureValidSettings(*input->Child(TS3Object::idx_Settings),
-                                     { "compression", "partitionedby", "projection", "data.interval.unit" }, validator, ctx))
+                                     { "compression", "partitionedby", "projection", "data.interval.unit", "readmaxbytes" }, validator, ctx))
             {
                 return TStatus::Error;
             }

+ 30 - 1
ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp

@@ -188,18 +188,41 @@ public:
                             .Seal().Build()
                     );
                 }
+                auto readSettings = s3ReadObject.Object().Settings().Cast().Ptr();
 
-                return Build<TDqSourceWrap>(ctx, read->Pos())
+                int expectedSizeIndex = -1;
+                for (size_t childInd = 0; childInd < readSettings->ChildrenSize(); ++childInd) {
+                    if (readSettings->Child(childInd)->Head().Content() == "readmaxbytes") {
+                        expectedSizeIndex = childInd;
+                        break;
+                    }
+                }
+
+                if (expectedSizeIndex != -1) {
+                    return Build<TDqSourceWrap>(ctx, read->Pos())
                     .Input<TS3SourceSettings>()
                         .Paths(s3ReadObject.Object().Paths())
                         .Token<TCoSecureParam>()
                             .Name().Build(token)
                             .Build()
+                        .ExpectedSize(readSettings->Child(expectedSizeIndex)->TailPtr())
                         .Build()
                     .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx))
                     .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>())
                     .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings)))
                     .Done().Ptr();
+                }
+                return Build<TDqSourceWrap>(ctx, read->Pos())
+                .Input<TS3SourceSettings>()
+                    .Paths(s3ReadObject.Object().Paths())
+                    .Token<TCoSecureParam>()
+                        .Name().Build(token)
+                        .Build()
+                    .Build()
+                .RowType(ExpandType(s3ReadObject.Pos(), *rowType, ctx))
+                .DataSource(s3ReadObject.DataSource().Cast<TCoDataSource>())
+                .Settings(ctx.NewList(s3ReadObject.Object().Pos(), std::move(settings)))
+                .Done().Ptr();
             }
         }
         return read;
@@ -240,6 +263,12 @@ public:
                         srcDesc.MutableSettings()->insert({TString(settings.Ref().Child(i)->Head().Content()), TString(settings.Ref().Child(i)->Tail().IsAtom() ? settings.Ref().Child(i)->Tail().Content() : settings.Ref().Child(i)->Tail().Head().Content())});
                     }
                 }
+            } else if (const auto maySourceSettings = source.Settings().Maybe<TS3SourceSettings>()){
+                const auto sourceSettings = maySourceSettings.Cast();
+                auto expectedSize = sourceSettings.ExpectedSize();
+                if (expectedSize.IsValid()) {
+                    srcDesc.MutableSettings()->insert({"expectedSize", expectedSize.Cast().StringValue()});
+                }
             }
 
             if (extraColumnsType->GetSize()) {