schemeshard__operation.h 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. #pragma once
  2. #include "schemeshard__operation_part.h"
  3. #include "schemeshard_tx_infly.h"
  4. #include <util/generic/set.h>
  5. namespace NKikimr::NSchemeShard {
  6. struct TOperation: TSimpleRefCount<TOperation> {
  7. using TPtr = TIntrusivePtr<TOperation>;
  8. const TTxId TxId;
  9. TVector<ISubOperation::TPtr> Parts;
  10. THashSet<TActorId> Subscribers;
  11. THashSet<TTxId> DependentOperations;
  12. THashSet<TTxId> WaitOperations;
  13. struct TPreSerializedMessage {
  14. ui32 Type;
  15. TIntrusivePtr<TEventSerializedData> Data;
  16. TOperationId OpId;
  17. TPreSerializedMessage() = default;
  18. TPreSerializedMessage(ui32 type, TIntrusivePtr<TEventSerializedData> data, TOperationId opId)
  19. : Type(type)
  20. , Data(std::move(data))
  21. , OpId(opId)
  22. { }
  23. };
  24. THashMap<TTabletId, TMap<TPipeMessageId, TPreSerializedMessage>> PipeBindedMessages; // std::pair<ui64, ui64> it's a cookie
  25. THashMap<TTabletId, TSubTxId> RelationsByTabletId;
  26. THashMap<TShardIdx, TSubTxId> RelationsByShardIdx;
  27. using TProposeRec = std::tuple<TSubTxId, TPathId, TStepId>;
  28. TDeque<TProposeRec> Proposes;
  29. using TProposeShards = std::tuple<TSubTxId, TTabletId>;
  30. TDeque<TProposeShards> ShardsProposes;
  31. using TPublishPath = std::pair<TPathId, ui64>;
  32. TSet<TPublishPath> Publications;
  33. TSet<TSubTxId> ReadyToProposeParts;
  34. THashSet<TSubTxId> ReadyToNotifyParts;
  35. THashSet<TSubTxId> DoneParts;
  36. THashMap<TPathId, NKikimrSchemeOp::EPathState> ReleasePathAtDone;
  37. THashMap<TShardIdx, THashSet<TSubTxId>> WaitingShardCreatedByShard;
  38. THashMap<TSubTxId, THashSet<TShardIdx>> WaitingShardCreatedByPart;
  39. TMap<TSubTxId, TSet<TPublishPath>> WaitingPublicationsByPart;
  40. TMap<TPublishPath, TSet<TSubTxId>> WaitingPublicationsByPath;
  41. TMap<TString, TSet<TSubTxId>> Barriers;
  42. struct TConsumeQuotaResult {
  43. NKikimrScheme::EStatus Status = NKikimrScheme::StatusSuccess;
  44. TString Reason;
  45. };
  46. struct TSplitTransactionsResult {
  47. NKikimrScheme::EStatus Status = NKikimrScheme::StatusSuccess;
  48. TString Reason;
  49. TVector<TTxTransaction> Transactions;
  50. };
  51. TOperation(TTxId txId)
  52. : TxId(txId)
  53. {}
  54. ~TOperation() = default;
  55. TTxId GetTxId() const { return TxId; }
  56. static TConsumeQuotaResult ConsumeQuota(const TTxTransaction& tx, TOperationContext& context);
  57. static TSplitTransactionsResult SplitIntoTransactions(const TTxTransaction& tx, const TOperationContext& context);
  58. ISubOperation::TPtr RestorePart(TTxState::ETxType opType, TTxState::ETxState opState) const;
  59. ISubOperation::TPtr ConstructPart(NKikimrSchemeOp::EOperationType opType, const TTxTransaction& tx) const;
  60. TVector<ISubOperation::TPtr> ConstructParts(const TTxTransaction& tx, TOperationContext& context) const;
  61. void AddPart(ISubOperation::TPtr part);
  62. bool AddPublishingPath(TPathId pathId, ui64 version);
  63. bool IsPublished() const;
  64. void ReadyToNotifyPart(TSubTxId partId);
  65. bool IsReadyToNotify(const TActorContext& ctx) const;
  66. bool IsReadyToNotify() const;
  67. void AddNotifySubscriber(const TActorId& actorId);
  68. void DoNotify(TSchemeShard* ss, TSideEffects& sideEffects, const TActorContext& ctx);
  69. bool IsReadyToDone(const TActorContext& ctx) const;
  70. // propose operation to coordinator
  71. bool IsReadyToPropose(const TActorContext& ctx) const;
  72. bool IsReadyToPropose() const;
  73. void ProposePart(TSubTxId partId, TPathId pathId, TStepId minStep);
  74. void ProposePart(TSubTxId partId, TTabletId tableId);
  75. void DoPropose(TSchemeShard* ss, TSideEffects& sideEffects, const TActorContext& ctx) const;
  76. // route incoming messages to suboperations (parts)
  77. void RegisterRelationByTabletId(TSubTxId partId, TTabletId tablet, const TActorContext& ctx);
  78. void RegisterRelationByShardIdx(TSubTxId partId, TShardIdx shardIdx, const TActorContext& ctx);
  79. TSubTxId FindRelatedPartByTabletId(TTabletId tablet, const TActorContext& ctx) const;
  80. TSubTxId FindRelatedPartByShardIdx(TShardIdx shardIdx, const TActorContext& ctx) const;
  81. void WaitShardCreated(TShardIdx shardIdx, TSubTxId partId);
  82. TVector<TSubTxId> ActivateShardCreated(TShardIdx shardIdx);
  83. void RegisterWaitPublication(TSubTxId partId, TPathId pathId, ui64 pathVersion);
  84. TSet<TOperationId> ActivatePartsWaitPublication(TPathId pathId, ui64 pathVersion);
  85. ui64 CountWaitPublication(TOperationId opId) const;
  86. void RegisterBarrier(TSubTxId partId, const TString& name) {
  87. Barriers[name].insert(partId);
  88. Y_VERIFY(Barriers.size() == 1);
  89. }
  90. bool HasBarrier() const {
  91. Y_VERIFY(Barriers.size() <= 1);
  92. return Barriers.size() == 1;
  93. }
  94. bool IsDoneBarrier() const {
  95. Y_VERIFY(Barriers.size() <= 1);
  96. for (const auto& [_, subTxIds] : Barriers) {
  97. for (const auto blocked : subTxIds) {
  98. Y_VERIFY_S(!DoneParts.contains(blocked), "part is blocked and done: " << blocked);
  99. }
  100. return subTxIds.size() + DoneParts.size() == Parts.size();
  101. }
  102. return false;
  103. }
  104. void DropBarrier(const TString& name) {
  105. Y_VERIFY(IsDoneBarrier());
  106. Y_VERIFY(Barriers.begin()->first == name);
  107. Barriers.erase(name);
  108. }
  109. TOperationId NextPartId() const {
  110. return TOperationId(TxId, TSubTxId(Parts.size()));
  111. }
  112. };
  113. inline TOperationId NextPartId(const TOperationId& opId, const TVector<ISubOperation::TPtr>& parts) {
  114. return TOperationId(opId.GetTxId(), opId.GetSubTxId() + parts.size());
  115. }
  116. }