schemeshard__operation.cpp 66 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560
  1. #include "schemeshard__operation.h"
  2. #include "schemeshard__operation_part.h"
  3. #include "schemeshard__operation_side_effects.h"
  4. #include "schemeshard__operation_memory_changes.h"
  5. #include "schemeshard__operation_db_changes.h"
  6. #include "schemeshard_audit_log_fragment.h"
  7. #include "ydb/core/audit/audit_log.h"
  8. #include "schemeshard_impl.h"
  9. #include <ydb/core/tablet/tablet_exception.h>
  10. #include <ydb/core/tablet_flat/flat_cxx_database.h>
  11. #include <ydb/core/tablet_flat/tablet_flat_executor.h>
  12. #include <util/generic/algorithm.h>
  13. namespace NKikimr::NSchemeShard {
  14. using namespace NTabletFlatExecutor;
  15. std::tuple<TMaybe<NACLib::TUserToken>, bool> ParseUserToken(const TString& tokenStr) {
  16. TMaybe<NACLib::TUserToken> result;
  17. bool parseError = false;
  18. if (!tokenStr.empty()) {
  19. NACLibProto::TUserToken tokenPb;
  20. if (tokenPb.ParseFromString(tokenStr)) {
  21. result = NACLib::TUserToken(tokenPb);
  22. } else {
  23. parseError = true;
  24. }
  25. }
  26. return std::make_tuple(result, parseError);
  27. }
  28. TString RenderPaths(const TVector<TString>& paths) {
  29. auto result = TStringBuilder();
  30. result << "[" << JoinStrings(paths.begin(), paths.end(), ", ") << "]";
  31. return result;
  32. }
  33. void AuditLogModifySchemeTransaction(const NKikimrScheme::TEvModifySchemeTransaction& request, const NKikimrScheme::TEvModifySchemeTransactionResult& response, TSchemeShard* SS, const TString& userSID) {
  34. // Each TEvModifySchemeTransaction.Transaction is a self sufficient operation and should be logged independently
  35. // (even if it was packed into a single TxProxy transaction with some other operations).
  36. //NOTE: UserSIDNone couldn't be an empty string as "subject" field is a required one,
  37. // but AUDIT_PART() skips any part with an empty value
  38. static const TString EmptyValue = "{none}";
  39. for (const auto& operation : request.GetTransaction()) {
  40. auto logEntry = MakeAuditLogFragment(operation);
  41. auto databasePath = TPath::Resolve(operation.GetWorkingDir(), SS);
  42. if (!databasePath.IsResolved()) {
  43. databasePath.RiseUntilFirstResolvedParent();
  44. }
  45. auto peerName = request.GetPeerName();
  46. AUDIT_LOG(
  47. AUDIT_PART("txId", std::to_string(request.GetTxId()))
  48. AUDIT_PART("remote_address", (!peerName.empty() ? peerName : EmptyValue) )
  49. AUDIT_PART("subject", (!userSID.empty() ? userSID : EmptyValue))
  50. AUDIT_PART("database", (!databasePath.IsEmpty() ? databasePath.GetDomainPathString() : EmptyValue))
  51. AUDIT_PART("operation", logEntry.Operation)
  52. AUDIT_PART("paths", RenderPaths(logEntry.Paths), !logEntry.Paths.empty())
  53. AUDIT_PART("status", NKikimrScheme::EStatus_Name(response.GetStatus()))
  54. AUDIT_PART("reason", response.GetReason(), response.HasReason())
  55. );
  56. }
  57. }
  58. struct TSchemeShard::TTxOperationProposeCancelTx: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
  59. TEvSchemeShard::TEvCancelTx::TPtr Ev;
  60. TSideEffects OnComplete;
  61. TMemoryChanges MemChanges;
  62. TStorageChanges DbChanges;
  63. TTxOperationProposeCancelTx(TSchemeShard* self, TEvSchemeShard::TEvCancelTx::TPtr ev)
  64. : TBase(self)
  65. , Ev(ev)
  66. {}
  67. TTxType GetTxType() const override { return TXTYPE_CANCEL_BACKUP_IMPL; }
  68. bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
  69. const auto& record = Ev->Get()->Record;
  70. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  71. "TTxOperationProposeCancelTx Execute"
  72. << ", at schemeshard: " << Self->TabletID()
  73. << ", message: " << record.ShortDebugString());
  74. txc.DB.NoMoreReadsForTx();
  75. ISubOperation::TPtr part = CreateTxCancelTx(Ev);
  76. TOperationContext context{Self, txc, ctx, OnComplete, MemChanges, DbChanges};
  77. auto fakeResponse = part->Propose(TString(), context);
  78. Y_UNUSED(fakeResponse);
  79. OnComplete.ApplyOnExecute(Self, txc, ctx);
  80. DbChanges.Apply(Self, txc, ctx);
  81. return true;
  82. }
  83. void Complete(const TActorContext& ctx) override {
  84. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  85. "TTxOperationProposeCancelTx Complete"
  86. << ", at schemeshard: " << Self->TabletID());
  87. OnComplete.ApplyOnComplete(Self, ctx);
  88. }
  89. };
  90. NKikimrScheme::TEvModifySchemeTransaction GetRecordForPrint(const NKikimrScheme::TEvModifySchemeTransaction& record) {
  91. auto recordForPrint = record;
  92. if (record.HasUserToken()) {
  93. recordForPrint.SetUserToken("***hide token***");
  94. }
  95. return recordForPrint;
  96. }
  97. THolder<TProposeResponse> TSchemeShard::IgniteOperation(TProposeRequest& request, TOperationContext& context) {
  98. THolder<TProposeResponse> response = nullptr;
  99. auto selfId = SelfTabletId();
  100. auto& record = request.Record;
  101. auto txId = TTxId(record.GetTxId());
  102. if (Operations.contains(txId)) {
  103. response.Reset(new TProposeResponse(NKikimrScheme::StatusAccepted, ui64(txId), ui64(selfId)));
  104. response->SetError(NKikimrScheme::StatusAccepted, "There is operation with the same txId has been found in flight."
  105. " Actually that shouldn't have happened."
  106. " Note that tx body equality isn't granted."
  107. " StatusAccepted is just returned on retries.");
  108. return std::move(response);
  109. }
  110. TOperation::TPtr operation = new TOperation(txId);
  111. Operations[txId] = operation; //record is erased at ApplyOnExecute if all parts are done at propose
  112. for (const auto& transaction : record.GetTransaction()) {
  113. auto quotaResult = operation->ConsumeQuota(transaction, context);
  114. if (quotaResult.Status != NKikimrScheme::StatusSuccess) {
  115. response.Reset(new TProposeResponse(quotaResult.Status, ui64(txId), ui64(selfId)));
  116. response->SetError(quotaResult.Status, quotaResult.Reason);
  117. Operations.erase(txId);
  118. return std::move(response);
  119. }
  120. }
  121. if (record.HasFailOnExist()) {
  122. // inherit FailOnExist from TEvModifySchemeTransaction into TModifyScheme
  123. for (auto& transaction : *record.MutableTransaction()) {
  124. if (!transaction.HasFailOnExist()) {
  125. transaction.SetFailOnExist(record.GetFailOnExist());
  126. }
  127. }
  128. }
  129. TVector<TTxTransaction> transactions;
  130. for (const auto& transaction : record.GetTransaction()) {
  131. auto splitResult = operation->SplitIntoTransactions(transaction, context);
  132. if (splitResult.Status != NKikimrScheme::StatusSuccess) {
  133. response.Reset(new TProposeResponse(splitResult.Status, ui64(txId), ui64(selfId)));
  134. response->SetError(splitResult.Status, splitResult.Reason);
  135. Operations.erase(txId);
  136. return std::move(response);
  137. }
  138. std::move(splitResult.Transactions.begin(), splitResult.Transactions.end(), std::back_inserter(transactions));
  139. }
  140. const TString owner = record.HasOwner() ? record.GetOwner() : BUILTIN_ACL_ROOT;
  141. for (const auto& transaction : transactions) {
  142. auto parts = operation->ConstructParts(transaction, context);
  143. if (parts.size() > 1) {
  144. // les't allow altering impl index tables as part of consistent operation
  145. context.IsAllowedPrivateTables = true;
  146. }
  147. for (auto& part : parts) {
  148. TString errStr;
  149. if (!context.SS->CheckInFlightLimit(part->GetTransaction().GetOperationType(), errStr)) {
  150. response.Reset(new TProposeResponse(NKikimrScheme::StatusResourceExhausted, ui64(txId), ui64(selfId)));
  151. response->SetError(NKikimrScheme::StatusResourceExhausted, errStr);
  152. } else {
  153. response = part->Propose(owner, context);
  154. }
  155. Y_VERIFY(response);
  156. LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  157. "IgniteOperation"
  158. << ", opId: " << operation->NextPartId()
  159. << ", propose status:" << NKikimrScheme::EStatus_Name(response->Record.GetStatus())
  160. << ", reason: " << response->Record.GetReason()
  161. << ", at schemeshard: " << selfId);
  162. if (response->IsDone()) {
  163. operation->AddPart(part); //at ApplyOnExecute parts is erased
  164. context.OnComplete.DoneOperation(part->GetOperationId()); //mark it here by self for sure
  165. } else if (response->IsConditionalAccepted()) {
  166. //happens on retries, we answer like AlreadyExist or StatusSuccess with error message and do nothing in operation
  167. operation->AddPart(part); //at ApplyOnExecute parts is erased
  168. context.OnComplete.DoneOperation(part->GetOperationId()); //mark it here by self for sure
  169. } else if (response->IsAccepted()) {
  170. operation->AddPart(part);
  171. //context.OnComplete.ActivateTx(partOpId) ///TODO maybe it is good idea
  172. } else {
  173. if (!operation->Parts.empty()) {
  174. LOG_NOTICE_S(context.Ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  175. "Abort operation: IgniteOperation fail to propose a part"
  176. << ", opId: " << part->GetOperationId()
  177. << ", at schemeshard: " << selfId
  178. << ", already accepted parts: " << operation->Parts.size()
  179. << ", propose result status: " << NKikimrScheme::EStatus_Name(response->Record.GetStatus())
  180. << ", with reason: " << response->Record.GetReason()
  181. << ", tx message: " << GetRecordForPrint(record).ShortDebugString());
  182. }
  183. Y_VERIFY_S(context.IsUndoChangesSafe(),
  184. "Operation is aborted and all changes should be reverted"
  185. << ", but context.IsUndoChangesSafe is false, which means some direct writes have been done"
  186. << ", opId: " << part->GetOperationId()
  187. << ", at schemeshard: " << selfId
  188. << ", already accepted parts: " << operation->Parts.size()
  189. << ", propose result status: " << NKikimrScheme::EStatus_Name(response->Record.GetStatus())
  190. << ", with reason: " << response->Record.GetReason()
  191. << ", tx message: " << GetRecordForPrint(record).ShortDebugString());
  192. context.OnComplete = {}; // recreate
  193. context.DbChanges = {};
  194. for (auto& toAbort : operation->Parts) {
  195. toAbort->AbortPropose(context);
  196. }
  197. context.MemChanges.UnDo(context.SS);
  198. context.OnComplete.ApplyOnExecute(context.SS, context.GetTxc(), context.Ctx);
  199. Operations.erase(txId);
  200. return std::move(response);
  201. }
  202. }
  203. }
  204. return std::move(response);
  205. }
  206. struct TSchemeShard::TTxOperationPropose: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
  207. using TBase = NTabletFlatExecutor::TTransactionBase<TSchemeShard>;
  208. TProposeRequest::TPtr Request;
  209. THolder<TProposeResponse> Response = nullptr;
  210. TString UserSID;
  211. TSideEffects OnComplete;
  212. TTxOperationPropose(TSchemeShard* self, TProposeRequest::TPtr request)
  213. : TBase(self)
  214. , Request(request)
  215. {}
  216. TTxType GetTxType() const override { return TXTYPE_PROPOSE; }
  217. bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
  218. TTabletId selfId = Self->SelfTabletId();
  219. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  220. "TTxOperationPropose Execute"
  221. << ", message: " << GetRecordForPrint(Request->Get()->Record).ShortDebugString()
  222. << ", at schemeshard: " << selfId);
  223. txc.DB.NoMoreReadsForTx();
  224. auto [userToken, tokenParseError] = ParseUserToken(Request->Get()->Record.GetUserToken());
  225. if (tokenParseError) {
  226. auto txId = Request->Get()->Record.GetTxId();
  227. Response = MakeHolder<TProposeResponse>(NKikimrScheme::StatusInvalidParameter, ui64(txId), ui64(selfId), "Failed to parse user token");
  228. return true;
  229. }
  230. if (userToken) {
  231. UserSID = userToken->GetUserSID();
  232. }
  233. TMemoryChanges memChanges;
  234. TStorageChanges dbChanges;
  235. TOperationContext context{Self, txc, ctx, OnComplete, memChanges, dbChanges, std::move(userToken)};
  236. Response = Self->IgniteOperation(*Request->Get(), context);
  237. OnComplete.ApplyOnExecute(Self, txc, ctx);
  238. dbChanges.Apply(Self, txc, ctx);
  239. return true;
  240. }
  241. void Complete(const TActorContext& ctx) override {
  242. Y_VERIFY(Response);
  243. const auto& record = Request->Get()->Record;
  244. const auto txId = TTxId(record.GetTxId());
  245. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  246. "TTxOperationPropose Complete"
  247. << ", txId: " << txId
  248. << ", response: " << Response->Record.ShortDebugString()
  249. << ", at schemeshard: " << Self->TabletID());
  250. AuditLogModifySchemeTransaction(record, Response->Record, Self, UserSID);
  251. const TActorId sender = Request->Sender;
  252. const ui64 cookie = Request->Cookie;
  253. ctx.Send(sender, Response.Release(), 0, cookie);
  254. OnComplete.ApplyOnComplete(Self, ctx);
  255. }
  256. };
  257. struct TSchemeShard::TTxOperationProgress: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
  258. TOperationId OpId;
  259. TSideEffects OnComplete;
  260. TMemoryChanges MemChanges;
  261. TStorageChanges DbChanges;
  262. TTxOperationProgress(TSchemeShard* self, TOperationId id)
  263. : TBase(self)
  264. , OpId(id)
  265. {}
  266. TTxType GetTxType() const override { return TXTYPE_PROGRESS_OP; }
  267. bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
  268. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  269. "TTxOperationProgress Execute"
  270. << ", operationId: " << OpId
  271. << ", at schemeshard: " << Self->TabletID());
  272. if (!Self->Operations.contains(OpId.GetTxId())) {
  273. LOG_WARN_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  274. "TTxOperationProgress Execute"
  275. << " for unknown txId " << OpId.GetTxId());
  276. return true;
  277. }
  278. TOperation::TPtr operation = Self->Operations.at(OpId.GetTxId());
  279. if (operation->DoneParts.contains(OpId.GetSubTxId())) {
  280. LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  281. "TTxOperationProgress Execute"
  282. << " operation already done"
  283. << ", operationId: " << OpId
  284. << ", at schemeshard: " << Self->TabletID());
  285. return true;
  286. }
  287. ISubOperation::TPtr part = operation->Parts.at(ui64(OpId.GetSubTxId()));
  288. TOperationContext context{Self, txc, ctx, OnComplete, MemChanges, DbChanges};
  289. part->ProgressState(context);
  290. OnComplete.ApplyOnExecute(Self, txc, ctx);
  291. DbChanges.Apply(Self, txc, ctx);
  292. return true;
  293. }
  294. void Complete(const TActorContext& ctx) override {
  295. OnComplete.ApplyOnComplete(Self, ctx);
  296. }
  297. };
  298. template <class TEvType>
  299. struct TSchemeShard::TTxOperationReply {};
  300. #define DefineTTxOperationReply(TEvType, TxType) \
  301. template<> \
  302. struct TSchemeShard::TTxOperationReply<TEvType>: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> { \
  303. TOperationId OperationId; \
  304. TEvType::TPtr EvReply; \
  305. TSideEffects OnComplete; \
  306. TMemoryChanges MemChanges; \
  307. TStorageChanges DbChanges; \
  308. \
  309. TTxType GetTxType() const override { return TxType; } \
  310. \
  311. TTxOperationReply(TSchemeShard* self, TOperationId id, TEvType::TPtr& ev) \
  312. : TBase(self) \
  313. , OperationId(id) \
  314. , EvReply(ev) \
  315. { \
  316. Y_VERIFY(TEvType::EventType != TEvPrivate::TEvOperationPlan::EventType); \
  317. Y_VERIFY(TEvType::EventType != TEvTxProcessing::TEvPlanStep::EventType); \
  318. } \
  319. \
  320. bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override { \
  321. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, \
  322. "TTxOperationReply<" #TEvType "> execute " \
  323. << ", operationId: " << OperationId \
  324. << ", at schemeshard: " << Self->TabletID() \
  325. << ", message: " << ISubOperationState::DebugReply(EvReply)); \
  326. if (!Self->Operations.contains(OperationId.GetTxId())) { \
  327. return true; \
  328. } \
  329. TOperation::TPtr operation = Self->Operations.at(OperationId.GetTxId()); \
  330. if (operation->DoneParts.contains(OperationId.GetSubTxId())) { \
  331. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD, \
  332. "TTxOperationReply<" #TEvType "> execute " \
  333. << ", operation already done" \
  334. << ", operationId: " << OperationId \
  335. << ", at schemeshard: " << Self->TabletID()); \
  336. return true; \
  337. } \
  338. ISubOperation::TPtr part = operation->Parts.at(ui64(OperationId.GetSubTxId())); \
  339. TOperationContext context{Self, txc, ctx, OnComplete, MemChanges, DbChanges}; \
  340. Y_VERIFY(EvReply); \
  341. part->HandleReply(EvReply, context); \
  342. OnComplete.ApplyOnExecute(Self, txc, ctx); \
  343. DbChanges.Apply(Self, txc, ctx); \
  344. return true; \
  345. } \
  346. void Complete(const TActorContext& ctx) override { \
  347. OnComplete.ApplyOnComplete(Self, ctx); \
  348. } \
  349. };
  350. SCHEMESHARD_INCOMING_EVENTS(DefineTTxOperationReply)
  351. #undef DefineTxOperationReply
  352. struct TSchemeShard::TTxOperationPlanStep: public NTabletFlatExecutor::TTransactionBase<TSchemeShard> {
  353. TEvTxProcessing::TEvPlanStep::TPtr Ev;
  354. TSideEffects OnComplete;
  355. TMemoryChanges MemChanges;
  356. TStorageChanges DbChanges;
  357. TTxOperationPlanStep(TSchemeShard* self, TEvTxProcessing::TEvPlanStep::TPtr ev)
  358. : TBase(self)
  359. , Ev(ev)
  360. {}
  361. TTxType GetTxType() const override { return TXTYPE_PLAN_STEP; }
  362. bool Execute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& ctx) override {
  363. const NKikimrTx::TEvMediatorPlanStep& record = Ev->Get()->Record;
  364. const auto step = TStepId(record.GetStep());
  365. const size_t txCount = record.TransactionsSize();
  366. LOG_NOTICE_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  367. "TTxOperationPlanStep Execute"
  368. << ", stepId: " << step
  369. << ", transactions count in step: " << txCount
  370. << ", at schemeshard: " << Self->TabletID());
  371. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  372. "TTxOperationPlanStep Execute"
  373. << ", message: " << record.ShortDebugString()
  374. << ", at schemeshard: " << Self->TabletID());
  375. for (size_t i = 0; i < txCount; ++i) {
  376. const auto txId = TTxId(record.GetTransactions(i).GetTxId());
  377. const auto coordinator = ActorIdFromProto(record.GetTransactions(i).GetAckTo());
  378. const auto coordinatorId = TTabletId(record.GetTransactions(i).GetCoordinator());
  379. if (!Self->Operations.contains(txId)) {
  380. LOG_INFO_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  381. "TTxOperationPlanStep Execute"
  382. << " unknown operation, assumed as already done"
  383. << ", transaction Id: " << txId);
  384. OnComplete.CoordinatorAck(coordinator, step, txId);
  385. continue;
  386. }
  387. TOperation::TPtr operation = Self->Operations.at(txId);
  388. for (ui32 partIdx = 0; partIdx < operation->Parts.size(); ++partIdx) {
  389. auto opId = TOperationId(txId, partIdx);
  390. if (operation->DoneParts.contains(TSubTxId(partIdx))) {
  391. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  392. "TTxOperationPlanStep Execute"
  393. << " operation part is already done"
  394. << ", operationId: " << opId);
  395. continue;
  396. }
  397. TOperationContext context{Self, txc, ctx, OnComplete, MemChanges, DbChanges};
  398. THolder<TEvPrivate::TEvOperationPlan> msg = MakeHolder<TEvPrivate::TEvOperationPlan>(ui64(step), ui64(txId));
  399. TEvPrivate::TEvOperationPlan::TPtr personalEv = (TEventHandle<TEvPrivate::TEvOperationPlan>*) new IEventHandle(
  400. context.SS->SelfId(), context.SS->SelfId(), msg.Release());
  401. operation->Parts.at(partIdx)->HandleReply(personalEv, context);
  402. }
  403. OnComplete.CoordinatorAck(coordinator, step, txId);
  404. OnComplete.UnbindMsgFromPipe(TOperationId(txId, InvalidSubTxId), coordinatorId, TPipeMessageId(0, txId));
  405. }
  406. const TActorId mediator = Ev->Sender;
  407. OnComplete.MediatorAck(mediator, step);
  408. OnComplete.ApplyOnExecute(Self, txc, ctx);
  409. DbChanges.Apply(Self, txc, ctx);
  410. return true;
  411. }
  412. void Complete(const TActorContext& ctx) override {
  413. OnComplete.ApplyOnComplete(Self, ctx);
  414. }
  415. };
  416. NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxOperationPropose(TEvSchemeShard::TEvCancelTx::TPtr& ev) {
  417. return new TTxOperationProposeCancelTx(this, ev);
  418. }
  419. NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxOperationPropose(TEvSchemeShard::TEvModifySchemeTransaction::TPtr& ev) {
  420. return new TTxOperationPropose(this, ev);
  421. }
  422. NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxOperationPlanStep(TEvTxProcessing::TEvPlanStep::TPtr& ev) {
  423. return new TTxOperationPlanStep(this, ev);
  424. }
  425. NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxOperationProgress(TOperationId opId) {
  426. return new TTxOperationProgress(this, opId);
  427. }
  428. #define DefineCreateTxOperationReply(TEvType, TxType) \
  429. NTabletFlatExecutor::ITransaction* TSchemeShard::CreateTxOperationReply(TOperationId id, TEvType::TPtr& ev) { \
  430. return new TTxOperationReply<TEvType>(this, id, ev); \
  431. }
  432. SCHEMESHARD_INCOMING_EVENTS(DefineCreateTxOperationReply)
  433. #undef DefineTxOperationReply
  434. TString JoinPath(const TString& workingDir, const TString& name) {
  435. Y_VERIFY(!name.StartsWith('/') && !name.EndsWith('/'));
  436. return TStringBuilder()
  437. << workingDir
  438. << (workingDir.EndsWith('/') ? "" : "/")
  439. << name;
  440. }
  441. TOperation::TConsumeQuotaResult TOperation::ConsumeQuota(const TTxTransaction& tx, TOperationContext& context) {
  442. TConsumeQuotaResult result;
  443. // Internal operations never consume quota
  444. if (tx.GetInternal()) {
  445. return result;
  446. }
  447. // These operations never consume quota
  448. switch (tx.GetOperationType()) {
  449. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSubDomain:
  450. case NKikimrSchemeOp::EOperationType::ESchemeOpDropSubDomain:
  451. case NKikimrSchemeOp::EOperationType::ESchemeOpForceDropSubDomain:
  452. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain:
  453. case NKikimrSchemeOp::EOperationType::ESchemeOpForceDropUnsafe:
  454. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExtSubDomain:
  455. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExtSubDomain:
  456. case NKikimrSchemeOp::EOperationType::ESchemeOpForceDropExtSubDomain:
  457. case NKikimrSchemeOp::EOperationType::ESchemeOpUpgradeSubDomain:
  458. case NKikimrSchemeOp::EOperationType::ESchemeOpUpgradeSubDomainDecision:
  459. return result;
  460. default:
  461. break;
  462. }
  463. const TString workingDir = tx.GetWorkingDir();
  464. TPath path = TPath::Resolve(workingDir, context.SS);
  465. // Find the first directory that actually exists
  466. path.RiseUntilExisted();
  467. // Don't fail on some completely invalid path
  468. if (!path.IsResolved()) {
  469. return result;
  470. }
  471. auto domainPathId = path.GetPathIdForDomain();
  472. auto domainInfo = path.DomainInfo();
  473. if (!domainInfo->TryConsumeSchemeQuota(context.Ctx.Now())) {
  474. result.Status = NKikimrScheme::StatusQuotaExceeded;
  475. result.Reason = "Request exceeded a limit on the number of schema operations, try again later.";
  476. }
  477. // Even if operation fails later we want to persist updated/consumed quotas
  478. NIceDb::TNiceDb db(context.GetTxc().DB); // write quotas directly in db even if operation fails
  479. context.SS->PersistSubDomainSchemeQuotas(db, domainPathId, *domainInfo);
  480. return result;
  481. }
  482. TOperation::TSplitTransactionsResult TOperation::SplitIntoTransactions(const TTxTransaction& tx, const TOperationContext& context) {
  483. TSplitTransactionsResult result;
  484. const TPath parentPath = TPath::Resolve(tx.GetWorkingDir(), context.SS);
  485. {
  486. TPath::TChecker checks = parentPath.Check();
  487. checks
  488. .NotUnderDomainUpgrade()
  489. .IsAtLocalSchemeShard()
  490. .IsResolved()
  491. .NotDeleted()
  492. .NotUnderDeleting()
  493. .IsCommonSensePath()
  494. .IsLikeDirectory();
  495. if (!checks) {
  496. result.Transactions.push_back(tx);
  497. return result;
  498. }
  499. }
  500. TString targetName;
  501. switch (tx.GetOperationType()) {
  502. case NKikimrSchemeOp::EOperationType::ESchemeOpMkDir:
  503. targetName = tx.GetMkDir().GetName();
  504. break;
  505. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable:
  506. if (tx.GetCreateTable().HasCopyFromTable()) {
  507. result.Transactions.push_back(tx);
  508. return result;
  509. }
  510. targetName = tx.GetCreateTable().GetName();
  511. break;
  512. case NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup:
  513. targetName = tx.GetCreatePersQueueGroup().GetName();
  514. break;
  515. case NKikimrSchemeOp::EOperationType::ESchemeOpAllocatePersQueueGroup:
  516. targetName = tx.GetAllocatePersQueueGroup().GetName();
  517. break;
  518. case NKikimrSchemeOp::EOperationType::ESchemeOpDeallocatePersQueueGroup:
  519. targetName = tx.GetDeallocatePersQueueGroup().GetName();
  520. break;
  521. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSubDomain:
  522. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExtSubDomain:
  523. targetName = tx.GetSubDomain().GetName();
  524. break;
  525. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateRtmrVolume:
  526. targetName = tx.GetCreateRtmrVolume().GetName();
  527. break;
  528. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBlockStoreVolume:
  529. targetName = tx.GetCreateBlockStoreVolume().GetName();
  530. break;
  531. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateFileStore:
  532. targetName = tx.GetCreateFileStore().GetName();
  533. break;
  534. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateKesus:
  535. targetName = tx.GetKesus().GetName();
  536. break;
  537. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSolomonVolume:
  538. targetName = tx.GetCreateSolomonVolume().GetName();
  539. break;
  540. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable:
  541. targetName = tx.GetCreateIndexedTable().GetTableDescription().GetName();
  542. break;
  543. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore:
  544. targetName = tx.GetCreateColumnStore().GetName();
  545. break;
  546. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable:
  547. targetName = tx.GetCreateColumnTable().GetName();
  548. break;
  549. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable:
  550. targetName = tx.GetCreateExternalTable().GetName();
  551. break;
  552. default:
  553. result.Transactions.push_back(tx);
  554. return result;
  555. }
  556. if (!targetName || targetName.StartsWith('/') || targetName.EndsWith('/')) {
  557. result.Transactions.push_back(tx);
  558. return result;
  559. }
  560. TPath path = TPath::Resolve(JoinPath(tx.GetWorkingDir(), targetName), context.SS);
  561. {
  562. TPath::TChecker checks = path.Check();
  563. checks.IsAtLocalSchemeShard();
  564. bool exists = false;
  565. if (path.IsResolved()) {
  566. checks.IsResolved();
  567. exists = !path.IsDeleted();
  568. } else {
  569. checks
  570. .NotEmpty()
  571. .NotResolved();
  572. }
  573. if (checks && !exists) {
  574. checks
  575. .IsValidLeafName()
  576. .DepthLimit()
  577. .PathsLimit();
  578. }
  579. if (checks && !exists && path.Parent().IsResolved()) {
  580. checks.DirChildrenLimit();
  581. }
  582. if (!checks) {
  583. result.Status = checks.GetStatus();
  584. result.Reason = checks.GetError();
  585. result.Transactions.push_back(tx);
  586. return result;
  587. }
  588. const TString name = path.LeafName();
  589. path.Rise();
  590. TTxTransaction create(tx);
  591. create.SetWorkingDir(path.PathString());
  592. create.SetFailOnExist(tx.GetFailOnExist());
  593. switch (tx.GetOperationType()) {
  594. case NKikimrSchemeOp::EOperationType::ESchemeOpMkDir:
  595. create.MutableMkDir()->SetName(name);
  596. break;
  597. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable:
  598. create.MutableCreateTable()->SetName(name);
  599. break;
  600. case NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup:
  601. create.MutableCreatePersQueueGroup()->SetName(name);
  602. break;
  603. case NKikimrSchemeOp::EOperationType::ESchemeOpAllocatePersQueueGroup:
  604. create.MutableAllocatePersQueueGroup()->SetName(name);
  605. break;
  606. case NKikimrSchemeOp::EOperationType::ESchemeOpDeallocatePersQueueGroup:
  607. create.MutableDeallocatePersQueueGroup()->SetName(name);
  608. break;
  609. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSubDomain:
  610. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExtSubDomain:
  611. create.MutableSubDomain()->SetName(name);
  612. break;
  613. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateRtmrVolume:
  614. create.MutableCreateRtmrVolume()->SetName(name);
  615. break;
  616. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBlockStoreVolume:
  617. create.MutableCreateBlockStoreVolume()->SetName(name);
  618. break;
  619. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateFileStore:
  620. create.MutableCreateFileStore()->SetName(name);
  621. break;
  622. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateKesus:
  623. create.MutableKesus()->SetName(name);
  624. break;
  625. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSolomonVolume:
  626. create.MutableCreateSolomonVolume()->SetName(name);
  627. break;
  628. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable:
  629. create.MutableCreateIndexedTable()->MutableTableDescription()->SetName(name);
  630. break;
  631. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore:
  632. create.MutableCreateColumnStore()->SetName(name);
  633. break;
  634. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable:
  635. create.MutableCreateColumnTable()->SetName(name);
  636. break;
  637. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable:
  638. create.MutableCreateExternalTable()->SetName(name);
  639. break;
  640. default:
  641. Y_UNREACHABLE();
  642. }
  643. result.Transactions.push_back(create);
  644. if (exists) {
  645. return result;
  646. }
  647. }
  648. while (path != parentPath) {
  649. TPath::TChecker checks = path.Check();
  650. checks
  651. .NotUnderDomainUpgrade()
  652. .IsAtLocalSchemeShard();
  653. if (path.IsResolved()) {
  654. checks.IsResolved();
  655. if (path.IsDeleted()) {
  656. checks.IsDeleted();
  657. } else {
  658. checks
  659. .NotDeleted()
  660. .NotUnderDeleting()
  661. .IsCommonSensePath()
  662. .IsLikeDirectory();
  663. if (checks) {
  664. break;
  665. }
  666. }
  667. } else {
  668. checks
  669. .NotEmpty()
  670. .NotResolved();
  671. }
  672. if (checks) {
  673. checks
  674. .IsValidLeafName()
  675. .DepthLimit()
  676. .PathsLimit(result.Transactions.size() + 1);
  677. }
  678. if (checks && path.Parent().IsResolved()) {
  679. checks.DirChildrenLimit();
  680. }
  681. if (!checks) {
  682. result.Status = checks.GetStatus();
  683. result.Reason = checks.GetError();
  684. result.Transactions.clear();
  685. result.Transactions.push_back(tx);
  686. return result;
  687. }
  688. const TString name = path.LeafName();
  689. path.Rise();
  690. TTxTransaction mkdir;
  691. mkdir.SetFailOnExist(true);
  692. mkdir.SetOperationType(NKikimrSchemeOp::EOperationType::ESchemeOpMkDir);
  693. mkdir.SetWorkingDir(path.PathString());
  694. mkdir.MutableMkDir()->SetName(name);
  695. result.Transactions.push_back(mkdir);
  696. }
  697. Reverse(result.Transactions.begin(), result.Transactions.end());
  698. return result;
  699. }
  700. ISubOperation::TPtr TOperation::RestorePart(TTxState::ETxType txType, TTxState::ETxState txState) const {
  701. switch (txType) {
  702. case TTxState::ETxType::TxMkDir:
  703. return CreateMkDir(NextPartId(), txState);
  704. case TTxState::ETxType::TxRmDir:
  705. return CreateRmDir(NextPartId(), txState);
  706. case TTxState::ETxType::TxModifyACL:
  707. return CreateModifyACL(NextPartId(), txState);
  708. case TTxState::ETxType::TxAlterUserAttributes:
  709. return CreateAlterUserAttrs(NextPartId(), txState);
  710. case TTxState::ETxType::TxCreateTable:
  711. return CreateNewTable(NextPartId(), txState);
  712. case TTxState::ETxType::TxCopyTable:
  713. return CreateCopyTable(NextPartId(), txState);
  714. case TTxState::ETxType::TxAlterTable:
  715. return CreateAlterTable(NextPartId(), txState);
  716. case TTxState::ETxType::TxSplitTablePartition:
  717. case TTxState::ETxType::TxMergeTablePartition:
  718. return CreateSplitMerge(NextPartId(), txState);
  719. case TTxState::ETxType::TxBackup:
  720. return CreateBackup(NextPartId(), txState);
  721. case TTxState::ETxType::TxRestore:
  722. return CreateRestore(NextPartId(), txState);
  723. case TTxState::ETxType::TxDropTable:
  724. return CreateDropTable(NextPartId(), txState);
  725. case TTxState::ETxType::TxCreateTableIndex:
  726. return CreateNewTableIndex(NextPartId(), txState);
  727. case TTxState::ETxType::TxDropTableIndex:
  728. return CreateDropTableIndex(NextPartId(), txState);
  729. case TTxState::ETxType::TxCreateRtmrVolume:
  730. return CreateNewRTMR(NextPartId(), txState);
  731. case TTxState::ETxType::TxCreateOlapStore:
  732. return CreateNewOlapStore(NextPartId(), txState);
  733. case TTxState::ETxType::TxAlterOlapStore:
  734. return CreateAlterOlapStore(NextPartId(), txState);
  735. case TTxState::ETxType::TxDropOlapStore:
  736. return CreateDropOlapStore(NextPartId(), txState);
  737. case TTxState::ETxType::TxCreateColumnTable:
  738. return CreateNewColumnTable(NextPartId(), txState);
  739. case TTxState::ETxType::TxAlterColumnTable:
  740. return CreateAlterColumnTable(NextPartId(), txState);
  741. case TTxState::ETxType::TxDropColumnTable:
  742. return CreateDropColumnTable(NextPartId(), txState);
  743. case TTxState::ETxType::TxCreatePQGroup:
  744. return CreateNewPQ(NextPartId(), txState);
  745. case TTxState::ETxType::TxAlterPQGroup:
  746. return CreateAlterPQ(NextPartId(), txState);
  747. case TTxState::ETxType::TxDropPQGroup:
  748. return CreateDropPQ(NextPartId(), txState);
  749. case TTxState::ETxType::TxAllocatePQ:
  750. return CreateAllocatePQ(NextPartId(), txState);
  751. case TTxState::ETxType::TxCreateSolomonVolume:
  752. return CreateNewSolomon(NextPartId(), txState);
  753. case TTxState::ETxType::TxDropSolomonVolume:
  754. return CreateDropSolomon(NextPartId(), txState);
  755. case TTxState::ETxType::TxCreateSubDomain:
  756. return CreateSubDomain(NextPartId(), txState);
  757. case TTxState::ETxType::TxAlterSubDomain:
  758. return CreateAlterSubDomain(NextPartId(), txState);
  759. case TTxState::ETxType::TxUpgradeSubDomain:
  760. return CreateUpgradeSubDomain(NextPartId(), txState);
  761. case TTxState::ETxType::TxUpgradeSubDomainDecision:
  762. return CreateUpgradeSubDomainDecision(NextPartId(), txState);
  763. case TTxState::ETxType::TxDropSubDomain:
  764. return CreateDropSubdomain(NextPartId(), txState);
  765. case TTxState::ETxType::TxForceDropSubDomain:
  766. return CreateForceDropSubDomain(NextPartId(), txState);
  767. case TTxState::ETxType::TxCreateKesus:
  768. return CreateNewKesus(NextPartId(), txState);
  769. case TTxState::ETxType::TxAlterKesus:
  770. return CreateAlterKesus(NextPartId(), txState);
  771. case TTxState::ETxType::TxDropKesus:
  772. return CreateDropKesus(NextPartId(), txState);
  773. case TTxState::ETxType::TxInitializeBuildIndex:
  774. return CreateInitializeBuildIndexMainTable(NextPartId(), txState);
  775. case TTxState::ETxType::TxFinalizeBuildIndex:
  776. return CreateFinalizeBuildIndexMainTable(NextPartId(), txState);
  777. case TTxState::ETxType::TxDropTableIndexAtMainTable:
  778. return CreateDropTableIndexAtMainTable(NextPartId(), txState);
  779. case TTxState::ETxType::TxUpdateMainTableOnIndexMove:
  780. return CreateUpdateMainTableOnIndexMove(NextPartId(), txState);
  781. case TTxState::ETxType::TxCreateLock:
  782. return CreateLock(NextPartId(), txState);
  783. case TTxState::ETxType::TxDropLock:
  784. return DropLock(NextPartId(), txState);
  785. case TTxState::ETxType::TxAlterTableIndex:
  786. return CreateAlterTableIndex(NextPartId(), txState);
  787. case TTxState::ETxType::TxAlterSolomonVolume:
  788. return CreateAlterSolomon(NextPartId(), txState);
  789. // ExtSubDomain
  790. case TTxState::ETxType::TxCreateExtSubDomain:
  791. return CreateExtSubDomain(NextPartId(), txState);
  792. case TTxState::ETxType::TxAlterExtSubDomain:
  793. return CreateAlterExtSubDomain(NextPartId(), txState);
  794. case TTxState::ETxType::TxAlterExtSubDomainCreateHive:
  795. return CreateAlterExtSubDomainCreateHive(NextPartId(), txState);
  796. case TTxState::ETxType::TxForceDropExtSubDomain:
  797. return CreateForceDropExtSubDomain(NextPartId(), txState);
  798. // BlockStore
  799. case TTxState::ETxType::TxCreateBlockStoreVolume:
  800. return CreateNewBSV(NextPartId(), txState);
  801. case TTxState::ETxType::TxAssignBlockStoreVolume:
  802. return CreateAssignBSV(NextPartId(), txState);
  803. case TTxState::ETxType::TxAlterBlockStoreVolume:
  804. return CreateAlterBSV(NextPartId(), txState);
  805. case TTxState::ETxType::TxDropBlockStoreVolume:
  806. return CreateDropBSV(NextPartId(), txState);
  807. // FileStore
  808. case TTxState::ETxType::TxCreateFileStore:
  809. return CreateNewFileStore(NextPartId(), txState);
  810. case TTxState::ETxType::TxAlterFileStore:
  811. return CreateAlterFileStore(NextPartId(), txState);
  812. case TTxState::ETxType::TxDropFileStore:
  813. return CreateDropFileStore(NextPartId(), txState);
  814. // CDC
  815. case TTxState::ETxType::TxCreateCdcStream:
  816. return CreateNewCdcStreamImpl(NextPartId(), txState);
  817. case TTxState::ETxType::TxCreateCdcStreamAtTable:
  818. return CreateNewCdcStreamAtTable(NextPartId(), txState, false);
  819. case TTxState::ETxType::TxCreateCdcStreamAtTableWithInitialScan:
  820. return CreateNewCdcStreamAtTable(NextPartId(), txState, true);
  821. case TTxState::ETxType::TxAlterCdcStream:
  822. return CreateAlterCdcStreamImpl(NextPartId(), txState);
  823. case TTxState::ETxType::TxAlterCdcStreamAtTable:
  824. return CreateAlterCdcStreamAtTable(NextPartId(), txState, false);
  825. case TTxState::ETxType::TxAlterCdcStreamAtTableDropSnapshot:
  826. return CreateAlterCdcStreamAtTable(NextPartId(), txState, true);
  827. case TTxState::ETxType::TxDropCdcStream:
  828. return CreateDropCdcStreamImpl(NextPartId(), txState);
  829. case TTxState::ETxType::TxDropCdcStreamAtTable:
  830. return CreateDropCdcStreamAtTable(NextPartId(), txState, false);
  831. case TTxState::ETxType::TxDropCdcStreamAtTableDropSnapshot:
  832. return CreateDropCdcStreamAtTable(NextPartId(), txState, true);
  833. // Sequences
  834. case TTxState::ETxType::TxCreateSequence:
  835. return CreateNewSequence(NextPartId(), txState);
  836. case TTxState::ETxType::TxAlterSequence:
  837. Y_FAIL("TODO: implement");
  838. case TTxState::ETxType::TxDropSequence:
  839. return CreateDropSequence(NextPartId(), txState);
  840. case TTxState::ETxType::TxFillIndex:
  841. Y_FAIL("deprecated");
  842. case TTxState::ETxType::TxMoveTable:
  843. return CreateMoveTable(NextPartId(), txState);
  844. case TTxState::ETxType::TxMoveTableIndex:
  845. return CreateMoveTableIndex(NextPartId(), txState);
  846. // Replication
  847. case TTxState::ETxType::TxCreateReplication:
  848. return CreateNewReplication(NextPartId(), txState);
  849. case TTxState::ETxType::TxAlterReplication:
  850. Y_FAIL("TODO: implement");
  851. case TTxState::ETxType::TxDropReplication:
  852. return CreateDropReplication(NextPartId(), txState);
  853. // BlobDepot
  854. case TTxState::ETxType::TxCreateBlobDepot:
  855. return CreateNewBlobDepot(NextPartId(), txState);
  856. case TTxState::ETxType::TxAlterBlobDepot:
  857. return CreateAlterBlobDepot(NextPartId(), txState);
  858. case TTxState::ETxType::TxDropBlobDepot:
  859. return CreateDropBlobDepot(NextPartId(), txState);
  860. case TTxState::ETxType::TxCreateExternalTable:
  861. return CreateNewExternalTable(NextPartId(), txState);
  862. case TTxState::ETxType::TxDropExternalTable:
  863. return CreateDropExternalTable(NextPartId(), txState);
  864. case TTxState::ETxType::TxAlterExternalTable:
  865. Y_FAIL("TODO: implement");
  866. case TTxState::ETxType::TxInvalid:
  867. Y_UNREACHABLE();
  868. }
  869. Y_UNREACHABLE();
  870. }
  871. ISubOperation::TPtr TOperation::ConstructPart(NKikimrSchemeOp::EOperationType opType, const TTxTransaction& tx) const {
  872. switch (opType) {
  873. case NKikimrSchemeOp::EOperationType::ESchemeOpMkDir:
  874. return CreateMkDir(NextPartId(), tx);
  875. case NKikimrSchemeOp::EOperationType::ESchemeOpRmDir:
  876. return CreateRmDir(NextPartId(), tx);
  877. case NKikimrSchemeOp::EOperationType::ESchemeOpModifyACL:
  878. return CreateModifyACL(NextPartId(), tx);
  879. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterUserAttributes:
  880. return CreateAlterUserAttrs(NextPartId(), tx);
  881. case NKikimrSchemeOp::EOperationType::ESchemeOpForceDropUnsafe:
  882. return CreateForceDropUnsafe(NextPartId(), tx);
  883. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable:
  884. return CreateNewTable(NextPartId(), tx);
  885. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable:
  886. Y_FAIL("in general, alter table is multipart operation now due table indexes");
  887. case NKikimrSchemeOp::EOperationType::ESchemeOpSplitMergeTablePartitions:
  888. return CreateSplitMerge(NextPartId(), tx);
  889. case NKikimrSchemeOp::EOperationType::ESchemeOpBackup:
  890. return CreateBackup(NextPartId(), tx);
  891. case NKikimrSchemeOp::EOperationType::ESchemeOpRestore:
  892. return CreateRestore(NextPartId(), tx);
  893. case NKikimrSchemeOp::EOperationType::ESchemeOpDropTable:
  894. Y_FAIL("in general, drop table is multipart operation now due table indexes");
  895. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable:
  896. Y_FAIL("multipart operations are handled before, also they require transaction details");
  897. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTableIndex:
  898. Y_FAIL("is handled as part of ESchemeOpCreateIndexedTable");
  899. case NKikimrSchemeOp::EOperationType::ESchemeOpDropTableIndex:
  900. Y_FAIL("is handled as part of ESchemeOpDropTable");
  901. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateConsistentCopyTables:
  902. Y_FAIL("multipart operations are handled before, also they require transaction details");
  903. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateRtmrVolume:
  904. return CreateNewRTMR(NextPartId(), tx);
  905. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnStore:
  906. return CreateNewOlapStore(NextPartId(), tx);
  907. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterColumnStore:
  908. return CreateAlterOlapStore(NextPartId(), tx);
  909. case NKikimrSchemeOp::EOperationType::ESchemeOpDropColumnStore:
  910. return CreateDropOlapStore(NextPartId(), tx);
  911. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateColumnTable:
  912. return CreateNewColumnTable(NextPartId(), tx);
  913. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterColumnTable:
  914. return CreateAlterColumnTable(NextPartId(), tx);
  915. case NKikimrSchemeOp::EOperationType::ESchemeOpDropColumnTable:
  916. return CreateDropColumnTable(NextPartId(), tx);
  917. case NKikimrSchemeOp::EOperationType::ESchemeOpCreatePersQueueGroup:
  918. return CreateNewPQ(NextPartId(), tx);
  919. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterPersQueueGroup:
  920. return CreateAlterPQ(NextPartId(), tx);
  921. case NKikimrSchemeOp::EOperationType::ESchemeOpDropPersQueueGroup:
  922. return CreateDropPQ(NextPartId(), tx);
  923. case NKikimrSchemeOp::EOperationType::ESchemeOpAllocatePersQueueGroup:
  924. return CreateAllocatePQ(NextPartId(), tx);
  925. case NKikimrSchemeOp::EOperationType::ESchemeOpDeallocatePersQueueGroup:
  926. return CreateDeallocatePQ(NextPartId(), tx);
  927. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSolomonVolume:
  928. return CreateNewSolomon(NextPartId(), tx);
  929. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterSolomonVolume:
  930. return CreateAlterSolomon(NextPartId(), tx);
  931. case NKikimrSchemeOp::EOperationType::ESchemeOpDropSolomonVolume:
  932. return CreateDropSolomon(NextPartId(), tx);
  933. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSubDomain:
  934. return CreateSubDomain(NextPartId(), tx);
  935. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain:
  936. Y_FAIL("run in compatible");
  937. case NKikimrSchemeOp::EOperationType::ESchemeOpDropSubDomain:
  938. return CreateDropSubdomain(NextPartId(), tx);
  939. case NKikimrSchemeOp::EOperationType::ESchemeOpForceDropSubDomain:
  940. Y_FAIL("run in compatible");
  941. // ExtSubDomain
  942. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExtSubDomain:
  943. return CreateExtSubDomain(NextPartId(), tx);
  944. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExtSubDomain:
  945. return CreateAlterExtSubDomain(NextPartId(), tx);
  946. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExtSubDomainCreateHive:
  947. Y_FAIL("multipart operations are handled before, also they require transaction details");
  948. case NKikimrSchemeOp::EOperationType::ESchemeOpForceDropExtSubDomain:
  949. return CreateForceDropExtSubDomain(NextPartId(), tx);
  950. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateKesus:
  951. return CreateNewKesus(NextPartId(), tx);
  952. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterKesus:
  953. return CreateAlterKesus(NextPartId(), tx);
  954. case NKikimrSchemeOp::EOperationType::ESchemeOpDropKesus:
  955. return CreateDropKesus(NextPartId(), tx);
  956. case NKikimrSchemeOp::EOperationType::ESchemeOpUpgradeSubDomain:
  957. return CreateUpgradeSubDomain(NextPartId(), tx);
  958. case NKikimrSchemeOp::EOperationType::ESchemeOpUpgradeSubDomainDecision:
  959. return CreateUpgradeSubDomainDecision(NextPartId(), tx);
  960. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexBuild:
  961. Y_FAIL("multipart operations are handled before, also they require transaction details");
  962. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateLock:
  963. return CreateLock(NextPartId(), tx);
  964. case NKikimrSchemeOp::EOperationType::ESchemeOpDropLock:
  965. return DropLock(NextPartId(), tx);
  966. // BlockStore
  967. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBlockStoreVolume:
  968. return CreateNewBSV(NextPartId(), tx);
  969. case NKikimrSchemeOp::EOperationType::ESchemeOpAssignBlockStoreVolume:
  970. return CreateAssignBSV(NextPartId(), tx);
  971. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterBlockStoreVolume:
  972. return CreateAlterBSV(NextPartId(), tx);
  973. case NKikimrSchemeOp::EOperationType::ESchemeOpDropBlockStoreVolume:
  974. return CreateDropBSV(NextPartId(), tx);
  975. // FileStore
  976. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateFileStore:
  977. return CreateNewFileStore(NextPartId(), tx);
  978. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterFileStore:
  979. return CreateAlterFileStore(NextPartId(), tx);
  980. case NKikimrSchemeOp::EOperationType::ESchemeOpDropFileStore:
  981. return CreateDropFileStore(NextPartId(), tx);
  982. // Login
  983. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterLogin:
  984. return CreateAlterLogin(NextPartId(), tx);
  985. // Sequence
  986. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateSequence:
  987. return CreateNewSequence(NextPartId(), tx);
  988. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterSequence:
  989. Y_FAIL("TODO: implement");
  990. case NKikimrSchemeOp::EOperationType::ESchemeOpDropSequence:
  991. return CreateDropSequence(NextPartId(), tx);
  992. // Index
  993. case NKikimrSchemeOp::EOperationType::ESchemeOpApplyIndexBuild:
  994. Y_FAIL("multipart operations are handled before, also they require transaction details");
  995. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterTableIndex:
  996. Y_FAIL("multipart operations are handled before, also they require transaction details");
  997. case NKikimrSchemeOp::EOperationType::ESchemeOpInitiateBuildIndexImplTable:
  998. Y_FAIL("multipart operations are handled before, also they require transaction details");
  999. case NKikimrSchemeOp::EOperationType::ESchemeOpFinalizeBuildIndexImplTable:
  1000. Y_FAIL("multipart operations are handled before, also they require transaction details");
  1001. case NKikimrSchemeOp::EOperationType::ESchemeOpInitiateBuildIndexMainTable:
  1002. Y_FAIL("multipart operations are handled before, also they require transaction details");
  1003. case NKikimrSchemeOp::EOperationType::ESchemeOpFinalizeBuildIndexMainTable:
  1004. Y_FAIL("multipart operations are handled before, also they require transaction details");
  1005. case NKikimrSchemeOp::EOperationType::ESchemeOpCancelIndexBuild:
  1006. Y_FAIL("multipart operations are handled before, also they require transaction details");
  1007. case NKikimrSchemeOp::EOperationType::ESchemeOpDropIndex:
  1008. Y_FAIL("multipart operations are handled before, also they require transaction details");
  1009. case NKikimrSchemeOp::EOperationType::ESchemeOpDropTableIndexAtMainTable:
  1010. Y_FAIL("multipart operations are handled before, also they require transaction details");
  1011. // CDC
  1012. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStream:
  1013. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamImpl:
  1014. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStreamAtTable:
  1015. Y_FAIL("multipart operations are handled before, also they require transaction details");
  1016. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStream:
  1017. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamImpl:
  1018. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStreamAtTable:
  1019. Y_FAIL("multipart operations are handled before, also they require transaction details");
  1020. case NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStream:
  1021. case NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamImpl:
  1022. case NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStreamAtTable:
  1023. Y_FAIL("multipart operations are handled before, also they require transaction details");
  1024. case NKikimrSchemeOp::EOperationType::ESchemeOp_DEPRECATED_35:
  1025. Y_FAIL("impossible");
  1026. // Move
  1027. case NKikimrSchemeOp::EOperationType::ESchemeOpMoveTable:
  1028. return CreateMoveTable(NextPartId(), tx);
  1029. case NKikimrSchemeOp::EOperationType::ESchemeOpMoveTableIndex:
  1030. return CreateMoveTableIndex(NextPartId(), tx);
  1031. case NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex:
  1032. Y_FAIL("impossible");
  1033. // Replication
  1034. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateReplication:
  1035. return CreateNewReplication(NextPartId(), tx);
  1036. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterReplication:
  1037. Y_FAIL("TODO: implement");
  1038. case NKikimrSchemeOp::EOperationType::ESchemeOpDropReplication:
  1039. return CreateDropReplication(NextPartId(), tx);
  1040. // BlobDepot
  1041. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateBlobDepot:
  1042. return CreateNewBlobDepot(NextPartId(), tx);
  1043. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterBlobDepot:
  1044. return CreateAlterBlobDepot(NextPartId(), tx);
  1045. case NKikimrSchemeOp::EOperationType::ESchemeOpDropBlobDepot:
  1046. return CreateDropBlobDepot(NextPartId(), tx);
  1047. // ExternalTable
  1048. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateExternalTable:
  1049. return CreateNewExternalTable(NextPartId(), tx);
  1050. case NKikimrSchemeOp::EOperationType::ESchemeOpDropExternalTable:
  1051. return CreateDropExternalTable(NextPartId(), tx);
  1052. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExternalTable:
  1053. Y_FAIL("TODO: implement");
  1054. }
  1055. Y_UNREACHABLE();
  1056. }
  1057. TVector<ISubOperation::TPtr> TOperation::ConstructParts(const TTxTransaction& tx, TOperationContext& context) const {
  1058. const auto& opType = tx.GetOperationType();
  1059. switch (opType) {
  1060. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateTable:
  1061. if (tx.GetCreateTable().HasCopyFromTable()) {
  1062. return CreateCopyTable(NextPartId(), tx, context); // Copy indexes table as well as common table
  1063. }
  1064. return {ConstructPart(opType, tx)};
  1065. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexedTable:
  1066. return CreateIndexedTable(NextPartId(), tx, context);
  1067. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateConsistentCopyTables:
  1068. return CreateConsistentCopyTables(NextPartId(), tx, context);
  1069. case NKikimrSchemeOp::EOperationType::ESchemeOpDropTable:
  1070. return CreateDropIndexedTable(NextPartId(), tx, context);
  1071. case NKikimrSchemeOp::EOperationType::ESchemeOpForceDropSubDomain:
  1072. return {CreateCompatibleSubdomainDrop(context.SS, NextPartId(), tx)};
  1073. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateIndexBuild:
  1074. return CreateBuildIndex(NextPartId(), tx, context);
  1075. case NKikimrSchemeOp::EOperationType::ESchemeOpApplyIndexBuild:
  1076. return ApplyBuildIndex(NextPartId(), tx, context);
  1077. case NKikimrSchemeOp::EOperationType::ESchemeOpDropIndex:
  1078. return CreateDropIndex(NextPartId(), tx, context);
  1079. case NKikimrSchemeOp::EOperationType::ESchemeOpCancelIndexBuild:
  1080. return CancelBuildIndex(NextPartId(), tx, context);
  1081. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterSubDomain:
  1082. return {CreateCompatibleSubdomainAlter(context.SS, NextPartId(), tx)};
  1083. case NKikimrSchemeOp::EOperationType::ESchemeOpCreateCdcStream:
  1084. return CreateNewCdcStream(NextPartId(), tx, context);
  1085. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterCdcStream:
  1086. return CreateAlterCdcStream(NextPartId(), tx, context);
  1087. case NKikimrSchemeOp::EOperationType::ESchemeOpDropCdcStream:
  1088. return CreateDropCdcStream(NextPartId(), tx, context);
  1089. case NKikimrSchemeOp::EOperationType::ESchemeOpMoveTable:
  1090. return CreateConsistentMoveTable(NextPartId(), tx, context);
  1091. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterTable:
  1092. return CreateConsistentAlterTable(NextPartId(), tx, context);
  1093. case NKikimrSchemeOp::EOperationType::ESchemeOpMoveIndex:
  1094. return CreateConsistentMoveIndex(NextPartId(), tx, context);
  1095. case NKikimrSchemeOp::EOperationType::ESchemeOpAlterExtSubDomain:
  1096. return CreateCompatibleAlterExtSubDomain(NextPartId(), tx, context);
  1097. default:
  1098. return {ConstructPart(opType, tx)};
  1099. }
  1100. }
  1101. void TOperation::AddPart(ISubOperation::TPtr part) {
  1102. Parts.push_back(part);
  1103. }
  1104. bool TOperation::AddPublishingPath(TPathId pathId, ui64 version) {
  1105. Y_VERIFY(!IsReadyToNotify());
  1106. return Publications.emplace(pathId, version).second;
  1107. }
  1108. bool TOperation::IsPublished() const {
  1109. return Publications.empty();
  1110. }
  1111. void TOperation::ReadyToNotifyPart(TSubTxId partId) {
  1112. ReadyToNotifyParts.insert(partId);
  1113. }
  1114. bool TOperation::IsReadyToNotify(const TActorContext& ctx) const {
  1115. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  1116. "TOperation IsReadyToNotify"
  1117. << ", TxId: " << TxId
  1118. << ", ready parts: " << ReadyToNotifyParts.size() << "/" << Parts.size()
  1119. << ", is published: " << (IsPublished() ? "true" : "false"));
  1120. return IsReadyToNotify();
  1121. }
  1122. bool TOperation::IsReadyToNotify() const {
  1123. return IsPublished() && ReadyToNotifyParts.size() == Parts.size();
  1124. }
  1125. void TOperation::AddNotifySubscriber(const TActorId& actorId) {
  1126. Y_VERIFY(!IsReadyToNotify());
  1127. Subscribers.insert(actorId);
  1128. }
  1129. void TOperation::DoNotify(TSchemeShard*, TSideEffects& sideEffects, const TActorContext& ctx) {
  1130. Y_VERIFY(IsReadyToNotify());
  1131. for (auto& subscriber: Subscribers) {
  1132. THolder<TEvSchemeShard::TEvNotifyTxCompletionResult> msg = MakeHolder<TEvSchemeShard::TEvNotifyTxCompletionResult>(ui64(TxId));
  1133. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  1134. "TOperation DoNotify"
  1135. << " send TEvNotifyTxCompletionResult"
  1136. << " to actorId: " << subscriber
  1137. << " message: " << msg->Record.ShortDebugString());
  1138. sideEffects.Send(subscriber, msg.Release(), ui64(TxId));
  1139. }
  1140. Subscribers.clear();
  1141. }
  1142. bool TOperation::IsReadyToDone(const TActorContext& ctx) const {
  1143. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  1144. "TOperation IsReadyToDone "
  1145. << " TxId: " << TxId
  1146. << " ready parts: " << DoneParts.size() << "/" << Parts.size());
  1147. return DoneParts.size() == Parts.size();
  1148. }
  1149. bool TOperation::IsReadyToPropose(const TActorContext& ctx) const {
  1150. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  1151. "TOperation IsReadyToPropose "
  1152. << ", TxId: " << TxId
  1153. << " ready parts: " << ReadyToProposeParts.size() << "/" << Parts.size());
  1154. return IsReadyToPropose();
  1155. }
  1156. bool TOperation::IsReadyToPropose() const {
  1157. return ReadyToProposeParts.size() == Parts.size();
  1158. }
  1159. void TOperation::ProposePart(TSubTxId partId, TPathId pathId, TStepId minStep) {
  1160. Proposes.push_back(TProposeRec(partId, pathId, minStep));
  1161. ReadyToProposeParts.insert(partId);
  1162. }
  1163. void TOperation::ProposePart(TSubTxId partId, TTabletId tableId) {
  1164. ShardsProposes.push_back(TProposeShards(partId, tableId));
  1165. ReadyToProposeParts.insert(partId);
  1166. }
  1167. void TOperation::DoPropose(TSchemeShard* ss, TSideEffects& sideEffects, const TActorContext& ctx) const {
  1168. Y_VERIFY(IsReadyToPropose());
  1169. //aggregate
  1170. TTabletId selfTabletId = ss->SelfTabletId();
  1171. TTabletId coordinatorId = InvalidTabletId; //common for all parts
  1172. TStepId effectiveMinStep = TStepId(0);
  1173. for (auto [_, pathId, minStep]: Proposes) {
  1174. {
  1175. TTabletId curCoordinatorId = ss->SelectCoordinator(TxId, pathId);
  1176. if (coordinatorId == InvalidTabletId) {
  1177. coordinatorId = curCoordinatorId;
  1178. }
  1179. Y_VERIFY(coordinatorId == curCoordinatorId);
  1180. }
  1181. effectiveMinStep = Max<TStepId>(effectiveMinStep, minStep);
  1182. }
  1183. TSet<TTabletId> shards;
  1184. for (auto [partId, shard]: ShardsProposes) {
  1185. shards.insert(shard);
  1186. sideEffects.RouteByTablet(TOperationId(TxId, partId), shard);
  1187. }
  1188. shards.insert(selfTabletId);
  1189. {
  1190. const ui8 execLevel = 0;
  1191. const TStepId maxStep = TStepId(Max<ui64>());
  1192. THolder<TEvTxProxy::TEvProposeTransaction> message(
  1193. new TEvTxProxy::TEvProposeTransaction(ui64(coordinatorId), ui64(TxId), execLevel, ui64(effectiveMinStep), ui64(maxStep)));
  1194. auto* proposal = message->Record.MutableTransaction();
  1195. auto* reqAffectedSet = proposal->MutableAffectedSet();
  1196. reqAffectedSet->Reserve(shards.size());
  1197. for (auto affectedTablet : shards) {
  1198. auto* x = reqAffectedSet->Add();
  1199. x->SetTabletId(ui64(affectedTablet));
  1200. x->SetFlags(2 /*todo: use generic enum*/);
  1201. }
  1202. // TODO: probably want this for drops only
  1203. proposal->SetIgnoreLowDiskSpace(true);
  1204. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  1205. "TOperation DoPropose"
  1206. << " send propose"
  1207. << " to coordinator: " << coordinatorId
  1208. << " message:" << message->Record.ShortDebugString());
  1209. sideEffects.BindMsgToPipe(TOperationId(TxId, InvalidSubTxId), coordinatorId, TPipeMessageId(0, TxId), message.Release());
  1210. }
  1211. }
  1212. void TOperation::RegisterRelationByTabletId(TSubTxId partId, TTabletId tablet, const TActorContext& ctx) {
  1213. if (RelationsByTabletId.contains(tablet)) {
  1214. if (RelationsByTabletId.at(tablet) != partId) {
  1215. // it is Ok if Hive otherwise it is error
  1216. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  1217. "TOperation RegisterRelationByTabletId"
  1218. << " collision in routes has found"
  1219. << ", TxId: " << TxId
  1220. << ", partId: " << partId
  1221. << ", prev tablet: " << RelationsByTabletId.at(tablet)
  1222. << ", new tablet: " << tablet);
  1223. RelationsByTabletId.erase(tablet);
  1224. }
  1225. return;
  1226. }
  1227. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  1228. "TOperation RegisterRelationByTabletId"
  1229. << ", TxId: " << TxId
  1230. << ", partId: " << partId
  1231. << ", tablet: " << tablet);
  1232. RelationsByTabletId[tablet] = partId;
  1233. }
  1234. TSubTxId TOperation::FindRelatedPartByTabletId(TTabletId tablet, const TActorContext& ctx) const {
  1235. auto partIdPtr = RelationsByTabletId.FindPtr(tablet);
  1236. auto partId = partIdPtr == nullptr ? InvalidSubTxId : *partIdPtr;
  1237. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  1238. "TOperation FindRelatedPartByTabletId"
  1239. << ", TxId: " << TxId
  1240. << ", tablet: " << tablet
  1241. << ", partId: " << partId);
  1242. return partId;
  1243. }
  1244. void TOperation::RegisterRelationByShardIdx(TSubTxId partId, TShardIdx shardIdx, const TActorContext& ctx) {
  1245. if (RelationsByShardIdx.contains(shardIdx)) {
  1246. Y_VERIFY(RelationsByShardIdx.at(shardIdx) == partId);
  1247. return;
  1248. }
  1249. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  1250. "TOperation RegisterRelationByShardIdx"
  1251. << ", TxId: " << TxId
  1252. << ", shardIdx: " << shardIdx
  1253. << ", partId: " << partId);
  1254. RelationsByShardIdx[shardIdx] = partId;
  1255. }
  1256. TSubTxId TOperation::FindRelatedPartByShardIdx(TShardIdx shardIdx, const TActorContext& ctx) const {
  1257. auto partIdPtr = RelationsByShardIdx.FindPtr(shardIdx);
  1258. auto partId = partIdPtr == nullptr ? InvalidSubTxId : *partIdPtr;
  1259. LOG_DEBUG_S(ctx, NKikimrServices::FLAT_TX_SCHEMESHARD,
  1260. "TOperation FindRelatedPartByShardIdx"
  1261. << ", TxId: " << TxId
  1262. << ", shardIdx: " << shardIdx
  1263. << ", partId: " << partId);
  1264. return partId;
  1265. }
  1266. void TOperation::WaitShardCreated(TShardIdx shardIdx, TSubTxId partId) {
  1267. WaitingShardCreatedByShard[shardIdx].insert(partId);
  1268. WaitingShardCreatedByPart[partId].insert(shardIdx);
  1269. }
  1270. TVector<TSubTxId> TOperation::ActivateShardCreated(TShardIdx shardIdx) {
  1271. TVector<TSubTxId> parts;
  1272. auto it = WaitingShardCreatedByShard.find(shardIdx);
  1273. if (it != WaitingShardCreatedByShard.end()) {
  1274. for (auto partId : it->second) {
  1275. auto itByPart = WaitingShardCreatedByPart.find(partId);
  1276. Y_VERIFY(itByPart != WaitingShardCreatedByPart.end());
  1277. itByPart->second.erase(shardIdx);
  1278. if (itByPart->second.empty()) {
  1279. WaitingShardCreatedByPart.erase(itByPart);
  1280. parts.push_back(partId);
  1281. }
  1282. }
  1283. WaitingShardCreatedByShard.erase(it);
  1284. }
  1285. return parts;
  1286. }
  1287. void TOperation::RegisterWaitPublication(TSubTxId partId, TPathId pathId, ui64 pathVersion) {
  1288. auto publication = TPublishPath(pathId, pathVersion);
  1289. WaitingPublicationsByPart[partId].insert(publication);
  1290. WaitingPublicationsByPath[publication].insert(partId);
  1291. }
  1292. TSet<TOperationId> TOperation::ActivatePartsWaitPublication(TPathId pathId, ui64 pathVersion) {
  1293. TSet<TOperationId> activateParts;
  1294. auto it = WaitingPublicationsByPath.lower_bound({pathId, 0}); // iterate all path version [0; pathVersion]
  1295. while (it != WaitingPublicationsByPath.end()
  1296. && it->first.first == pathId && it->first.second <= pathVersion)
  1297. {
  1298. auto waitVersion = it->first.second;
  1299. for (const auto& partId: it->second) {
  1300. LOG_INFO_S(TlsActivationContext->AsActorContext(), NKikimrServices::FLAT_TX_SCHEMESHARD,
  1301. "ActivateWaitPublication, publication confirmed"
  1302. << ", opId: " << TOperationId(TxId, partId)
  1303. << ", pathId: " << pathId
  1304. << ", version: " << waitVersion);
  1305. WaitingPublicationsByPart[partId].erase(TPublishPath(pathId, waitVersion));
  1306. if (WaitingPublicationsByPart.at(partId).empty()) {
  1307. WaitingPublicationsByPart.erase(partId);
  1308. }
  1309. activateParts.insert(TOperationId(TxId, partId)); // activate on every path
  1310. }
  1311. it = WaitingPublicationsByPath.erase(it); // move iterator it forward to the next element
  1312. }
  1313. return activateParts;
  1314. }
  1315. ui64 TOperation::CountWaitPublication(TOperationId opId) const {
  1316. auto it = WaitingPublicationsByPart.find(opId.GetSubTxId());
  1317. if (it == WaitingPublicationsByPart.end()) {
  1318. return 0;
  1319. }
  1320. return it->second.size();
  1321. }
  1322. }