|
@@ -87,7 +87,18 @@ void TWriteSessionImpl::Start(const TDuration& delay) {
|
|
|
}
|
|
|
Started = true;
|
|
|
|
|
|
- DoConnect(delay, DbDriverState->DiscoveryEndpoint);
|
|
|
+ if (Settings.PartitionId_.Defined() && Settings.DirectWriteToPartition_)
|
|
|
+ {
|
|
|
+ with_lock (Lock) {
|
|
|
+ PreferedPartitionLocation = {};
|
|
|
+
|
|
|
+ return ConnectToPreferedPartitionLocation(delay);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ return Connect(delay);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStatus& status) {
|
|
@@ -98,11 +109,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
|
|
|
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session is aborting and will not restart");
|
|
|
return result;
|
|
|
}
|
|
|
- LOG_LAZY(DbDriverState->Log,
|
|
|
- TLOG_ERR,
|
|
|
- LogPrefix() << "Got error. Status: " << status.Status
|
|
|
- << ". Description: " << NPersQueue::IssuesSingleLineString(status.Issues)
|
|
|
- );
|
|
|
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "Got error. " << status.ToDebugString());
|
|
|
SessionEstablished = false;
|
|
|
TMaybe<TDuration> nextDelay = TDuration::Zero();
|
|
|
if (!RetryState) {
|
|
@@ -113,10 +120,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
|
|
|
if (nextDelay) {
|
|
|
result.StartDelay = *nextDelay;
|
|
|
result.DoRestart = true;
|
|
|
- LOG_LAZY(DbDriverState->Log,
|
|
|
- TLOG_DEBUG,
|
|
|
- LogPrefix() << "Write session will restart in " << result.StartDelay.MilliSeconds() << " ms"
|
|
|
- );
|
|
|
+ LOG_LAZY(DbDriverState->Log, TLOG_WARNING, LogPrefix() << "Write session will restart in " << result.StartDelay);
|
|
|
ResetForRetryImpl();
|
|
|
|
|
|
} else {
|
|
@@ -126,6 +130,127 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::RestartImpl(const TPlainStat
|
|
|
}
|
|
|
return result;
|
|
|
}
|
|
|
+
|
|
|
+void TWriteSessionImpl::ConnectToPreferedPartitionLocation(const TDuration& delay)
|
|
|
+{
|
|
|
+ Y_VERIFY(Lock.IsLocked());
|
|
|
+ Y_VERIFY(Settings.PartitionId_.Defined() && Settings.DirectWriteToPartition_);
|
|
|
+
|
|
|
+ if (AtomicGet(Aborting)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Get partition location async, partition " << *Settings.PartitionId_ << ", delay " << delay );
|
|
|
+
|
|
|
+ NGrpc::IQueueClientContextPtr prevDescribePartitionContext;
|
|
|
+ NGrpc::IQueueClientContextPtr describePartitionContext = Client->CreateContext();
|
|
|
+
|
|
|
+ if (!describePartitionContext) {
|
|
|
+ AbortImpl();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ prevDescribePartitionContext = std::exchange(DescribePartitionContext, describePartitionContext);
|
|
|
+ Y_ASSERT(DescribePartitionContext);
|
|
|
+ NPersQueue::Cancel(prevDescribePartitionContext);
|
|
|
+
|
|
|
+ Ydb::Topic::DescribePartitionRequest request;
|
|
|
+ request.set_path(Settings.Path_);
|
|
|
+ request.set_partition_id(*Settings.PartitionId_);
|
|
|
+ request.set_include_location(true);
|
|
|
+
|
|
|
+ auto extractor = [sharedThis = shared_from_this(), wire = Tracker->MakeTrackedWire(), context = describePartitionContext](Ydb::Topic::DescribePartitionResponse* response, TPlainStatus status) mutable {
|
|
|
+ Ydb::Topic::DescribePartitionResult result;
|
|
|
+ if (response)
|
|
|
+ response->operation().result().UnpackTo(&result);
|
|
|
+
|
|
|
+ TStatus st(std::move(status));
|
|
|
+ sharedThis->OnDescribePartition(st, result, context);
|
|
|
+ };
|
|
|
+
|
|
|
+ auto callback = [sharedThis = this->shared_from_this(), wire = Tracker->MakeTrackedWire(), req = std::move(request), extr = std::move(extractor), connections = std::shared_ptr<TGRpcConnectionsImpl>(Connections), dbState = DbDriverState, context = describePartitionContext]() mutable {
|
|
|
+ LOG_LAZY(dbState->Log, TLOG_DEBUG, sharedThis->LogPrefix() << " Getting partition location, partition " << sharedThis->Settings.PartitionId_);
|
|
|
+ connections->Run<Ydb::Topic::V1::TopicService, Ydb::Topic::DescribePartitionRequest, Ydb::Topic::DescribePartitionResponse>(
|
|
|
+ std::move(req),
|
|
|
+ std::move(extr),
|
|
|
+ &Ydb::Topic::V1::TopicService::Stub::AsyncDescribePartition,
|
|
|
+ dbState,
|
|
|
+ {},
|
|
|
+ context);
|
|
|
+ };
|
|
|
+
|
|
|
+ Connections->ScheduleOneTimeTask(std::move(callback), delay);
|
|
|
+}
|
|
|
+
|
|
|
+void TWriteSessionImpl::OnDescribePartition(const TStatus& status, const Ydb::Topic::DescribePartitionResult& proto, const NGrpc::IQueueClientContextPtr& describePartitionContext)
|
|
|
+{
|
|
|
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Got PartitionLocation response. Status " << status.GetStatus() << ", proto:\n" << proto.DebugString());
|
|
|
+ TString endpoint, name;
|
|
|
+ THandleResult handleResult;
|
|
|
+
|
|
|
+ with_lock (Lock) {
|
|
|
+ if (DescribePartitionContext == describePartitionContext)
|
|
|
+ DescribePartitionContext = nullptr;
|
|
|
+ else
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!status.IsSuccess()) {
|
|
|
+ with_lock (Lock) {
|
|
|
+ handleResult = OnErrorImpl({status.GetStatus(), NPersQueue::MakeIssueWithSubIssues("Failed to get partition location", status.GetIssues())});
|
|
|
+ }
|
|
|
+ ProcessHandleResult(handleResult);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ const Ydb::Topic::DescribeTopicResult_PartitionInfo& partition = proto.partition();
|
|
|
+ if (partition.partition_id() != Settings.PartitionId_ || !partition.has_partition_location() || partition.partition_location().node_id() == 0 || partition.partition_location().generation() == 0) {
|
|
|
+ with_lock (Lock) {
|
|
|
+ handleResult = OnErrorImpl({EStatus::INTERNAL_ERROR, "Wrong partition location"});
|
|
|
+ }
|
|
|
+ ProcessHandleResult(handleResult);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ TMaybe<TEndpointKey> preferedEndpoint;
|
|
|
+ with_lock (Lock) {
|
|
|
+ preferedEndpoint = GetPreferedEndpointImpl(*Settings.PartitionId_, partition.partition_location().node_id());
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!preferedEndpoint.Defined()) {
|
|
|
+ with_lock (Lock) {
|
|
|
+ handleResult = OnErrorImpl({EStatus::UNAVAILABLE, "Partition prefered endpoint is not found"});
|
|
|
+ }
|
|
|
+ ProcessHandleResult(handleResult);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ with_lock (Lock) {
|
|
|
+ PreferedPartitionLocation = {*preferedEndpoint, partition.partition_location().generation()};
|
|
|
+ }
|
|
|
+
|
|
|
+ Connect(TDuration::Zero());
|
|
|
+}
|
|
|
+
|
|
|
+TMaybe<TEndpointKey> TWriteSessionImpl::GetPreferedEndpointImpl(ui32 partitionId, ui64 partitionNodeId) {
|
|
|
+ Y_VERIFY(Lock.IsLocked());
|
|
|
+
|
|
|
+ TEndpointKey preferedEndpoint{"", partitionNodeId};
|
|
|
+
|
|
|
+ bool nodeIsKnown = (bool)DbDriverState->EndpointPool.GetEndpoint(preferedEndpoint, true);
|
|
|
+ if (nodeIsKnown)
|
|
|
+ {
|
|
|
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "GetPreferedEndpoint: partitionId " << partitionId << ", partitionNodeId " << partitionNodeId << " exists in the endpoint pool.");
|
|
|
+ return preferedEndpoint;
|
|
|
+ }
|
|
|
+ else
|
|
|
+ {
|
|
|
+ LOG_LAZY(DbDriverState->Log, TLOG_ERR, LogPrefix() << "GetPreferedEndpoint: partitionId " << partitionId << ", nodeId " << partitionNodeId << " does not exist in the endpoint pool.");
|
|
|
+ DbDriverState->EndpointPool.UpdateAsync();
|
|
|
+ return {};
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
TString GenerateProducerId() {
|
|
|
return CreateGuidAsString();
|
|
|
}
|
|
@@ -280,9 +405,7 @@ TWriteSessionImpl::THandleResult TWriteSessionImpl::OnErrorImpl(NYdb::TPlainStat
|
|
|
}
|
|
|
|
|
|
// No lock
|
|
|
-void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoint) {
|
|
|
- LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Start write session. Will connect to endpoint: " << endpoint);
|
|
|
-
|
|
|
+void TWriteSessionImpl::Connect(const TDuration& delay) {
|
|
|
NGrpc::IQueueClientContextPtr prevConnectContext;
|
|
|
NGrpc::IQueueClientContextPtr prevConnectTimeoutContext;
|
|
|
NGrpc::IQueueClientContextPtr prevConnectDelayContext;
|
|
@@ -300,6 +423,9 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin
|
|
|
if (Aborting) {
|
|
|
return;
|
|
|
}
|
|
|
+
|
|
|
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Start write session. Will connect to nodeId: " << PreferedPartitionLocation.Endpoint.NodeId);
|
|
|
+
|
|
|
++ConnectionGeneration;
|
|
|
auto subclient = Client;
|
|
|
connectionFactory = subclient->CreateWriteSessionConnectionProcessorFactory();
|
|
@@ -334,9 +460,8 @@ void TWriteSessionImpl::DoConnect(const TDuration& delay, const TString& endpoin
|
|
|
if (prevConnectDelayContext)
|
|
|
NPersQueue::Cancel(prevConnectDelayContext);
|
|
|
NPersQueue::Cancel(prevConnectTimeoutContext);
|
|
|
- Y_ASSERT(connectContext);
|
|
|
- Y_ASSERT(connectTimeoutContext);
|
|
|
- reqSettings = TRpcRequestSettings::Make(Settings);
|
|
|
+
|
|
|
+ reqSettings = TRpcRequestSettings::Make(Settings, PreferedPartitionLocation.Endpoint);
|
|
|
|
|
|
connectCallback = [sharedThis = shared_from_this(),
|
|
|
wire = Tracker->MakeTrackedWire(),
|
|
@@ -440,11 +565,24 @@ void TWriteSessionImpl::InitImpl() {
|
|
|
auto* init = req.mutable_init_request();
|
|
|
init->set_path(Settings.Path_);
|
|
|
init->set_producer_id(Settings.ProducerId_);
|
|
|
-
|
|
|
- if (Settings.PartitionId_.Defined())
|
|
|
- init->set_partition_id(*Settings.PartitionId_);
|
|
|
+
|
|
|
+ if (Settings.PartitionId_.Defined()) {
|
|
|
+ if (Settings.DirectWriteToPartition_) {
|
|
|
+ auto* partitionWithGeneration = init->mutable_partition_with_generation();
|
|
|
+ partitionWithGeneration->set_partition_id(*Settings.PartitionId_);
|
|
|
+ partitionWithGeneration->set_generation(PreferedPartitionLocation.Generation);
|
|
|
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: direct write to partition: " << *Settings.PartitionId_ << ", generation " << PreferedPartitionLocation.Generation);
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ init->set_partition_id(*Settings.PartitionId_);
|
|
|
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: write to partition: " << *Settings.PartitionId_);
|
|
|
+ }
|
|
|
+ }
|
|
|
else
|
|
|
+ {
|
|
|
init->set_message_group_id(Settings.MessageGroupId_);
|
|
|
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: write to message_group: " << Settings.MessageGroupId_);
|
|
|
+ }
|
|
|
|
|
|
for (const auto& attr : Settings.Meta_.Fields) {
|
|
|
(*init->mutable_write_session_meta())[attr.first] = attr.second;
|
|
@@ -494,6 +632,8 @@ void TWriteSessionImpl::ReadFromProcessor() {
|
|
|
}
|
|
|
|
|
|
void TWriteSessionImpl::OnWriteDone(NGrpc::TGrpcStatus&& status, size_t connectionGeneration) {
|
|
|
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: OnWriteDone " << status.ToDebugString());
|
|
|
+
|
|
|
THandleResult handleResult;
|
|
|
with_lock (Lock) {
|
|
|
if (connectionGeneration != ConnectionGeneration) {
|
|
@@ -510,6 +650,8 @@ void TWriteSessionImpl::OnWriteDone(NGrpc::TGrpcStatus&& status, size_t connecti
|
|
|
}
|
|
|
|
|
|
void TWriteSessionImpl::OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t connectionGeneration) {
|
|
|
+ LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: OnReadDone " << grpcStatus.ToDebugString());
|
|
|
+
|
|
|
TPlainStatus errorStatus;
|
|
|
TProcessSrvMessageResult processResult;
|
|
|
bool needSetValue = false;
|
|
@@ -558,7 +700,18 @@ void TWriteSessionImpl::OnReadDone(NGrpc::TGrpcStatus&& grpcStatus, size_t conne
|
|
|
}
|
|
|
|
|
|
TStringBuilder TWriteSessionImpl::LogPrefix() const {
|
|
|
- return TStringBuilder() << "ProducerId [" << Settings.ProducerId_ << "] MessageGroupId [" << Settings.MessageGroupId_ << "] SessionId [" << SessionId << "] ";
|
|
|
+ TStringBuilder ret;
|
|
|
+ ret << " SessionId [" << SessionId << "] ";
|
|
|
+
|
|
|
+ if (Settings.PartitionId_.Defined()) {
|
|
|
+ ret << " PartitionId [" << *Settings.PartitionId_ << "] ";
|
|
|
+ if (Settings.DirectWriteToPartition_)
|
|
|
+ ret << " Generation [" << PreferedPartitionLocation.Generation << "] ";
|
|
|
+ } else {
|
|
|
+ ret << " MessageGroupId [" << Settings.MessageGroupId_ << "] ";
|
|
|
+ }
|
|
|
+
|
|
|
+ return ret;
|
|
|
}
|
|
|
|
|
|
template<>
|
|
@@ -1053,10 +1206,7 @@ void TWriteSessionImpl::SendImpl() {
|
|
|
bool TWriteSessionImpl::Close(TDuration closeTimeout) {
|
|
|
if (AtomicGet(Aborting))
|
|
|
return false;
|
|
|
- LOG_LAZY(DbDriverState->Log,
|
|
|
- TLOG_INFO,
|
|
|
- LogPrefix() << "Write session: close. Timeout = " << closeTimeout.MilliSeconds() << " ms"
|
|
|
- );
|
|
|
+ LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: close. Timeout " << closeTimeout);
|
|
|
auto startTime = TInstant::Now();
|
|
|
auto remaining = closeTimeout;
|
|
|
bool ready = false;
|
|
@@ -1088,11 +1238,7 @@ bool TWriteSessionImpl::Close(TDuration closeTimeout) {
|
|
|
if (ready) {
|
|
|
LOG_LAZY(DbDriverState->Log, TLOG_INFO, LogPrefix() << "Write session: gracefully shut down, all writes complete");
|
|
|
} else {
|
|
|
- LOG_LAZY(DbDriverState->Log,
|
|
|
- TLOG_WARNING,
|
|
|
- LogPrefix() << "Write session: could not confirm all writes in time"
|
|
|
- << " or session aborted, perform hard shutdown"
|
|
|
- );
|
|
|
+ LOG_LAZY(DbDriverState->Log, TLOG_WARNING, LogPrefix() << "Write session: could not confirm all writes in time or session aborted, perform hard shutdown");
|
|
|
}
|
|
|
return ready;
|
|
|
}
|
|
@@ -1169,6 +1315,7 @@ void TWriteSessionImpl::AbortImpl() {
|
|
|
if (!AtomicGet(Aborting)) {
|
|
|
LOG_LAZY(DbDriverState->Log, TLOG_DEBUG, LogPrefix() << "Write session: aborting");
|
|
|
AtomicSet(Aborting, 1);
|
|
|
+ NPersQueue::Cancel(DescribePartitionContext);
|
|
|
NPersQueue::Cancel(ConnectContext);
|
|
|
NPersQueue::Cancel(ConnectTimeoutContext);
|
|
|
NPersQueue::Cancel(ConnectDelayContext);
|