|
@@ -1,4 +1,5 @@
|
|
|
#include "yql_generic_read_actor.h"
|
|
|
+#include "yql_generic_token_provider.h"
|
|
|
|
|
|
#include <ydb/library/actors/core/actor_bootstrapped.h>
|
|
|
#include <ydb/library/actors/core/actorsystem.h>
|
|
@@ -9,11 +10,9 @@
|
|
|
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
|
|
|
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
|
|
|
#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
|
|
|
-#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
|
|
|
#include <ydb/library/yql/providers/generic/connector/api/service/protos/connector.pb.h>
|
|
|
#include <ydb/library/yql/providers/generic/connector/libcpp/error.h>
|
|
|
#include <ydb/library/yql/providers/generic/connector/libcpp/utils.h>
|
|
|
-#include <ydb/library/yql/providers/generic/proto/range.pb.h>
|
|
|
#include <ydb/library/yql/public/udf/arrow/util.h>
|
|
|
#include <ydb/library/yql/utils/log/log.h>
|
|
|
#include <ydb/library/yql/utils/yql_panic.h>
|
|
@@ -104,14 +103,14 @@ namespace NYql::NDq {
|
|
|
ui64 inputIndex,
|
|
|
TCollectStatsLevel statsLevel,
|
|
|
NConnector::IClient::TPtr client,
|
|
|
- NYdb::TCredentialsProviderPtr credentialsProvider,
|
|
|
- NConnector::TSource&& source,
|
|
|
+ TGenericTokenProvider::TPtr tokenProvider,
|
|
|
+ Generic::TSource&& source,
|
|
|
const NActors::TActorId& computeActorId,
|
|
|
const NKikimr::NMiniKQL::THolderFactory& holderFactory)
|
|
|
: InputIndex_(inputIndex)
|
|
|
, ComputeActorId_(computeActorId)
|
|
|
, Client_(std::move(client))
|
|
|
- , CredentialsProvider_(std::move(credentialsProvider))
|
|
|
+ , TokenProvider_(std::move(tokenProvider))
|
|
|
, HolderFactory_(holderFactory)
|
|
|
, Source_(source)
|
|
|
{
|
|
@@ -146,7 +145,7 @@ namespace NYql::NDq {
|
|
|
// Prepare request
|
|
|
NConnector::NApi::TListSplitsRequest request;
|
|
|
NConnector::NApi::TSelect select = Source_.select(); // copy TSelect from source
|
|
|
- MaybeRefreshToken(select.mutable_data_source_instance());
|
|
|
+ TokenProvider_->MaybeFillToken(*select.mutable_data_source_instance());
|
|
|
*request.mutable_selects()->Add() = std::move(select);
|
|
|
|
|
|
// Initialize stream
|
|
@@ -242,7 +241,7 @@ namespace NYql::NDq {
|
|
|
Splits_.cbegin(), Splits_.cend(),
|
|
|
[&](const NConnector::NApi::TSplit& split) {
|
|
|
NConnector::NApi::TSplit splitCopy = split;
|
|
|
- MaybeRefreshToken(splitCopy.mutable_select()->mutable_data_source_instance());
|
|
|
+ TokenProvider_->MaybeFillToken(*splitCopy.mutable_select()->mutable_data_source_instance());
|
|
|
*request.mutable_splits()->Add() = std::move(split);
|
|
|
});
|
|
|
|
|
@@ -459,20 +458,6 @@ namespace NYql::NDq {
|
|
|
return total;
|
|
|
}
|
|
|
|
|
|
- void MaybeRefreshToken(NConnector::NApi::TDataSourceInstance* dsi) const {
|
|
|
- if (!dsi->credentials().has_token()) {
|
|
|
- return;
|
|
|
- }
|
|
|
-
|
|
|
- // Token may have expired. Refresh it.
|
|
|
- Y_ENSURE(CredentialsProvider_, "CredentialsProvider is not initialized");
|
|
|
- auto iamToken = CredentialsProvider_->GetAuthInfo();
|
|
|
- Y_ENSURE(iamToken, "empty IAM token");
|
|
|
-
|
|
|
- *dsi->mutable_credentials()->mutable_token()->mutable_value() = iamToken;
|
|
|
- *dsi->mutable_credentials()->mutable_token()->mutable_type() = "IAM";
|
|
|
- }
|
|
|
-
|
|
|
// IActor & IDqComputeActorAsyncInput
|
|
|
void PassAway() override { // Is called from Compute Actor
|
|
|
YQL_CLOG(INFO, ProviderGeneric) << "PassAway :: final ingress stats"
|
|
@@ -505,7 +490,7 @@ namespace NYql::NDq {
|
|
|
const NActors::TActorId ComputeActorId_;
|
|
|
|
|
|
NConnector::IClient::TPtr Client_;
|
|
|
- NYdb::TCredentialsProviderPtr CredentialsProvider_;
|
|
|
+ TGenericTokenProvider::TPtr TokenProvider_;
|
|
|
NConnector::IListSplitsStreamIterator::TPtr ListSplitsIterator_;
|
|
|
TVector<NConnector::NApi::TSplit> Splits_; // accumulated list of table splits
|
|
|
NConnector::IReadSplitsStreamIterator::TPtr ReadSplitsIterator_;
|
|
@@ -514,12 +499,12 @@ namespace NYql::NDq {
|
|
|
|
|
|
NKikimr::NMiniKQL::TPlainContainerCache ArrowRowContainerCache_;
|
|
|
const NKikimr::NMiniKQL::THolderFactory& HolderFactory_;
|
|
|
- NConnector::TSource Source_;
|
|
|
+ Generic::TSource Source_;
|
|
|
};
|
|
|
|
|
|
std::pair<NYql::NDq::IDqComputeActorAsyncInput*, IActor*>
|
|
|
CreateGenericReadActor(NConnector::IClient::TPtr genericClient,
|
|
|
- NConnector::TSource&& source,
|
|
|
+ Generic::TSource&& source,
|
|
|
ui64 inputIndex,
|
|
|
TCollectStatsLevel statsLevel,
|
|
|
const THashMap<TString, TString>& /*secureParams*/,
|
|
@@ -548,24 +533,6 @@ namespace NYql::NDq {
|
|
|
*/
|
|
|
|
|
|
// Obtain token to access remote data source if necessary
|
|
|
- NYdb::TCredentialsProviderPtr credentialProvider;
|
|
|
- if (source.GetServiceAccountId() && source.GetServiceAccountIdSignature()) {
|
|
|
- Y_ENSURE(credentialsFactory, "CredentialsFactory is not initialized");
|
|
|
-
|
|
|
- auto structuredTokenJSON = TStructuredTokenBuilder().SetServiceAccountIdAuth(
|
|
|
- source.GetServiceAccountId(), source.GetServiceAccountIdSignature())
|
|
|
- .ToJson();
|
|
|
-
|
|
|
- // If service account is provided, obtain IAM-token
|
|
|
- Y_ENSURE(structuredTokenJSON, "empty structured token");
|
|
|
-
|
|
|
- auto credentialsProviderFactory = CreateCredentialsProviderFactoryForStructuredToken(
|
|
|
- credentialsFactory,
|
|
|
- structuredTokenJSON,
|
|
|
- false);
|
|
|
- credentialProvider = credentialsProviderFactory->CreateProvider();
|
|
|
- }
|
|
|
-
|
|
|
// TODO: partitioning is not implemented now, but this code will be useful for the further research:
|
|
|
/*
|
|
|
TStringBuilder part;
|
|
@@ -579,11 +546,13 @@ namespace NYql::NDq {
|
|
|
part << ';';
|
|
|
*/
|
|
|
|
|
|
+ auto tokenProvider = CreateGenericTokenProvider(source, credentialsFactory);
|
|
|
+
|
|
|
const auto actor = new TGenericReadActor(
|
|
|
inputIndex,
|
|
|
statsLevel,
|
|
|
genericClient,
|
|
|
- std::move(credentialProvider),
|
|
|
+ std::move(tokenProvider),
|
|
|
std::move(source),
|
|
|
computeActorId,
|
|
|
holderFactory);
|