12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560 |
- #include "schemeshard__operation.h"
- #include "schemeshard__operation_part.h"
- #include "schemeshard__operation_side_effects.h"
- #include "schemeshard__operation_memory_changes.h"
- #include "schemeshard__operation_db_changes.h"
- #include "schemeshard_audit_log_fragment.h"
- #include "ydb/core/audit/audit_log.h"
- #include "schemeshard_impl.h"
- #include <ydb/core/tablet/tablet_exception.h>
- #include <ydb/core/tablet_flat/flat_cxx_database.h>
- #include <ydb/core/tablet_flat/tablet_flat_executor.h>
- #include <util/generic/algorithm.h>
- namespace NKikimr::NSchemeShard {
- using namespace NTabletFlatExecutor;
- std::tuple<TMaybe<NACLib::TUserToken>, bool> ParseUserToken(const TString& tokenStr) {
- TMaybe<NACLib::TUserToken> result;
- bool parseError = false;
- if (!tokenStr.empty()) {
- NACLibProto::TUserToken tokenPb;
- if (tokenPb.ParseFromString(tokenStr)) {
- result = NACLib::TUserToken(tokenPb);
- } else {
- parseError = true;
- }
- }
- return std::make_tuple(result, parseError);
- }
- TString RenderPaths(const TVector<TString>& paths) {
- auto result = TStringBuilder();
- result << "[" << JoinStrings(paths.begin(), paths.end(), ", ") << "]";
- return result;
- }
- void AuditLogModifySchemeTransaction(const NKikimrScheme::TEvModifySchemeTransaction& request, const NKikimrScheme::TEvModifySchemeTransactionResult& response, TSchemeShard* SS, const TString& userSID) {
- // Each TEvModifySchemeTransaction.Transaction is a self sufficient operation and should be logged independently
- // (even if it was packed into a single TxProxy transaction with some other operations).
- //NOTE: UserSIDNone couldn't be an empty string as "subject" field is a required one,
- // but AUDIT_PART() skips any part with an empty value
- static const TString EmptyValue = "{none}";
- for (const auto& operation : request.GetTransaction()) {
- auto logEntry = MakeAuditLogFragment(operation);
- auto databasePath = TPath::Resolve(operation.GetWorkingDir(), SS);
- if (!databasePath.IsResolved()) {
- databasePath.RiseUntilFirstResolvedParent();
- }
- auto peerName = request.GetPeerName();
- AUDIT_LOG(
- AUDIT_PART("txId", std::to_string(request.GetTxId()))
- AUDIT_PART("remote_address", (!peerName.empty() ? peerName : EmptyValue) )
- AUDIT_PART("subject", (!userSID.empty() ? userSID : EmptyValue))
- AUDIT_PART("database", (!databasePath.IsEmpty() ? databasePath.GetDomainPathString() : EmptyValue))
- AUDIT_PART("operation", logEntry.Operation)
- AUDIT_PART("paths", RenderPaths(logEntry.Paths), !logEntry.Paths.empty())
- AUDIT_PART("status", NKikimrScheme::EStatus_Name(response.GetStatus()))
- AUDIT_PART("reason", response.GetReason(), response.HasReason())
- );
- }
- }
- struct TSchemeShard::TTxOperationProposeCancelTx: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
- TEvSchemeShard::TEvCancelTx::TPtr Ev;
- TSideEffects OnComplete;
- TMemoryChanges MemChanges;
- TStorageChanges DbChanges;
- TTxOperationProposeCancelTx(TSchemeShard* self, TEvSchemeShard::TEvCancelTx::TPtr ev)
- : TBase(self)
- , Ev(ev)
- {}
- TTxType GetTxType() const override { return TXTYPE_CANCEL_BACKUP_IMPL; }
- bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
- const auto& record = Ev->Get()->Record;
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TTxOperationProposeCancelTx Execute"
- << ", at schemeshard: " << Self->TabletID()
- << ", message: " << record.ShortDebugString());
- txc.DB.NoMoreReadsForTx();
- ISubOperation::TPtr part = CreateTxCancelTx(Ev);
- TOperationContext context{Self, txc, ctx, OnComplete, MemChanges, DbChanges};
- auto fakeResponse = part->Propose(TString(), context);
- Y_UNUSED(fakeResponse);
- OnComplete.ApplyOnExecute(Self, txc, ctx);
- DbChanges.Apply(Self, txc, ctx);
- return true;
- }
- void Complete(const TActorContext& ctx) override {
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TTxOperationProposeCancelTx Complete"
- << ", at schemeshard: " << Self->TabletID());
- OnComplete.ApplyOnComplete(Self, ctx);
- }
- };
- NKikimrScheme::TEvModifySchemeTransaction GetRecordForPrint(const NKikimrScheme::TEvModifySchemeTransaction& record) {
- auto recordForPrint = record;
- if (record.HasUserToken()) {
- recordForPrint.SetUserToken("***hide token***");
- }
- return recordForPrint;
- }
- THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request, TOperationContext& context) {
- THolder<TProposeResponse> response = nullptr;
- auto selfId = SelfTabletId();
- auto& record = request.Record;
- auto txId = TTxId(record.GetTxId());
- if (Operations.contains(txId)) {
- response.Reset(new TProposeResponse(NKikimrScheme::StatusAccepted, ui64(txId), ui64(selfId)));
- response->SetError(NKikimrScheme::StatusAccepted, "There is operation with the same txId has been found in flight."
- " Actually that shouldn't have happened."
- " Note that tx body equality isn't granted."
- " StatusAccepted is just returned on retries.");
- return std::move(response);
- }
- TOperation::TPtr operation = new TOperation(txId);
- Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose
- for (const auto& transaction : record.GetTransaction()) {
- auto quotaResult = operation->ConsumeQuota(transaction, context);
- if (quotaResult.Status != NKikimrScheme::StatusSuccess) {
- response.Reset(new TProposeResponse(quotaResult.Status, ui64(txId), ui64(selfId)));
- response->SetError(quotaResult.Status, quotaResult.Reason);
- Operations.erase(txId);
- return std::move(response);
- }
- }
- if (record.HasFailOnExist()) {
- // inherit FailOnExist from TEvModifySchemeTransaction into TModifyScheme
- for (auto& transaction : *record.MutableTransaction()) {
- if (!transaction.HasFailOnExist()) {
- transaction.SetFailOnExist(record.GetFailOnExist());
- }
- }
- }
- TVector<TTxTransaction> transactions;
- for (const auto& transaction : record.GetTransaction()) {
- auto splitResult = operation->SplitIntoTransactions(transaction, context);
- if (splitResult.Status != NKikimrScheme::StatusSuccess) {
- response.Reset(new TProposeResponse(splitResult.Status, ui64(txId), ui64(selfId)));
- response->SetError(splitResult.Status, splitResult.Reason);
- Operations.erase(txId);
- return std::move(response);
- }
- std::move(splitResult.Transactions.begin(), splitResult.Transactions.end(), std::back_inserter(transactions));
- }
- const TString owner = record.HasOwner() ? record.GetOwner() : BUILTIN_ACL_ROOT;
- for (const auto& transaction : transactions) {
- auto parts = operation->ConstructParts(transaction, context);
- if (parts.size() > 1) {
- // les't allow altering impl index tables as part of consistent operation
- context.IsAllowedPrivateTables = true;
- }
- for (auto& part : parts) {
- TString errStr;
- if (!context.SS->CheckInFlightLimit(part->GetTransaction().GetOperationType(), errStr)) {
- response.Reset(new TProposeResponse(NKikimrScheme::StatusResourceExhausted, ui64(txId), ui64(selfId)));
- response->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
- } else {
- response = part->Propose(owner, context);
- }
- Y_VERIFY(response);
- LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "IgniteOperation"
- << ", opId: " << operation->NextPartId()
- << ", propose status:" << NKikimrScheme::EStatus_Name(response->Record.GetStatus())
- << ", reason: " << response->Record.GetReason()
- << ", at schemeshard: " << selfId);
- if (response->IsDone()) {
- operation->AddPart(part); //at ApplyOnExecute parts is erased
- context.OnComplete.DoneOperation(part->GetOperationId()); //mark it here by self for sure
- } else if (response->IsConditionalAccepted()) {
- //happens on retries, we answer like AlreadyExist or StatusSuccess with error message and do nothing in operation
- operation->AddPart(part); //at ApplyOnExecute parts is erased
- context.OnComplete.DoneOperation(part->GetOperationId()); //mark it here by self for sure
- } else if (response->IsAccepted()) {
- operation->AddPart(part);
- //context.OnComplete.ActivateTx(partOpId) ///TODO maybe it is good idea
- } else {
- if (!operation->Parts.empty()) {
- LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "Abort operation: IgniteOperation fail to propose a part"
- << ", opId: " << part->GetOperationId()
- << ", at schemeshard: " << selfId
- << ", already accepted parts: " << operation->Parts.size()
- << ", propose result status: " << NKikimrScheme::EStatus_Name(response->Record.GetStatus())
- << ", with reason: " << response->Record.GetReason()
- << ", tx message: " << GetRecordForPrint(record).ShortDebugString());
- }
- Y_VERIFY_S(context.IsUndoChangesSafe(),
- "Operation is aborted and all changes should be reverted"
- << ", but context.IsUndoChangesSafe is false, which means some direct writes have been done"
- << ", opId: " << part->GetOperationId()
- << ", at schemeshard: " << selfId
- << ", already accepted parts: " << operation->Parts.size()
- << ", propose result status: " << NKikimrScheme::EStatus_Name(response->Record.GetStatus())
- << ", with reason: " << response->Record.GetReason()
- << ", tx message: " << GetRecordForPrint(record).ShortDebugString());
- context.OnComplete = {}; // recreate
- context.DbChanges = {};
- for (auto& toAbort : operation->Parts) {
- toAbort->AbortPropose(context);
- }
- context.MemChanges.UnDo(context.SS);
- context.OnComplete.ApplyOnExecute(context.SS, context.GetTxc(), context.Ctx);
- Operations.erase(txId);
- return std::move(response);
- }
- }
- }
- return std::move(response);
- }
- struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
- using TBase = NTabletFlatExecutor::TTransactionBase<TSchemeShard>;
- TProposeRequest::TPtr Request;
- THolder<TProposeResponse> Response = nullptr;
- TString UserSID;
- TSideEffects OnComplete;
- TTxOperationPropose(TSchemeShard* self, TProposeRequest::TPtr request)
- : TBase(self)
- , Request(request)
- {}
- TTxType GetTxType() const override { return TXTYPE_PROPOSE; }
- bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
- TTabletId selfId = Self->SelfTabletId();
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TTxOperationPropose Execute"
- << ", message: " << GetRecordForPrint(Request->Get()->Record).ShortDebugString()
- << ", at schemeshard: " << selfId);
- txc.DB.NoMoreReadsForTx();
- auto [userToken, tokenParseError] = ParseUserToken(Request->Get()->Record.GetUserToken());
- if (tokenParseError) {
- auto txId = Request->Get()->Record.GetTxId();
- Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusInvalidParameter, ui64(txId), ui64(selfId), "Failed to parse user token");
- return true;
- }
- if (userToken) {
- UserSID = userToken->GetUserSID();
- }
- TMemoryChanges memChanges;
- TStorageChanges dbChanges;
- TOperationContext context{Self, txc, ctx, OnComplete, memChanges, dbChanges, std::move(userToken)};
- Response = Self->IgniteOperation(*Request->Get(), context);
- OnComplete.ApplyOnExecute(Self, txc, ctx);
- dbChanges.Apply(Self, txc, ctx);
- return true;
- }
- void Complete(const TActorContext& ctx) override {
- Y_VERIFY(Response);
- const auto& record = Request->Get()->Record;
- const auto txId = TTxId(record.GetTxId());
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TTxOperationPropose Complete"
- << ", txId: " << txId
- << ", response: " << Response->Record.ShortDebugString()
- << ", at schemeshard: " << Self->TabletID());
- AuditLogModifySchemeTransaction(record, Response->Record, Self, UserSID);
- const TActorId sender = Request->Sender;
- const ui64 cookie = Request->Cookie;
- ctx.Send(sender, Response.Release(), 0, cookie);
- OnComplete.ApplyOnComplete(Self, ctx);
- }
- };
- struct TSchemeShard::TTxOperationProgress: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
- TOperationId OpId;
- TSideEffects OnComplete;
- TMemoryChanges MemChanges;
- TStorageChanges DbChanges;
- TTxOperationProgress(TSchemeShard* self, TOperationId id)
- : TBase(self)
- , OpId(id)
- {}
- TTxType GetTxType() const override { return TXTYPE_PROGRESS_OP; }
- bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TTxOperationProgress Execute"
- << ", operationId: " << OpId
- << ", at schemeshard: " << Self->TabletID());
- if (!Self->Operations.contains(OpId.GetTxId())) {
- LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TTxOperationProgress Execute"
- << " for unknown txId " << OpId.GetTxId());
- return true;
- }
- TOperation::TPtr operation = Self->Operations.at(OpId.GetTxId());
- if (operation->DoneParts.contains(OpId.GetSubTxId())) {
- LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TTxOperationProgress Execute"
- << " operation already done"
- << ", operationId: " << OpId
- << ", at schemeshard: " << Self->TabletID());
- return true;
- }
- ISubOperation::TPtr part = operation->Parts.at(ui64(OpId.GetSubTxId()));
- TOperationContext context{Self, txc, ctx, OnComplete, MemChanges, DbChanges};
- part->ProgressState(context);
- OnComplete.ApplyOnExecute(Self, txc, ctx);
- DbChanges.Apply(Self, txc, ctx);
- return true;
- }
- void Complete(const TActorContext& ctx) override {
- OnComplete.ApplyOnComplete(Self, ctx);
- }
- };
- template <class TEvType>
- struct TSchemeShard::TTxOperationReply {};
- #define DefineTTxOperationReply(TEvType, TxType) \
- template<> \
- struct TSchemeShard::TTxOperationReply<TEvType>: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> { \
- TOperationId OperationId; \
- TEvType::TPtr EvReply; \
- TSideEffects OnComplete; \
- TMemoryChanges MemChanges; \
- TStorageChanges DbChanges; \
- \
- TTxType GetTxType() const override { return TxType; } \
- \
- TTxOperationReply(TSchemeShard* self, TOperationId id, TEvType::TPtr& ev) \
- : TBase(self) \
- , OperationId(id) \
- , EvReply(ev) \
- { \
- Y_VERIFY(TEvType::EventType != TEvPrivate::TEvOperationPlan::EventType); \
- Y_VERIFY(TEvType::EventType != TEvTxProcessing::TEvPlanStep::EventType); \
- } \
- \
- bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override { \
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, \
- "TTxOperationReply<" #TEvType "> execute " \
- << ", operationId: " << OperationId \
- << ", at schemeshard: " << Self->TabletID() \
- << ", message: " << ISubOperationState::DebugReply(EvReply)); \
- if (!Self->Operations.contains(OperationId.GetTxId())) { \
- return true; \
- } \
- TOperation::TPtr operation = Self->Operations.at(OperationId.GetTxId()); \
- if (operation->DoneParts.contains(OperationId.GetSubTxId())) { \
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, \
- "TTxOperationReply<" #TEvType "> execute " \
- << ", operation already done" \
- << ", operationId: " << OperationId \
- << ", at schemeshard: " << Self->TabletID()); \
- return true; \
- } \
- ISubOperation::TPtr part = operation->Parts.at(ui64(OperationId.GetSubTxId())); \
- TOperationContext context{Self, txc, ctx, OnComplete, MemChanges, DbChanges}; \
- Y_VERIFY(EvReply); \
- part->HandleReply(EvReply, context); \
- OnComplete.ApplyOnExecute(Self, txc, ctx); \
- DbChanges.Apply(Self, txc, ctx); \
- return true; \
- } \
- void Complete(const TActorContext& ctx) override { \
- OnComplete.ApplyOnComplete(Self, ctx); \
- } \
- };
- SCHEMESHARD_INCOMING_EVENTS(DefineTTxOperationReply)
- #undef DefineTxOperationReply
- struct TSchemeShard::TTxOperationPlanStep: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
- TEvTxProcessing::TEvPlanStep::TPtr Ev;
- TSideEffects OnComplete;
- TMemoryChanges MemChanges;
- TStorageChanges DbChanges;
- TTxOperationPlanStep(TSchemeShard* self, TEvTxProcessing::TEvPlanStep::TPtr ev)
- : TBase(self)
- , Ev(ev)
- {}
- TTxType GetTxType() const override { return TXTYPE_PLAN_STEP; }
- bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
- const NKikimrTx::TEvMediatorPlanStep& record = Ev->Get()->Record;
- const auto step = TStepId(record.GetStep());
- const size_t txCount = record.TransactionsSize();
- LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TTxOperationPlanStep Execute"
- << ", stepId: " << step
- << ", transactions count in step: " << txCount
- << ", at schemeshard: " << Self->TabletID());
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TTxOperationPlanStep Execute"
- << ", message: " << record.ShortDebugString()
- << ", at schemeshard: " << Self->TabletID());
- for (size_t i = 0; i < txCount; ++i) {
- const auto txId = TTxId(record.GetTransactions(i).GetTxId());
- const auto coordinator = ActorIdFromProto(record.GetTransactions(i).GetAckTo());
- const auto coordinatorId = TTabletId(record.GetTransactions(i).GetCoordinator());
- if (!Self->Operations.contains(txId)) {
- LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TTxOperationPlanStep Execute"
- << " unknown operation, assumed as already done"
- << ", transaction Id: " << txId);
- OnComplete.CoordinatorAck(coordinator, step, txId);
- continue;
- }
- TOperation::TPtr operation = Self->Operations.at(txId);
- for (ui32 partIdx = 0; partIdx < operation->Parts.size(); ++partIdx) {
- auto opId = TOperationId(txId, partIdx);
- if (operation->DoneParts.contains(TSubTxId(partIdx))) {
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TTxOperationPlanStep Execute"
- << " operation part is already done"
- << ", operationId: " << opId);
- continue;
- }
- TOperationContext context{Self, txc, ctx, OnComplete, MemChanges, DbChanges};
- THolder<TEvPrivate::TEvOperationPlan> msg = MakeHolder<TEvPrivate::TEvOperationPlan>(ui64(step), ui64(txId));
- TEvPrivate::TEvOperationPlan::TPtr personalEv = (TEventHandle<TEvPrivate::TEvOperationPlan>*) new IEventHandle(
- context.SS->SelfId(), context.SS->SelfId(), msg.Release());
- operation->Parts.at(partIdx)->HandleReply(personalEv, context);
- }
- OnComplete.CoordinatorAck(coordinator, step, txId);
- OnComplete.UnbindMsgFromPipe(TOperationId(txId, InvalidSubTxId), coordinatorId, TPipeMessageId(0, txId));
- }
- const TActorId mediator = Ev->Sender;
- OnComplete.MediatorAck(mediator, step);
- OnComplete.ApplyOnExecute(Self, txc, ctx);
- DbChanges.Apply(Self, txc, ctx);
- return true;
- }
- void Complete(const TActorContext& ctx) override {
- OnComplete.ApplyOnComplete(Self, ctx);
- }
- };
- NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxOperationPropose(TEvSchemeShard::TEvCancelTx::TPtr& ev) {
- return new TTxOperationProposeCancelTx(this, ev);
- }
- NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxOperationPropose(TEvSchemeShard::TEvModifySchemeTransaction::TPtr& ev) {
- return new TTxOperationPropose(this, ev);
- }
- NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxOperationPlanStep(TEvTxProcessing::TEvPlanStep::TPtr& ev) {
- return new TTxOperationPlanStep(this, ev);
- }
- NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxOperationProgress(TOperationId opId) {
- return new TTxOperationProgress(this, opId);
- }
- #define DefineCreateTxOperationReply(TEvType, TxType) \
- NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxOperationReply(TOperationId id, TEvType::TPtr& ev) { \
- return new TTxOperationReply<TEvType>(this, id, ev); \
- }
- SCHEMESHARD_INCOMING_EVENTS(DefineCreateTxOperationReply)
- #undef DefineTxOperationReply
- TString JoinPath(const TString& workingDir, const TString& name) {
- Y_VERIFY(!name.StartsWith('/') && !name.EndsWith('/'));
- return TStringBuilder()
- << workingDir
- << (workingDir.EndsWith('/') ? "" : "/")
- << name;
- }
- TOperation::TConsumeQuotaResult TOperation::ConsumeQuota(const TTxTransaction& tx, TOperationContext& context) {
- TConsumeQuotaResult result;
- // Internal operations never consume quota
- if (tx.GetInternal()) {
- return result;
- }
- // These operations never consume quota
- switch (tx.GetOperationType()) {
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSubDomain:
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropSubDomain:
- case NKikimrSchemeOp::EOperationType::ESchemeOpForceDropSubDomain:
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain:
- case NKikimrSchemeOp::EOperationType::ESchemeOpForceDropUnsafe:
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExtSubDomain:
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExtSubDomain:
- case NKikimrSchemeOp::EOperationType::ESchemeOpForceDropExtSubDomain:
- case NKikimrSchemeOp::EOperationType::ESchemeOpUpgradeSubDomain:
- case NKikimrSchemeOp::EOperationType::ESchemeOpUpgradeSubDomainDecision:
- return result;
- default:
- break;
- }
- const TString workingDir = tx.GetWorkingDir();
- TPath path = TPath::Resolve(workingDir, context.SS);
- // Find the first directory that actually exists
- path.RiseUntilExisted();
- // Don't fail on some completely invalid path
- if (!path.IsResolved()) {
- return result;
- }
- auto domainPathId = path.GetPathIdForDomain();
- auto domainInfo = path.DomainInfo();
- if (!domainInfo->TryConsumeSchemeQuota(context.Ctx.Now())) {
- result.Status = NKikimrScheme::StatusQuotaExceeded;
- result.Reason = "Request exceeded a limit on the number of schema operations, try again later.";
- }
- // Even if operation fails later we want to persist updated/consumed quotas
- NIceDb::TNiceDb db(context.GetTxc().DB); // write quotas directly in db even if operation fails
- context.SS->PersistSubDomainSchemeQuotas(db, domainPathId, *domainInfo);
- return result;
- }
// Expands one scheme transaction into the ordered list of transactions needed to run it.
//
// For the create-like operation types listed in the first switch, the target name is joined
// with the working dir and resolved. The original operation is rewritten to create only the
// leaf under its immediate parent, and one MkDir transaction (FailOnExist = true) is appended
// for every path component between that parent and the working dir, until an existing usable
// directory is reached. The list is built leaf-first and reversed at the end, so the MkDirs
// precede the create.
//
// In every other case `tx` is returned unchanged as the only element. When a path check on the
// target or on an intermediate component fails, result.Status / result.Reason carry the
// checker's error and result.Transactions holds just the original `tx`.
- TOperation::TSplitTransactionsResult TOperation::SplitIntoTransactions(const TTxTransaction& tx, const TOperationContext& context) {
- TSplitTransactionsResult result;
- const TPath parentPath = TPath::Resolve(tx.GetWorkingDir(), context.SS);
- {
- TPath::TChecker checks = parentPath.Check();
- checks
- .NotUnderDomainUpgrade()
- .IsAtLocalSchemeShard()
- .IsResolved()
- .NotDeleted()
- .NotUnderDeleting()
- .IsCommonSensePath()
- .IsLikeDirectory();
// The working dir fails the checks above: pass tx through untouched.
// No error status is set on the result here.
- if (!checks) {
- result.Transactions.push_back(tx);
- return result;
- }
- }
// Pick the name of the object being created; operation types not listed here are never split.
- TString targetName;
- switch (tx.GetOperationType()) {
- case NKikimrSchemeOp::EOperationType::ESchemeOpMkDir:
- targetName = tx.GetMkDir().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable:
// CreateTable with CopyFromTable set is passed through without splitting.
- if (tx.GetCreateTable().HasCopyFromTable()) {
- result.Transactions.push_back(tx);
- return result;
- }
- targetName = tx.GetCreateTable().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup:
- targetName = tx.GetCreatePersQueueGroup().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpAllocatePersQueueGroup:
- targetName = tx.GetAllocatePersQueueGroup().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpDeallocatePersQueueGroup:
- targetName = tx.GetDeallocatePersQueueGroup().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSubDomain:
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExtSubDomain:
- targetName = tx.GetSubDomain().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateRtmrVolume:
- targetName = tx.GetCreateRtmrVolume().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBlockStoreVolume:
- targetName = tx.GetCreateBlockStoreVolume().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateFileStore:
- targetName = tx.GetCreateFileStore().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateKesus:
- targetName = tx.GetKesus().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSolomonVolume:
- targetName = tx.GetCreateSolomonVolume().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable:
- targetName = tx.GetCreateIndexedTable().GetTableDescription().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore:
- targetName = tx.GetCreateColumnStore().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable:
- targetName = tx.GetCreateColumnTable().GetName();
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable:
- targetName = tx.GetCreateExternalTable().GetName();
- break;
- default:
- result.Transactions.push_back(tx);
- return result;
- }
// Empty names and names with a leading/trailing '/' are passed through unsplit
// (JoinPath below would abort on the latter).
- if (!targetName || targetName.StartsWith('/') || targetName.EndsWith('/')) {
- result.Transactions.push_back(tx);
- return result;
- }
- TPath path = TPath::Resolve(JoinPath(tx.GetWorkingDir(), targetName), context.SS);
// Stage 1: validate the target path and emit the create transaction for the leaf.
- {
- TPath::TChecker checks = path.Check();
- checks.IsAtLocalSchemeShard();
// `exists` means the target path resolves and is not deleted.
- bool exists = false;
- if (path.IsResolved()) {
- checks.IsResolved();
- exists = !path.IsDeleted();
- } else {
- checks
- .NotEmpty()
- .NotResolved();
- }
// Name and limit checks are applied only when the target does not exist yet.
- if (checks && !exists) {
- checks
- .IsValidLeafName()
- .DepthLimit()
- .PathsLimit();
- }
- if (checks && !exists && path.Parent().IsResolved()) {
- checks.DirChildrenLimit();
- }
- if (!checks) {
- result.Status = checks.GetStatus();
- result.Reason = checks.GetError();
- result.Transactions.push_back(tx);
- return result;
- }
// Rewrite the operation so that it creates just the leaf inside its immediate parent;
// after Rise() `path` points at that parent.
- const TString name = path.LeafName();
- path.Rise();
- TTxTransaction create(tx);
- create.SetWorkingDir(path.PathString());
- create.SetFailOnExist(tx.GetFailOnExist());
// Mirrors the first switch: sets the leaf name in the same sub-message the name was read from.
- switch (tx.GetOperationType()) {
- case NKikimrSchemeOp::EOperationType::ESchemeOpMkDir:
- create.MutableMkDir()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable:
- create.MutableCreateTable()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup:
- create.MutableCreatePersQueueGroup()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpAllocatePersQueueGroup:
- create.MutableAllocatePersQueueGroup()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpDeallocatePersQueueGroup:
- create.MutableDeallocatePersQueueGroup()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSubDomain:
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExtSubDomain:
- create.MutableSubDomain()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateRtmrVolume:
- create.MutableCreateRtmrVolume()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBlockStoreVolume:
- create.MutableCreateBlockStoreVolume()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateFileStore:
- create.MutableCreateFileStore()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateKesus:
- create.MutableKesus()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSolomonVolume:
- create.MutableCreateSolomonVolume()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable:
- create.MutableCreateIndexedTable()->MutableTableDescription()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore:
- create.MutableCreateColumnStore()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable:
- create.MutableCreateColumnTable()->SetName(name);
- break;
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable:
- create.MutableCreateExternalTable()->SetName(name);
- break;
- default:
- Y_UNREACHABLE();
- }
- result.Transactions.push_back(create);
// The target already exists: return only the rewritten create, no MkDirs are generated.
- if (exists) {
- return result;
- }
- }
// Stage 2: walk from the leaf's parent up towards the working dir, emitting an MkDir for every
// component, and stop at the first existing component that passes the directory checks.
- while (path != parentPath) {
- TPath::TChecker checks = path.Check();
- checks
- .NotUnderDomainUpgrade()
- .IsAtLocalSchemeShard();
- if (path.IsResolved()) {
- checks.IsResolved();
- if (path.IsDeleted()) {
- checks.IsDeleted();
- } else {
- checks
- .NotDeleted()
- .NotUnderDeleting()
- .IsCommonSensePath()
- .IsLikeDirectory();
// Existing component that passed the checks: nothing more to create above it.
- if (checks) {
- break;
- }
- }
- } else {
- checks
- .NotEmpty()
- .NotResolved();
- }
- if (checks) {
- checks
- .IsValidLeafName()
- .DepthLimit()
// Pass the number of paths this split is about to add: the ones already queued plus this one.
- .PathsLimit(result.Transactions.size() + 1);
- }
- if (checks && path.Parent().IsResolved()) {
- checks.DirChildrenLimit();
- }
// On failure discard everything generated so far and hand back only the original tx with the error.
- if (!checks) {
- result.Status = checks.GetStatus();
- result.Reason = checks.GetError();
- result.Transactions.clear();
- result.Transactions.push_back(tx);
- return result;
- }
- const TString name = path.LeafName();
- path.Rise();
- TTxTransaction mkdir;
- mkdir.SetFailOnExist(true);
- mkdir.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMkDir);
- mkdir.SetWorkingDir(path.PathString());
- mkdir.MutableMkDir()->SetName(name);
- result.Transactions.push_back(mkdir);
- }
// Transactions were collected leaf-first; reverse so that parents are created before children.
- Reverse(result.Transactions.begin(), result.Transactions.end());
- return result;
- }
- ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState::ETxState txState) const {
- switch (txType) {
- case TTxState::ETxType::TxMkDir:
- return CreateMkDir(NextPartId(), txState);
- case TTxState::ETxType::TxRmDir:
- return CreateRmDir(NextPartId(), txState);
- case TTxState::ETxType::TxModifyACL:
- return CreateModifyACL(NextPartId(), txState);
- case TTxState::ETxType::TxAlterUserAttributes:
- return CreateAlterUserAttrs(NextPartId(), txState);
- case TTxState::ETxType::TxCreateTable:
- return CreateNewTable(NextPartId(), txState);
- case TTxState::ETxType::TxCopyTable:
- return CreateCopyTable(NextPartId(), txState);
- case TTxState::ETxType::TxAlterTable:
- return CreateAlterTable(NextPartId(), txState);
- case TTxState::ETxType::TxSplitTablePartition:
- case TTxState::ETxType::TxMergeTablePartition:
- return CreateSplitMerge(NextPartId(), txState);
- case TTxState::ETxType::TxBackup:
- return CreateBackup(NextPartId(), txState);
- case TTxState::ETxType::TxRestore:
- return CreateRestore(NextPartId(), txState);
- case TTxState::ETxType::TxDropTable:
- return CreateDropTable(NextPartId(), txState);
- case TTxState::ETxType::TxCreateTableIndex:
- return CreateNewTableIndex(NextPartId(), txState);
- case TTxState::ETxType::TxDropTableIndex:
- return CreateDropTableIndex(NextPartId(), txState);
- case TTxState::ETxType::TxCreateRtmrVolume:
- return CreateNewRTMR(NextPartId(), txState);
- case TTxState::ETxType::TxCreateOlapStore:
- return CreateNewOlapStore(NextPartId(), txState);
- case TTxState::ETxType::TxAlterOlapStore:
- return CreateAlterOlapStore(NextPartId(), txState);
- case TTxState::ETxType::TxDropOlapStore:
- return CreateDropOlapStore(NextPartId(), txState);
- case TTxState::ETxType::TxCreateColumnTable:
- return CreateNewColumnTable(NextPartId(), txState);
- case TTxState::ETxType::TxAlterColumnTable:
- return CreateAlterColumnTable(NextPartId(), txState);
- case TTxState::ETxType::TxDropColumnTable:
- return CreateDropColumnTable(NextPartId(), txState);
- case TTxState::ETxType::TxCreatePQGroup:
- return CreateNewPQ(NextPartId(), txState);
- case TTxState::ETxType::TxAlterPQGroup:
- return CreateAlterPQ(NextPartId(), txState);
- case TTxState::ETxType::TxDropPQGroup:
- return CreateDropPQ(NextPartId(), txState);
- case TTxState::ETxType::TxAllocatePQ:
- return CreateAllocatePQ(NextPartId(), txState);
- case TTxState::ETxType::TxCreateSolomonVolume:
- return CreateNewSolomon(NextPartId(), txState);
- case TTxState::ETxType::TxDropSolomonVolume:
- return CreateDropSolomon(NextPartId(), txState);
- case TTxState::ETxType::TxCreateSubDomain:
- return CreateSubDomain(NextPartId(), txState);
- case TTxState::ETxType::TxAlterSubDomain:
- return CreateAlterSubDomain(NextPartId(), txState);
- case TTxState::ETxType::TxUpgradeSubDomain:
- return CreateUpgradeSubDomain(NextPartId(), txState);
- case TTxState::ETxType::TxUpgradeSubDomainDecision:
- return CreateUpgradeSubDomainDecision(NextPartId(), txState);
- case TTxState::ETxType::TxDropSubDomain:
- return CreateDropSubdomain(NextPartId(), txState);
- case TTxState::ETxType::TxForceDropSubDomain:
- return CreateForceDropSubDomain(NextPartId(), txState);
- case TTxState::ETxType::TxCreateKesus:
- return CreateNewKesus(NextPartId(), txState);
- case TTxState::ETxType::TxAlterKesus:
- return CreateAlterKesus(NextPartId(), txState);
- case TTxState::ETxType::TxDropKesus:
- return CreateDropKesus(NextPartId(), txState);
- case TTxState::ETxType::TxInitializeBuildIndex:
- return CreateInitializeBuildIndexMainTable(NextPartId(), txState);
- case TTxState::ETxType::TxFinalizeBuildIndex:
- return CreateFinalizeBuildIndexMainTable(NextPartId(), txState);
- case TTxState::ETxType::TxDropTableIndexAtMainTable:
- return CreateDropTableIndexAtMainTable(NextPartId(), txState);
- case TTxState::ETxType::TxUpdateMainTableOnIndexMove:
- return CreateUpdateMainTableOnIndexMove(NextPartId(), txState);
- case TTxState::ETxType::TxCreateLock:
- return CreateLock(NextPartId(), txState);
- case TTxState::ETxType::TxDropLock:
- return DropLock(NextPartId(), txState);
- case TTxState::ETxType::TxAlterTableIndex:
- return CreateAlterTableIndex(NextPartId(), txState);
- case TTxState::ETxType::TxAlterSolomonVolume:
- return CreateAlterSolomon(NextPartId(), txState);
- // ExtSubDomain
- case TTxState::ETxType::TxCreateExtSubDomain:
- return CreateExtSubDomain(NextPartId(), txState);
- case TTxState::ETxType::TxAlterExtSubDomain:
- return CreateAlterExtSubDomain(NextPartId(), txState);
- case TTxState::ETxType::TxAlterExtSubDomainCreateHive:
- return CreateAlterExtSubDomainCreateHive(NextPartId(), txState);
- case TTxState::ETxType::TxForceDropExtSubDomain:
- return CreateForceDropExtSubDomain(NextPartId(), txState);
- // BlockStore
- case TTxState::ETxType::TxCreateBlockStoreVolume:
- return CreateNewBSV(NextPartId(), txState);
- case TTxState::ETxType::TxAssignBlockStoreVolume:
- return CreateAssignBSV(NextPartId(), txState);
- case TTxState::ETxType::TxAlterBlockStoreVolume:
- return CreateAlterBSV(NextPartId(), txState);
- case TTxState::ETxType::TxDropBlockStoreVolume:
- return CreateDropBSV(NextPartId(), txState);
- // FileStore
- case TTxState::ETxType::TxCreateFileStore:
- return CreateNewFileStore(NextPartId(), txState);
- case TTxState::ETxType::TxAlterFileStore:
- return CreateAlterFileStore(NextPartId(), txState);
- case TTxState::ETxType::TxDropFileStore:
- return CreateDropFileStore(NextPartId(), txState);
- // CDC
- case TTxState::ETxType::TxCreateCdcStream:
- return CreateNewCdcStreamImpl(NextPartId(), txState);
- case TTxState::ETxType::TxCreateCdcStreamAtTable:
- return CreateNewCdcStreamAtTable(NextPartId(), txState, false);
- case TTxState::ETxType::TxCreateCdcStreamAtTableWithInitialScan:
- return CreateNewCdcStreamAtTable(NextPartId(), txState, true);
- case TTxState::ETxType::TxAlterCdcStream:
- return CreateAlterCdcStreamImpl(NextPartId(), txState);
- case TTxState::ETxType::TxAlterCdcStreamAtTable:
- return CreateAlterCdcStreamAtTable(NextPartId(), txState, false);
- case TTxState::ETxType::TxAlterCdcStreamAtTableDropSnapshot:
- return CreateAlterCdcStreamAtTable(NextPartId(), txState, true);
- case TTxState::ETxType::TxDropCdcStream:
- return CreateDropCdcStreamImpl(NextPartId(), txState);
- case TTxState::ETxType::TxDropCdcStreamAtTable:
- return CreateDropCdcStreamAtTable(NextPartId(), txState, false);
- case TTxState::ETxType::TxDropCdcStreamAtTableDropSnapshot:
- return CreateDropCdcStreamAtTable(NextPartId(), txState, true);
- // Sequences
- case TTxState::ETxType::TxCreateSequence:
- return CreateNewSequence(NextPartId(), txState);
- case TTxState::ETxType::TxAlterSequence:
- Y_FAIL("TODO: implement");
- case TTxState::ETxType::TxDropSequence:
- return CreateDropSequence(NextPartId(), txState);
- case TTxState::ETxType::TxFillIndex:
- Y_FAIL("deprecated");
- case TTxState::ETxType::TxMoveTable:
- return CreateMoveTable(NextPartId(), txState);
- case TTxState::ETxType::TxMoveTableIndex:
- return CreateMoveTableIndex(NextPartId(), txState);
- // Replication
- case TTxState::ETxType::TxCreateReplication:
- return CreateNewReplication(NextPartId(), txState);
- case TTxState::ETxType::TxAlterReplication:
- Y_FAIL("TODO: implement");
- case TTxState::ETxType::TxDropReplication:
- return CreateDropReplication(NextPartId(), txState);
- // BlobDepot
- case TTxState::ETxType::TxCreateBlobDepot:
- return CreateNewBlobDepot(NextPartId(), txState);
- case TTxState::ETxType::TxAlterBlobDepot:
- return CreateAlterBlobDepot(NextPartId(), txState);
- case TTxState::ETxType::TxDropBlobDepot:
- return CreateDropBlobDepot(NextPartId(), txState);
- case TTxState::ETxType::TxCreateExternalTable:
- return CreateNewExternalTable(NextPartId(), txState);
- case TTxState::ETxType::TxDropExternalTable:
- return CreateDropExternalTable(NextPartId(), txState);
- case TTxState::ETxType::TxAlterExternalTable:
- Y_FAIL("TODO: implement");
- case TTxState::ETxType::TxInvalid:
- Y_UNREACHABLE();
- }
- Y_UNREACHABLE();
- }
- ISubOperation::TPtr TOperation::ConstructPart(NKikimrSchemeOp::EOperationType opType, const TTxTransaction& tx) const {
- switch (opType) {
- case NKikimrSchemeOp::EOperationType::ESchemeOpMkDir:
- return CreateMkDir(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpRmDir:
- return CreateRmDir(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpModifyACL:
- return CreateModifyACL(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterUserAttributes:
- return CreateAlterUserAttrs(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpForceDropUnsafe:
- return CreateForceDropUnsafe(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable:
- return CreateNewTable(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable:
- Y_FAIL("in general, alter table is multipart operation now due table indexes");
- case NKikimrSchemeOp::EOperationType::ESchemeOpSplitMergeTablePartitions:
- return CreateSplitMerge(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpBackup:
- return CreateBackup(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpRestore:
- return CreateRestore(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropTable:
- Y_FAIL("in general, drop table is multipart operation now due table indexes");
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTableIndex:
- Y_FAIL("is handled as part of ESchemeOpCreateIndexedTable");
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropTableIndex:
- Y_FAIL("is handled as part of ESchemeOpDropTable");
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateConsistentCopyTables:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateRtmrVolume:
- return CreateNewRTMR(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore:
- return CreateNewOlapStore(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterColumnStore:
- return CreateAlterOlapStore(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropColumnStore:
- return CreateDropOlapStore(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable:
- return CreateNewColumnTable(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterColumnTable:
- return CreateAlterColumnTable(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropColumnTable:
- return CreateDropColumnTable(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup:
- return CreateNewPQ(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup:
- return CreateAlterPQ(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropPersQueueGroup:
- return CreateDropPQ(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAllocatePersQueueGroup:
- return CreateAllocatePQ(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpDeallocatePersQueueGroup:
- return CreateDeallocatePQ(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSolomonVolume:
- return CreateNewSolomon(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterSolomonVolume:
- return CreateAlterSolomon(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropSolomonVolume:
- return CreateDropSolomon(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSubDomain:
- return CreateSubDomain(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain:
- Y_FAIL("run in compatible");
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropSubDomain:
- return CreateDropSubdomain(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpForceDropSubDomain:
- Y_FAIL("run in compatible");
- // ExtSubDomain
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExtSubDomain:
- return CreateExtSubDomain(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExtSubDomain:
- return CreateAlterExtSubDomain(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExtSubDomainCreateHive:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- case NKikimrSchemeOp::EOperationType::ESchemeOpForceDropExtSubDomain:
- return CreateForceDropExtSubDomain(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateKesus:
- return CreateNewKesus(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterKesus:
- return CreateAlterKesus(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropKesus:
- return CreateDropKesus(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpUpgradeSubDomain:
- return CreateUpgradeSubDomain(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpUpgradeSubDomainDecision:
- return CreateUpgradeSubDomainDecision(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexBuild:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock:
- return CreateLock(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropLock:
- return DropLock(NextPartId(), tx);
- // BlockStore
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBlockStoreVolume:
- return CreateNewBSV(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAssignBlockStoreVolume:
- return CreateAssignBSV(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterBlockStoreVolume:
- return CreateAlterBSV(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropBlockStoreVolume:
- return CreateDropBSV(NextPartId(), tx);
- // FileStore
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateFileStore:
- return CreateNewFileStore(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterFileStore:
- return CreateAlterFileStore(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropFileStore:
- return CreateDropFileStore(NextPartId(), tx);
- // Login
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterLogin:
- return CreateAlterLogin(NextPartId(), tx);
- // Sequence
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence:
- return CreateNewSequence(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterSequence:
- Y_FAIL("TODO: implement");
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropSequence:
- return CreateDropSequence(NextPartId(), tx);
- // Index
- case NKikimrSchemeOp::EOperationType::ESchemeOpApplyIndexBuild:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- case NKikimrSchemeOp::EOperationType::ESchemeOpInitiateBuildIndexImplTable:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- case NKikimrSchemeOp::EOperationType::ESchemeOpFinalizeBuildIndexImplTable:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- case NKikimrSchemeOp::EOperationType::ESchemeOpInitiateBuildIndexMainTable:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- case NKikimrSchemeOp::EOperationType::ESchemeOpFinalizeBuildIndexMainTable:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- case NKikimrSchemeOp::EOperationType::ESchemeOpCancelIndexBuild:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropIndex:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropTableIndexAtMainTable:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- // CDC
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStream:
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl:
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStream:
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl:
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamAtTable:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStream:
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl:
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamAtTable:
- Y_FAIL("multipart operations are handled before, also they require transaction details");
- case NKikimrSchemeOp::EOperationType::ESchemeOp_DEPRECATED_35:
- Y_FAIL("impossible");
- // Move
- case NKikimrSchemeOp::EOperationType::ESchemeOpMoveTable:
- return CreateMoveTable(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpMoveTableIndex:
- return CreateMoveTableIndex(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex:
- Y_FAIL("impossible");
- // Replication
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateReplication:
- return CreateNewReplication(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterReplication:
- Y_FAIL("TODO: implement");
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropReplication:
- return CreateDropReplication(NextPartId(), tx);
- // BlobDepot
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBlobDepot:
- return CreateNewBlobDepot(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterBlobDepot:
- return CreateAlterBlobDepot(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropBlobDepot:
- return CreateDropBlobDepot(NextPartId(), tx);
- // ExternalTable
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable:
- return CreateNewExternalTable(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropExternalTable:
- return CreateDropExternalTable(NextPartId(), tx);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExternalTable:
- Y_FAIL("TODO: implement");
- }
- Y_UNREACHABLE();
- }
- TVector<ISubOperation::TPtr> TOperation::ConstructParts(const TTxTransaction& tx, TOperationContext& context) const {
- const auto& opType = tx.GetOperationType();
- switch (opType) {
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable:
- if (tx.GetCreateTable().HasCopyFromTable()) {
- return CreateCopyTable(NextPartId(), tx, context); // Copy indexes table as well as common table
- }
- return {ConstructPart(opType, tx)};
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable:
- return CreateIndexedTable(NextPartId(), tx, context);
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateConsistentCopyTables:
- return CreateConsistentCopyTables(NextPartId(), tx, context);
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropTable:
- return CreateDropIndexedTable(NextPartId(), tx, context);
- case NKikimrSchemeOp::EOperationType::ESchemeOpForceDropSubDomain:
- return {CreateCompatibleSubdomainDrop(context.SS, NextPartId(), tx)};
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexBuild:
- return CreateBuildIndex(NextPartId(), tx, context);
- case NKikimrSchemeOp::EOperationType::ESchemeOpApplyIndexBuild:
- return ApplyBuildIndex(NextPartId(), tx, context);
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropIndex:
- return CreateDropIndex(NextPartId(), tx, context);
- case NKikimrSchemeOp::EOperationType::ESchemeOpCancelIndexBuild:
- return CancelBuildIndex(NextPartId(), tx, context);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain:
- return {CreateCompatibleSubdomainAlter(context.SS, NextPartId(), tx)};
- case NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStream:
- return CreateNewCdcStream(NextPartId(), tx, context);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStream:
- return CreateAlterCdcStream(NextPartId(), tx, context);
- case NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStream:
- return CreateDropCdcStream(NextPartId(), tx, context);
- case NKikimrSchemeOp::EOperationType::ESchemeOpMoveTable:
- return CreateConsistentMoveTable(NextPartId(), tx, context);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable:
- return CreateConsistentAlterTable(NextPartId(), tx, context);
- case NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex:
- return CreateConsistentMoveIndex(NextPartId(), tx, context);
- case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExtSubDomain:
- return CreateCompatibleAlterExtSubDomain(NextPartId(), tx, context);
- default:
- return {ConstructPart(opType, tx)};
- }
- }
- void TOperation::AddPart(ISubOperation::TPtr part) {
- Parts.push_back(part);
- }
- bool TOperation::AddPublishingPath(TPathId pathId, ui64 version) {
- Y_VERIFY(!IsReadyToNotify());
- return Publications.emplace(pathId, version).second;
- }
- bool TOperation::IsPublished() const {
- return Publications.empty();
- }
- void TOperation::ReadyToNotifyPart(TSubTxId partId) {
- ReadyToNotifyParts.insert(partId);
- }
- bool TOperation::IsReadyToNotify(const TActorContext& ctx) const {
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TOperation IsReadyToNotify"
- << ", TxId: " << TxId
- << ", ready parts: " << ReadyToNotifyParts.size() << "/" << Parts.size()
- << ", is published: " << (IsPublished() ? "true" : "false"));
- return IsReadyToNotify();
- }
- bool TOperation::IsReadyToNotify() const {
- return IsPublished() && ReadyToNotifyParts.size() == Parts.size();
- }
- void TOperation::AddNotifySubscriber(const TActorId& actorId) {
- Y_VERIFY(!IsReadyToNotify());
- Subscribers.insert(actorId);
- }
- void TOperation::DoNotify(TSchemeShard*, TSideEffects& sideEffects, const TActorContext& ctx) {
- Y_VERIFY(IsReadyToNotify());
- for (auto& subscriber: Subscribers) {
- THolder<TEvSchemeShard::TEvNotifyTxCompletionResult> msg = MakeHolder<TEvSchemeShard::TEvNotifyTxCompletionResult>(ui64(TxId));
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TOperation DoNotify"
- << " send TEvNotifyTxCompletionResult"
- << " to actorId: " << subscriber
- << " message: " << msg->Record.ShortDebugString());
- sideEffects.Send(subscriber, msg.Release(), ui64(TxId));
- }
- Subscribers.clear();
- }
- bool TOperation::IsReadyToDone(const TActorContext& ctx) const {
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TOperation IsReadyToDone "
- << " TxId: " << TxId
- << " ready parts: " << DoneParts.size() << "/" << Parts.size());
- return DoneParts.size() == Parts.size();
- }
- bool TOperation::IsReadyToPropose(const TActorContext& ctx) const {
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TOperation IsReadyToPropose "
- << ", TxId: " << TxId
- << " ready parts: " << ReadyToProposeParts.size() << "/" << Parts.size());
- return IsReadyToPropose();
- }
- bool TOperation::IsReadyToPropose() const {
- return ReadyToProposeParts.size() == Parts.size();
- }
- void TOperation::ProposePart(TSubTxId partId, TPathId pathId, TStepId minStep) {
- Proposes.push_back(TProposeRec(partId, pathId, minStep));
- ReadyToProposeParts.insert(partId);
- }
- void TOperation::ProposePart(TSubTxId partId, TTabletId tableId) {
- ShardsProposes.push_back(TProposeShards(partId, tableId));
- ReadyToProposeParts.insert(partId);
- }
- void TOperation::DoPropose(TSchemeShard* ss, TSideEffects& sideEffects, const TActorContext& ctx) const {
- Y_VERIFY(IsReadyToPropose());
- //aggregate
- TTabletId selfTabletId = ss->SelfTabletId();
- TTabletId coordinatorId = InvalidTabletId; //common for all parts
- TStepId effectiveMinStep = TStepId(0);
- for (auto [_, pathId, minStep]: Proposes) {
- {
- TTabletId curCoordinatorId = ss->SelectCoordinator(TxId, pathId);
- if (coordinatorId == InvalidTabletId) {
- coordinatorId = curCoordinatorId;
- }
- Y_VERIFY(coordinatorId == curCoordinatorId);
- }
- effectiveMinStep = Max<TStepId>(effectiveMinStep, minStep);
- }
- TSet<TTabletId> shards;
- for (auto [partId, shard]: ShardsProposes) {
- shards.insert(shard);
- sideEffects.RouteByTablet(TOperationId(TxId, partId), shard);
- }
- shards.insert(selfTabletId);
- {
- const ui8 execLevel = 0;
- const TStepId maxStep = TStepId(Max<ui64>());
- THolder<TEvTxProxy::TEvProposeTransaction> message(
- new TEvTxProxy::TEvProposeTransaction(ui64(coordinatorId), ui64(TxId), execLevel, ui64(effectiveMinStep), ui64(maxStep)));
- auto* proposal = message->Record.MutableTransaction();
- auto* reqAffectedSet = proposal->MutableAffectedSet();
- reqAffectedSet->Reserve(shards.size());
- for (auto affectedTablet : shards) {
- auto* x = reqAffectedSet->Add();
- x->SetTabletId(ui64(affectedTablet));
- x->SetFlags(2 /*todo: use generic enum*/);
- }
- // TODO: probably want this for drops only
- proposal->SetIgnoreLowDiskSpace(true);
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TOperation DoPropose"
- << " send propose"
- << " to coordinator: " << coordinatorId
- << " message:" << message->Record.ShortDebugString());
- sideEffects.BindMsgToPipe(TOperationId(TxId, InvalidSubTxId), coordinatorId, TPipeMessageId(0, TxId), message.Release());
- }
- }
- void TOperation::RegisterRelationByTabletId(TSubTxId partId, TTabletId tablet, const TActorContext& ctx) {
- if (RelationsByTabletId.contains(tablet)) {
- if (RelationsByTabletId.at(tablet) != partId) {
- // it is Ok if Hive otherwise it is error
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TOperation RegisterRelationByTabletId"
- << " collision in routes has found"
- << ", TxId: " << TxId
- << ", partId: " << partId
- << ", prev tablet: " << RelationsByTabletId.at(tablet)
- << ", new tablet: " << tablet);
- RelationsByTabletId.erase(tablet);
- }
- return;
- }
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TOperation RegisterRelationByTabletId"
- << ", TxId: " << TxId
- << ", partId: " << partId
- << ", tablet: " << tablet);
- RelationsByTabletId[tablet] = partId;
- }
- TSubTxId TOperation::FindRelatedPartByTabletId(TTabletId tablet, const TActorContext& ctx) const {
- auto partIdPtr = RelationsByTabletId.FindPtr(tablet);
- auto partId = partIdPtr == nullptr ? InvalidSubTxId : *partIdPtr;
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TOperation FindRelatedPartByTabletId"
- << ", TxId: " << TxId
- << ", tablet: " << tablet
- << ", partId: " << partId);
- return partId;
- }
- void TOperation::RegisterRelationByShardIdx(TSubTxId partId, TShardIdx shardIdx, const TActorContext& ctx) {
- if (RelationsByShardIdx.contains(shardIdx)) {
- Y_VERIFY(RelationsByShardIdx.at(shardIdx) == partId);
- return;
- }
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TOperation RegisterRelationByShardIdx"
- << ", TxId: " << TxId
- << ", shardIdx: " << shardIdx
- << ", partId: " << partId);
- RelationsByShardIdx[shardIdx] = partId;
- }
- TSubTxId TOperation::FindRelatedPartByShardIdx(TShardIdx shardIdx, const TActorContext& ctx) const {
- auto partIdPtr = RelationsByShardIdx.FindPtr(shardIdx);
- auto partId = partIdPtr == nullptr ? InvalidSubTxId : *partIdPtr;
- LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
- "TOperation FindRelatedPartByShardIdx"
- << ", TxId: " << TxId
- << ", shardIdx: " << shardIdx
- << ", partId: " << partId);
- return partId;
- }
- void TOperation::WaitShardCreated(TShardIdx shardIdx, TSubTxId partId) {
- WaitingShardCreatedByShard[shardIdx].insert(partId);
- WaitingShardCreatedByPart[partId].insert(shardIdx);
- }
- TVector<TSubTxId> TOperation::ActivateShardCreated(TShardIdx shardIdx) {
- TVector<TSubTxId> parts;
- auto it = WaitingShardCreatedByShard.find(shardIdx);
- if (it != WaitingShardCreatedByShard.end()) {
- for (auto partId : it->second) {
- auto itByPart = WaitingShardCreatedByPart.find(partId);
- Y_VERIFY(itByPart != WaitingShardCreatedByPart.end());
- itByPart->second.erase(shardIdx);
- if (itByPart->second.empty()) {
- WaitingShardCreatedByPart.erase(itByPart);
- parts.push_back(partId);
- }
- }
- WaitingShardCreatedByShard.erase(it);
- }
- return parts;
- }
- void TOperation::RegisterWaitPublication(TSubTxId partId, TPathId pathId, ui64 pathVersion) {
- auto publication = TPublishPath(pathId, pathVersion);
- WaitingPublicationsByPart[partId].insert(publication);
- WaitingPublicationsByPath[publication].insert(partId);
- }
- TSet<TOperationId> TOperation::ActivatePartsWaitPublication(TPathId pathId, ui64 pathVersion) {
- TSet<TOperationId> activateParts;
- auto it = WaitingPublicationsByPath.lower_bound({pathId, 0}); // iterate all path version [0; pathVersion]
- while (it != WaitingPublicationsByPath.end()
- && it->first.first == pathId && it->first.second <= pathVersion)
- {
- auto waitVersion = it->first.second;
- for (const auto& partId: it->second) {
- LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD,
- "ActivateWaitPublication, publication confirmed"
- << ", opId: " << TOperationId(TxId, partId)
- << ", pathId: " << pathId
- << ", version: " << waitVersion);
- WaitingPublicationsByPart[partId].erase(TPublishPath(pathId, waitVersion));
- if (WaitingPublicationsByPart.at(partId).empty()) {
- WaitingPublicationsByPart.erase(partId);
- }
- activateParts.insert(TOperationId(TxId, partId)); // activate on every path
- }
- it = WaitingPublicationsByPath.erase(it); // move iterator it forward to the next element
- }
- return activateParts;
- }
- ui64 TOperation::CountWaitPublication(TOperationId opId) const {
- auto it = WaitingPublicationsByPart.find(opId.GetSubTxId());
- if (it == WaitingPublicationsByPart.end()) {
- return 0;
- }
- return it->second.size();
- }
- }
|