|
@@ -462,10 +462,16 @@ private:
|
|
|
NYql::TIssues issues;
|
|
|
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);
|
|
|
|
|
|
- LOG_D("Recv EvWriteResult from ShardID=" << shardId
|
|
|
+ LOG_D("Recv EvWriteResult (prepare) from ShardID=" << shardId
|
|
|
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
|
|
|
<< ", TxId=" << ev->Get()->Record.GetTxId()
|
|
|
- << ", LocksCount= " << ev->Get()->Record.GetTxLocks().size()
|
|
|
+ << ", Locks= " << [&]() {
|
|
|
+ TStringBuilder builder;
|
|
|
+ for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
|
|
|
+ builder << lock.ShortDebugString();
|
|
|
+ }
|
|
|
+ return builder;
|
|
|
+ }()
|
|
|
<< ", Cookie=" << ev->Cookie
|
|
|
<< ", error=" << issues.ToString());
|
|
|
|
|
@@ -486,6 +492,18 @@ private:
|
|
|
case NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED: {
|
|
|
YQL_ENSURE(false);
|
|
|
}
|
|
|
+ case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
|
|
|
+ LOG_D("Broken locks: " << res->Record.DebugString());
|
|
|
+ YQL_ENSURE(shardState->State == TShardState::EState::Preparing);
|
|
|
+ Counters->TxProxyMon->TxResultAborted->Inc();
|
|
|
+ LocksBroken = true;
|
|
|
+
|
|
|
+ YQL_ENSURE(!res->Record.GetTxLocks().empty());
|
|
|
+ ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
|
|
|
+ res->Record.GetTxLocks(0).GetSchemeShard(),
|
|
|
+ res->Record.GetTxLocks(0).GetPathId());
|
|
|
+ ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
|
|
|
+ }
|
|
|
default:
|
|
|
{
|
|
|
return ShardError(res->Record);
|
|
@@ -863,6 +881,7 @@ private:
|
|
|
return ReplyErrorAndDie(Ydb::StatusIds::SCHEME_ERROR, issues);
|
|
|
}
|
|
|
case NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN: {
|
|
|
+ issues.AddIssue(NYql::YqlIssue({}, TIssuesIds::KIKIMR_LOCKS_INVALIDATED, "Transaction locks invalidated."));
|
|
|
return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, issues);
|
|
|
}
|
|
|
}
|
|
@@ -923,6 +942,7 @@ private:
|
|
|
}
|
|
|
|
|
|
void ExecutePlanned() {
|
|
|
+ YQL_ENSURE(!LocksBroken);
|
|
|
YQL_ENSURE(TxCoordinator);
|
|
|
auto ev = MakeHolder<TEvTxProxy::TEvProposeTransaction>();
|
|
|
ev->Record.SetCoordinatorID(TxCoordinator);
|
|
@@ -1133,10 +1153,16 @@ private:
|
|
|
NYql::TIssues issues;
|
|
|
NYql::IssuesFromMessage(res->Record.GetIssues(), issues);
|
|
|
|
|
|
- LOG_D("Recv EvWriteResult from ShardID=" << shardId
|
|
|
+ LOG_D("Recv EvWriteResult (execute) from ShardID=" << shardId
|
|
|
<< ", Status=" << NKikimrDataEvents::TEvWriteResult::EStatus_Name(ev->Get()->GetStatus())
|
|
|
<< ", TxId=" << ev->Get()->Record.GetTxId()
|
|
|
- << ", LocksCount= " << ev->Get()->Record.GetTxLocks().size()
|
|
|
+ << ", Locks= " << [&]() {
|
|
|
+ TStringBuilder builder;
|
|
|
+ for (const auto& lock : ev->Get()->Record.GetTxLocks()) {
|
|
|
+ builder << lock.ShortDebugString();
|
|
|
+ }
|
|
|
+ return builder;
|
|
|
+ }()
|
|
|
<< ", Cookie=" << ev->Cookie
|
|
|
<< ", error=" << issues.ToString());
|
|
|
|
|
@@ -1167,16 +1193,11 @@ private:
|
|
|
shardState->State = TShardState::EState::Finished;
|
|
|
Counters->TxProxyMon->TxResultAborted->Inc();
|
|
|
LocksBroken = true;
|
|
|
-
|
|
|
- if (!res->Record.GetTxLocks().empty()) {
|
|
|
- ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
|
|
|
- res->Record.GetTxLocks(0).GetSchemeShard(),
|
|
|
- res->Record.GetTxLocks(0).GetPathId());
|
|
|
- return ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
|
|
|
- }
|
|
|
-
|
|
|
- CheckExecutionComplete();
|
|
|
- return;
|
|
|
+ YQL_ENSURE(!res->Record.GetTxLocks().empty());
|
|
|
+ ResponseEv->BrokenLockPathId = NYql::TKikimrPathId(
|
|
|
+ res->Record.GetTxLocks(0).GetSchemeShard(),
|
|
|
+ res->Record.GetTxLocks(0).GetPathId());
|
|
|
+ ReplyErrorAndDie(Ydb::StatusIds::ABORTED, {});
|
|
|
}
|
|
|
default:
|
|
|
{
|
|
@@ -1722,7 +1743,13 @@ private:
|
|
|
<< ", LocksOp=" << NKikimrDataEvents::TKqpLocks::ELocksOp_Name(evWriteTransaction->Record.GetLocks().GetOp())
|
|
|
<< ", SendingShards=" << shardsToString(evWriteTransaction->Record.GetLocks().GetSendingShards())
|
|
|
<< ", ReceivingShards=" << shardsToString(evWriteTransaction->Record.GetLocks().GetReceivingShards())
|
|
|
- << ", LocksCount= " << evWriteTransaction->Record.GetLocks().LocksSize());
|
|
|
+ << ", Locks= " << [&]() {
|
|
|
+ TStringBuilder builder;
|
|
|
+ for (const auto& lock : evWriteTransaction->Record.GetLocks().GetLocks()) {
|
|
|
+ builder << lock.ShortDebugString();
|
|
|
+ }
|
|
|
+ return builder;
|
|
|
+ }());
|
|
|
|
|
|
LOG_D("ExecuteEvWriteTransaction traceId.verbosity: " << std::to_string(traceId.GetVerbosity()));
|
|
|
|