Browse Source

Added import s3 to directory (#678)

* Added directory import in ydb import s3 cmd

* Added path "." handling

* Fixed bugs

* Reverted ya make

* Fixed issues

* Fixed diff

* Replaced emplace_back to push_back

* Fixed issue

* Fixed leak in help

* Fixed issues

* Remove dump.h include

* Fixed excess space

* Fixed issue
Bulat 1 year ago
parent
commit
909ffbad0e

+ 37 - 4
ydb/public/lib/ydb_cli/commands/ydb_service_import.cpp

@@ -1,5 +1,7 @@
 #include "ydb_service_import.h"
 
+#include "ydb_common.h"
+
 #include <ydb/public/lib/ydb_cli/common/normalize_path.h>
 #include <ydb/public/lib/ydb_cli/common/print_operation.h>
 #include <ydb/public/lib/ydb_cli/common/interactive.h>
@@ -16,6 +18,10 @@
 #include <unistd.h>
 #endif
 
+namespace NYdb::NDump {
+    extern const char SCHEME_FILE_NAME[];
+}
+
 namespace NYdb::NConsoleClient {
 
 TCommandImport::TCommandImport()
@@ -120,16 +126,43 @@ int TCommandImportFromS3::Run(TConfig& config) {
     settings.AccessKey(AwsAccessKey);
     settings.SecretKey(AwsSecretKey);
 
-    for (const auto& item : Items) {
-        settings.AppendItem({item.Source, item.Destination});
-    }
-
     if (Description) {
         settings.Description(Description);
     }
 
     settings.NumberOfRetries(NumberOfRetries);
 
+    InitAwsAPI();
+    try {
+        auto s3Client = CreateS3ClientWrapper(settings);
+        for (auto item : Items) {
+            std::optional<TString> token;
+            if (!item.Source.empty() && item.Source.back() != '/') {
+                item.Source += "/";
+            }
+            if (!item.Destination.empty() && item.Destination.back() == '.') {
+                item.Destination.pop_back();
+            }
+            if (item.Destination.empty() || item.Destination.back() != '/') {
+                item.Destination += "/";
+            }
+            do {
+                auto listResult = s3Client->ListObjectKeys(item.Source, token);
+                token = listResult.NextToken;
+                for (TStringBuf key : listResult.Keys) {
+                    if (key.ChopSuffix(NDump::SCHEME_FILE_NAME)) {
+                        TString destination = item.Destination + key.substr(item.Source.Size());
+                        settings.AppendItem({TString(key), std::move(destination)});
+                    }
+                }
+            } while (token);
+        }
+    } catch (...) {
+        ShutdownAwsAPI();
+        throw;
+    }
+    ShutdownAwsAPI();
+
     TImportClient client(CreateDriver(config));
     TImportFromS3Response response = client.ImportFromS3(std::move(settings)).GetValueSync();
     ThrowOnError(response);

+ 0 - 1
ydb/public/lib/ydb_cli/commands/ydb_service_import.h

@@ -1,7 +1,6 @@
 #pragma once
 
 #include "ydb_command.h"
-#include "ydb_common.h"
 
 #include <ydb/public/sdk/cpp/client/ydb_import/import.h>
 #include <ydb/public/sdk/cpp/client/ydb_table/table.h>

+ 67 - 0
ydb/public/lib/ydb_cli/common/aws.cpp

@@ -1,5 +1,12 @@
 #include "aws.h"
 
+#include <ydb/public/sdk/cpp/client/ydb_import/import.h>
+
+#include <aws/core/Aws.h>
+#include <aws/core/auth/AWSCredentialsProvider.h>
+#include <aws/s3/S3Client.h>
+#include <aws/s3/model/ListObjectsV2Request.h>
+
 namespace NYdb::NConsoleClient {
 
 const TString TCommandWithAwsCredentials::AwsCredentialsFile = "~/.aws/credentials";
@@ -34,4 +41,64 @@ TString TCommandWithAwsCredentials::ReadIniKey(const TString& iniKey) {
     }
 }
 
+class TS3ClientWrapper : public IS3ClientWrapper {
+public:
+    TS3ClientWrapper(const NImport::TImportFromS3Settings& settings) 
+        : Bucket(settings.Bucket_)
+    {
+        Aws::S3::S3ClientConfiguration config;
+        config.endpointOverride = settings.Endpoint_;
+        if (settings.Scheme_ == ES3Scheme::HTTP) {
+            config.scheme = Aws::Http::Scheme::HTTP;
+        } else if (settings.Scheme_ == ES3Scheme::HTTPS) {
+            config.scheme = Aws::Http::Scheme::HTTPS;
+        } else {
+            throw TMisuseException() << "\"" << settings.Scheme_ << "\" scheme type is not supported";
+        }
+
+        Client = std::make_unique<Aws::S3::S3Client>(
+            Aws::Auth::AWSCredentials(settings.AccessKey_, settings.SecretKey_),
+            config,
+            Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy::Never,
+            true);
+    }
+
+    TListS3Result ListObjectKeys(const TString& prefix, const std::optional<TString>& token) override {
+        auto request = Aws::S3::Model::ListObjectsV2Request()
+            .WithBucket(Bucket)
+            .WithPrefix(prefix);
+        if (token) {
+            request.WithContinuationToken(*token);
+        }
+        auto response = Client->ListObjectsV2(request);
+        if (!response.IsSuccess()) {
+            throw TMisuseException() << "ListObjectKeys error: " << response.GetError().GetMessage();
+        }
+        TListS3Result result;
+        for (const auto& object : response.GetResult().GetContents()) {
+            result.Keys.push_back(TString(object.GetKey()));
+        }
+        if (response.GetResult().GetIsTruncated()) {
+            result.NextToken = TString(response.GetResult().GetNextContinuationToken());
+        }
+        return result;
+    }
+
+private:
+    std::unique_ptr<Aws::S3::S3Client> Client;
+    const TString Bucket;
+};
+
+std::unique_ptr<IS3ClientWrapper> CreateS3ClientWrapper(const NImport::TImportFromS3Settings& settings) {
+    return std::make_unique<TS3ClientWrapper>(settings);
+}
+
+void InitAwsAPI() {
+    Aws::InitAPI(Aws::SDKOptions());
+}
+
+void ShutdownAwsAPI() {
+    Aws::ShutdownAPI(Aws::SDKOptions());
+}
+
 }

+ 20 - 0
ydb/public/lib/ydb_cli/common/aws.h

@@ -7,6 +7,10 @@
 #include <util/generic/maybe.h>
 #include <util/system/env.h>
 
+namespace NYdb::NImport {
+    struct TImportFromS3Settings;
+}
+
 namespace NYdb::NConsoleClient {
 
 class TCommandWithAwsCredentials {
@@ -60,4 +64,20 @@ private:
     TMaybe<TString> AwsProfile;
 };
 
+struct TListS3Result {
+    std::vector<TString> Keys;
+    std::optional<TString> NextToken;
+};
+
+class IS3ClientWrapper {
+public:
+    virtual TListS3Result ListObjectKeys(const TString& prefix, const std::optional<TString>& token) = 0;
+    virtual ~IS3ClientWrapper() = default;
+};
+
+std::unique_ptr<IS3ClientWrapper> CreateS3ClientWrapper(const NImport::TImportFromS3Settings& settings);
+
+void InitAwsAPI();
+void ShutdownAwsAPI();
+
 }

+ 1 - 0
ydb/public/lib/ydb_cli/common/ya.make

@@ -31,6 +31,7 @@ SRCS(
 )
 
 PEERDIR(
+    contrib/libs/aws-sdk-cpp/aws-cpp-sdk-s3
     library/cpp/config
     library/cpp/getopt
     library/cpp/json/writer