partition.h 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752
  1. #pragma once
  2. #include "blob.h"
  3. #include "header.h"
  4. #include "key.h"
  5. #include "partition_init.h"
  6. #include "partition_sourcemanager.h"
  7. #include "partition_types.h"
  8. #include "quota_tracker.h"
  9. #include "sourceid.h"
  10. #include "subscriber.h"
  11. #include "user_info.h"
  12. #include "utils.h"
  13. #include "read_quoter.h"
  14. #include <ydb/core/keyvalue/keyvalue_events.h>
  15. #include <ydb/library/persqueue/counter_time_keeper/counter_time_keeper.h>
  16. #include <library/cpp/actors/core/actor.h>
  17. #include <library/cpp/actors/core/hfunc.h>
  18. #include <library/cpp/actors/core/log.h>
  19. #include <library/cpp/sliding_window/sliding_window.h>
  20. #include <util/generic/set.h>
  21. namespace NKikimr::NPQ {
  // Size constant of 500 KB; by its name presumably the upper bound for a single blob part
  // (NOTE(review): confirm against the users of this constant).
  22. static const ui32 MAX_BLOB_PART_SIZE = 500_KB;
  // Labeled tablet counters specialised with the EPartitionLabeledCounters descriptor.
  23. using TPartitionLabeledCounters = TProtobufTabletLabeledCounters<EPartitionLabeledCounters_descriptor>;
  // Declared here, defined elsewhere. Takes a deque of data keys, a timestamp and a head offset and
  // returns an offset (NOTE(review): exact estimation rule is not visible in this header).
  24. ui64 GetOffsetEstimate(const std::deque<TDataKey>& container, TInstant timestamp, ui64 headOffset);
  // Forward declarations of types that are only referenced by name in this header.
  25. class TKeyLevel;
  26. struct TMirrorerInfo;
  27. struct TTransaction {
  28. explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> tx,
  29. TMaybe<bool> predicate = Nothing()) :
  30. Tx(tx),
  31. Predicate(predicate)
  32. {
  33. Y_ABORT_UNLESS(Tx);
  34. }
  35. TTransaction(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> changeConfig,
  36. bool sendReply) :
  37. ChangeConfig(changeConfig),
  38. SendReply(sendReply)
  39. {
  40. Y_ABORT_UNLESS(ChangeConfig);
  41. }
  42. explicit TTransaction(TSimpleSharedPtr<TEvPQ::TEvProposePartitionConfig> proposeConfig) :
  43. ProposeConfig(proposeConfig)
  44. {
  45. Y_ABORT_UNLESS(ProposeConfig);
  46. }
  47. TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> Tx;
  48. TMaybe<bool> Predicate;
  49. TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> ChangeConfig;
  50. bool SendReply;
  51. TSimpleSharedPtr<TEvPQ::TEvProposePartitionConfig> ProposeConfig;
  52. };
  53. class TPartition : public TActorBootstrapped<TPartition> {
  54. friend TInitializer;
  55. friend TInitializerStep;
  56. friend TInitConfigStep;
  57. friend TInitInternalFieldsStep;
  58. friend TInitDiskStatusStep;
  59. friend TInitMetaStep;
  60. friend TInitInfoRangeStep;
  61. friend TInitDataRangeStep;
  62. friend TInitDataStep;
  63. friend TPartitionSourceManager;
  64. public:
  65. const TString& TopicName() const;
  66. private:
  67. static const ui32 MAX_ERRORS_COUNT_TO_STORE = 10;
  68. private:
  69. struct THasDataReq;
  70. struct THasDataDeadline;
  71. bool CanWrite() const;
  72. bool CanEnqueue() const;
  73. void ReplyError(const TActorContext& ctx, const ui64 dst, NPersQueue::NErrorCode::EErrorCode errorCode, const TString& error);
  74. void ReplyPropose(const TActorContext& ctx, const NKikimrPQ::TEvProposeTransaction& event, NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode);
  75. void ReplyErrorForStoredWrites(const TActorContext& ctx);
  76. void ReplyGetClientOffsetOk(const TActorContext& ctx, const ui64 dst, const i64 offset, const TInstant writeTimestamp, const TInstant createTimestamp);
  77. void ReplyOk(const TActorContext& ctx, const ui64 dst);
  78. void ReplyOwnerOk(const TActorContext& ctx, const ui64 dst, const TString& ownerCookie);
  79. void ReplyWrite(const TActorContext& ctx, ui64 dst, const TString& sourceId, ui64 seqNo, ui16 partNo, ui16 totalParts, ui64 offset, TInstant writeTimestamp, bool already, ui64 maxSeqNo, TDuration partitionQuotedTime, TDuration topicQuotedTime, TDuration queueTime, TDuration writeTime);
  80. void AddNewWriteBlob(std::pair<TKey, ui32>& res, TEvKeyValue::TEvRequest* request, bool headCleared, const TActorContext& ctx);
  81. void AnswerCurrentWrites(const TActorContext& ctx);
  82. void CancelAllWritesOnIdle(const TActorContext& ctx);
  83. void CancelAllWritesOnWrite(const TActorContext& ctx, TEvKeyValue::TEvRequest* request, const TString& errorStr, const TWriteMsg& p, TPartitionSourceManager::TModificationBatch& sourceIdBatch, NPersQueue::NErrorCode::EErrorCode errorCode = NPersQueue::NErrorCode::BAD_REQUEST);
  84. void ClearOldHead(const ui64 offset, const ui16 partNo, TEvKeyValue::TEvRequest* request);
  85. void CreateMirrorerActor();
  86. void DoRead(TEvPQ::TEvRead::TPtr ev, TDuration waitQuotaTime, const TActorContext& ctx);
  87. void FailBadClient(const TActorContext& ctx);
  88. void FillReadFromTimestamps(const NKikimrPQ::TPQTabletConfig& config, const TActorContext& ctx);
  89. void FilterDeadlinedWrites(const TActorContext& ctx);
  90. void Handle(NReadQuoterEvents::TEvAccountQuotaCountersUpdated::TPtr& ev, const TActorContext& ctx);
  91. void Handle(NReadQuoterEvents::TEvQuotaCountersUpdated::TPtr& ev, const TActorContext& ctx);
  92. void Handle(NReadQuoterEvents::TEvQuotaUpdated::TPtr& ev, const TActorContext& ctx);
  93. void Handle(TEvPQ::TEvApproveQuota::TPtr& ev, const TActorContext& ctx);
  94. void Handle(TEvKeyValue::TEvResponse::TPtr& ev, const TActorContext& ctx);
  95. void Handle(TEvPQ::TEvBlobResponse::TPtr& ev, const TActorContext& ctx);
  96. void Handle(TEvPQ::TEvChangeOwner::TPtr& ev, const TActorContext& ctx);
  97. void Handle(TEvPQ::TEvChangePartitionConfig::TPtr& ev, const TActorContext& ctx);
  98. void Handle(TEvPQ::TEvError::TPtr& ev, const TActorContext& ctx);
  99. void Handle(TEvPQ::TEvGetClientOffset::TPtr& ev, const TActorContext& ctx);
  100. void Handle(TEvPQ::TEvGetMaxSeqNoRequest::TPtr& ev, const TActorContext& ctx);
  101. void Handle(TEvPQ::TEvGetPartitionClientInfo::TPtr& ev, const TActorContext& ctx);
  102. void Handle(TEvPQ::TEvHandleWriteResponse::TPtr&, const TActorContext& ctx);
  103. void Handle(TEvPQ::TEvMirrorerCounters::TPtr& ev, const TActorContext& ctx);
  104. void Handle(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx);
  105. void Handle(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx);
  106. void Handle(TEvPQ::TEvPipeDisconnected::TPtr& ev, const TActorContext& ctx);
  107. void Handle(TEvPQ::TEvProxyResponse::TPtr& ev, const TActorContext& ctx);
  108. void Handle(TEvPQ::TEvQuotaDeadlineCheck::TPtr& ev, const TActorContext& ctx);
  109. void Handle(TEvPQ::TEvRead::TPtr& ev, const TActorContext& ctx);
  110. void Handle(TEvPQ::TEvReadTimeout::TPtr& ev, const TActorContext& ctx);
  111. void Handle(TEvPQ::TEvReserveBytes::TPtr& ev, const TActorContext& ctx);
  112. void Handle(TEvPQ::TEvSetClientInfo::TPtr& ev, const TActorContext& ctx);
  113. void Handle(TEvPQ::TEvUpdateWriteTimestamp::TPtr& ev, const TActorContext& ctx);
  114. void Handle(TEvPersQueue::TEvHasDataInfo::TPtr& ev, const TActorContext& ctx);
  115. void Handle(TEvPersQueue::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx);
  116. void Handle(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx);
  117. void Handle(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx);
  118. void Handle(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx);
  119. void Handle(TEvPersQueue::TEvReportPartitionError::TPtr& ev, const TActorContext& ctx);
  120. void Handle(TEvQuota::TEvClearance::TPtr& ev, const TActorContext& ctx);
  121. void Handle(TEvents::TEvPoisonPill::TPtr& ev, const TActorContext& ctx);
  122. void Handle(TEvPQ::TEvSubDomainStatus::TPtr& ev, const TActorContext& ctx);
  123. void HandleMonitoring(TEvPQ::TEvMonRequest::TPtr& ev, const TActorContext& ctx);
  124. void HandleOnIdle(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx);
  125. void HandleOnIdle(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx);
  126. void HandleOnIdle(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx);
  127. void HandleOnIdle(TEvPQ::TEvUpdateAvailableSize::TPtr& ev, const TActorContext& ctx);
  128. void HandleOnIdle(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx);
  129. void HandleOnInit(TEvPQ::TEvPartitionOffsets::TPtr& ev, const TActorContext& ctx);
  130. void HandleOnInit(TEvPQ::TEvPartitionStatus::TPtr& ev, const TActorContext& ctx);
  131. void HandleOnWrite(TEvPQ::TEvDeregisterMessageGroup::TPtr& ev, const TActorContext& ctx);
  132. void HandleOnWrite(TEvPQ::TEvRegisterMessageGroup::TPtr& ev, const TActorContext& ctx);
  133. void HandleOnWrite(TEvPQ::TEvSplitMessageGroup::TPtr& ev, const TActorContext& ctx);
  134. void HandleOnWrite(TEvPQ::TEvUpdateAvailableSize::TPtr& ev, const TActorContext& ctx);
  135. void HandleOnWrite(TEvPQ::TEvWrite::TPtr& ev, const TActorContext& ctx);
  136. void HandleWakeup(const TActorContext& ctx);
  137. void HandleWriteResponse(const TActorContext& ctx);
  138. void InitComplete(const TActorContext& ctx);
  139. void InitUserInfoForImportantClients(const TActorContext& ctx);
  140. void LogAndCollectError(NKikimrServices::EServiceKikimr service, const TString& msg, const TActorContext& ctx);
  141. void LogAndCollectError(const NKikimrPQ::TStatusResponse::TErrorMessage& error, const TActorContext& ctx);
  142. void OnReadRequestFinished(ui64 cookie, ui64 answerSize, const TString& consumer, const TActorContext& ctx);
  143. void ProcessChangeOwnerRequest(TAutoPtr<TEvPQ::TEvChangeOwner> ev, const TActorContext& ctx);
  144. void ProcessChangeOwnerRequests(const TActorContext& ctx);
  145. void ProcessHasDataRequests(const TActorContext& ctx);
  146. void ProcessRead(const TActorContext& ctx, TReadInfo&& info, const ui64 cookie, bool subscription);
  147. void ProcessReserveRequests(const TActorContext& ctx);
  148. void ProcessTimestampRead(const TActorContext& ctx);
  149. void ProcessTimestampsForNewData(const ui64 prevEndOffset, const TActorContext& ctx);
  150. void ProcessMaxSeqNoRequest(const TActorContext& ctx);
  151. void ReadTimestampForOffset(const TString& user, TUserInfo& ui, const TActorContext& ctx);
  152. void ReportCounters(const TActorContext& ctx, bool force = false);
  153. bool UpdateCounters(const TActorContext& ctx, bool force = false);
  154. void ScheduleUpdateAvailableSize(const TActorContext& ctx);
  155. void SetDeadlinesForWrites(const TActorContext& ctx);
  156. void SetupStreamCounters(const TActorContext& ctx);
  157. void SetupTopicCounters(const TActorContext& ctx);
  158. void SyncMemoryStateWithKVState(const TActorContext& ctx);
  159. void UpdateAvailableSize(const TActorContext& ctx);
  160. void AddMetaKey(TEvKeyValue::TEvRequest* request);
  161. void BecomeIdle(const TActorContext& ctx);
  162. void CheckHeadConsistency() const;
  163. void HandleWrites(const TActorContext& ctx);
  164. void RequestQuotaForWriteBlobRequest(size_t dataSize, ui64 cookie);
  165. void WriteBlobWithQuota(const TActorContext& ctx, THolder<TEvKeyValue::TEvRequest>&& request);
  166. void UpdateUserInfoEndOffset(const TInstant& now);
  167. void UpdateWriteBufferIsFullState(const TInstant& now);
  168. TInstant GetWriteTimeEstimate(ui64 offset) const;
  169. bool AppendHeadWithNewWrites(TEvKeyValue::TEvRequest* request, const TActorContext& ctx,
  170. TPartitionSourceManager::TModificationBatch& sourceIdBatch);
  171. bool CleanUp(TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
  172. // Removes blobs that are no longer required. Blobs are no longer required if the storage time of all messages
  173. // stored in this blob has expired and they have been read by all important consumers.
  174. bool CleanUpBlobs(TEvKeyValue::TEvRequest *request, const TActorContext& ctx);
  175. bool IsQuotingEnabled() const;
  176. bool ProcessWrites(TEvKeyValue::TEvRequest* request, TInstant now, const TActorContext& ctx);
  177. bool WaitingForPreviousBlobQuota() const;
  178. bool WaitingForSubDomainQuota(const TActorContext& ctx, const ui64 withSize = 0) const;
  179. size_t GetQuotaRequestSize(const TEvKeyValue::TEvRequest& request);
  180. std::pair<TInstant, TInstant> GetTime(const TUserInfo& userInfo, ui64 offset) const;
  181. std::pair<TKey, ui32> Compact(const TKey& key, const ui32 size, bool headCleared);
  182. ui32 NextChannel(bool isHead, ui32 blobSize);
  183. ui64 GetSizeLag(i64 offset);
  184. std::pair<TKey, ui32> GetNewWriteKey(bool headCleared);
  185. THashMap<TString, TOwnerInfo>::iterator DropOwner(THashMap<TString, TOwnerInfo>::iterator& it,
  186. const TActorContext& ctx);
  187. // will return rcount and rsize also
  188. TVector<TRequestedBlob> GetReadRequestFromBody(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, ui32* rcount, ui32* rsize);
  189. TVector<TClientBlob> GetReadRequestFromHead(const ui64 startOffset, const ui16 partNo, const ui32 maxCount, const ui32 maxSize, const ui64 readTimestampMs, ui32* rcount, ui32* rsize, ui64* insideHeadOffset);
  190. ui64 GetUsedStorage(const TActorContext& ctx);
  191. void ProcessTxsAndUserActs(const TActorContext& ctx);
  192. void ContinueProcessTxsAndUserActs(const TActorContext& ctx);
  193. void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvTxCalcPredicate> event);
  194. void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
  195. void PushFrontDistrTx(TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> event);
  196. void PushBackDistrTx(TSimpleSharedPtr<TEvPQ::TEvProposePartitionConfig> event);
  197. void RemoveDistrTx();
  198. void ProcessDistrTxs(const TActorContext& ctx);
  199. void ProcessDistrTx(const TActorContext& ctx);
  200. void AddImmediateTx(TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction> event);
  201. void RemoveImmediateTx();
  202. void ProcessImmediateTxs(const TActorContext& ctx);
  203. void ProcessImmediateTx(const NKikimrPQ::TEvProposeTransaction& tx,
  204. const TActorContext& ctx);
  205. void AddUserAct(TSimpleSharedPtr<TEvPQ::TEvSetClientInfo> act);
  206. void RemoveUserAct();
  207. size_t GetUserActCount(const TString& consumer) const;
  208. void ProcessUserActs(const TActorContext& ctx);
  209. void ProcessUserAct(TEvPQ::TEvSetClientInfo& act,
  210. const TActorContext& ctx);
  211. void EmulatePostProcessUserAct(const TEvPQ::TEvSetClientInfo& act,
  212. TUserInfoBase& userInfo,
  213. const TActorContext& ctx);
  214. void ScheduleReplyOk(const ui64 dst);
  215. void ScheduleReplyGetClientOffsetOk(const ui64 dst,
  216. const i64 offset,
  217. const TInstant writeTimestamp, const TInstant createTimestamp);
  218. void ScheduleReplyError(const ui64 dst,
  219. NPersQueue::NErrorCode::EErrorCode errorCode,
  220. const TString& error);
  221. void ScheduleReplyPropose(const NKikimrPQ::TEvProposeTransaction& event,
  222. NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode);
  223. void ScheduleReplyCommitDone(ui64 step, ui64 txId);
  224. void ScheduleDropPartitionLabeledCounters(const TString& group);
  225. void SchedulePartitionConfigChanged();
  226. void AddCmdWrite(NKikimrClient::TKeyValueRequest& request,
  227. const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated,
  228. ui64 offset, ui32 gen, ui32 step, const TString& session,
  229. ui64 readOffsetRewindSum,
  230. ui64 readRuleGeneration);
  231. void AddCmdWriteTxMeta(NKikimrClient::TKeyValueRequest& request,
  232. ui64 step, ui64 txId);
  233. void AddCmdWriteUserInfos(NKikimrClient::TKeyValueRequest& request);
  234. void AddCmdWriteConfig(NKikimrClient::TKeyValueRequest& request);
  235. void AddCmdDeleteRange(NKikimrClient::TKeyValueRequest& request,
  236. const TKeyPrefix& ikey, const TKeyPrefix& ikeyDeprecated);
  237. TUserInfoBase& GetOrCreatePendingUser(const TString& user, TMaybe<ui64> readRuleGeneration = {});
  238. TUserInfoBase* GetPendingUserIfExists(const TString& user);
  239. THolder<TEvPQ::TEvProxyResponse> MakeReplyOk(const ui64 dst);
  240. THolder<TEvPQ::TEvProxyResponse> MakeReplyGetClientOffsetOk(const ui64 dst,
  241. const i64 offset,
  242. const TInstant writeTimestamp, const TInstant createTimestamp);
  243. THolder<TEvPQ::TEvError> MakeReplyError(const ui64 dst,
  244. NPersQueue::NErrorCode::EErrorCode errorCode,
  245. const TString& error);
  246. THolder<TEvPersQueue::TEvProposeTransactionResult> MakeReplyPropose(const NKikimrPQ::TEvProposeTransaction& event,
  247. NKikimrPQ::TEvProposeTransactionResult::EStatus statusCode);
  248. THolder<TEvPQ::TEvTxCommitDone> MakeCommitDone(ui64 step, ui64 txId);
  249. bool BeginTransaction(const TEvPQ::TEvTxCalcPredicate& event,
  250. const TActorContext& ctx);
  251. bool BeginTransaction(const TEvPQ::TEvProposePartitionConfig& event);
  252. void EndTransaction(const TEvPQ::TEvTxCommit& event,
  253. const TActorContext& ctx);
  254. void EndTransaction(const TEvPQ::TEvTxRollback& event,
  255. const TActorContext& ctx);
  256. void BeginChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
  257. const TActorContext& ctx);
  258. void OnProcessTxsAndUserActsWriteComplete(ui64 cookie, const TActorContext& ctx);
  259. void EndChangePartitionConfig(const NKikimrPQ::TPQTabletConfig& config,
  260. NPersQueue::TTopicConverterPtr topicConverter,
  261. const TActorContext& ctx);
  262. TString GetKeyConfig() const;
  263. void InitPendingUserInfoForImportantClients(const NKikimrPQ::TPQTabletConfig& config,
  264. const TActorContext& ctx);
  265. void Initialize(const TActorContext& ctx);
  266. template <typename T>
  267. void EmplaceRequest(T&& body, const TActorContext& ctx) {
  268. const auto now = ctx.Now();
  269. Requests.emplace_back(body, WriteQuota->GetQuotedTime(now), now - TInstant::Zero());
  270. }
  271. void EmplaceResponse(TMessage&& message, const TActorContext& ctx);
  272. void Handle(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx);
  273. void HandleOnInit(TEvPQ::TEvTxCalcPredicate::TPtr& ev, const TActorContext& ctx);
  274. void HandleOnInit(TEvPQ::TEvTxCommit::TPtr& ev, const TActorContext& ctx);
  275. void HandleOnInit(TEvPQ::TEvTxRollback::TPtr& ev, const TActorContext& ctx);
  276. void HandleOnInit(TEvPQ::TEvProposePartitionConfig::TPtr& ev, const TActorContext& ctx);
  277. void ChangePlanStepAndTxId(ui64 step, ui64 txId);
  278. void ResendPendingEvents(const TActorContext& ctx);
  279. void Handle(TEvPQ::TEvSourceIdRequest::TPtr& ev, const TActorContext& ctx);
  280. TString LogPrefix() const;
  281. public:
  282. static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
  283. return NKikimrServices::TActivity::PERSQUEUE_PARTITION_ACTOR;
  284. }
  285. TPartition(ui64 tabletId, ui32 partition, const TActorId& tablet, const TActorId& blobCache,
  286. const NPersQueue::TTopicConverterPtr& topicConverter, TString dcId, bool isServerless,
  287. const NKikimrPQ::TPQTabletConfig& config, const TTabletCountersBase& counters, bool SubDomainOutOfSpace, ui32 numChannels,
  288. bool newPartition = false,
  289. TVector<TTransaction> distrTxs = {});
  290. void Bootstrap(const TActorContext& ctx);
  291. ui64 Size() const {
  292. return BodySize + Head.PackedSize;
  293. }
  294. // The size of the data that was actually persisted in the storage by the partition
  295. ui64 MeteringDataSize(const TActorContext& ctx) const;
  296. // The size of the storage that was reserved by the partition
  297. ui64 ReserveSize() const;
  298. // The size of the storage that is used by the partition. It combines the reserved size and the actually persisted data.
  299. ui64 StorageSize(const TActorContext& ctx) const;
  300. ui64 UsedReserveSize(const TActorContext& ctx) const;
  301. // Minimal offset, the data from which cannot be deleted, because it is required by an important consumer
  302. ui64 ImportantClientsMinOffset() const;
  303. //Bootstrap sends kvRead
  304. //Become StateInit
  305. //StateInit
  306. //wait for correct result, cache all
  307. //Become StateIdle
  308. //StateIdle
  309. //got read - make kvRead
  310. //got kvReadResult - answer read
  311. //got write - make kvWrite, Become StateWrite
  312. //StateWrite
  313. // got read - ...
  314. // got kvReadResult - ...
  315. //got write - store it inflight
  316. //got kvWriteResult - check it, become StateIdle or StateWrite (and write inflight)
  317. private:
  318. template <typename TEv>
  319. TString EventStr(const char * func, const TEv& ev) {
  320. TStringStream ss;
  321. ss << func << " event# " << ev->GetTypeRewrite() << " (" << ev->GetTypeName() << "), Tablet " << Tablet << ", Partition " << Partition
  322. << ", Sender " << ev->Sender.ToString() << ", Recipient " << ev->Recipient.ToString() << ", Cookie: " << ev->Cookie;
  323. return ss.Str();
  324. }
  325. TInitializer Initializer;
  // State function used while the partition is initializing.
  // Dispatches on ev->GetTypeRewrite(): events listed below go to the named member handler
  // (several of them to HandleOnInit variants); anything else is offered to Initializer.Handle(ev),
  // and an error is logged only if the initializer does not accept it.
  326. STFUNC(StateInit)
  327. {
  // Scoped keeper bound to the COUNTER_PQ_TABLET_CPU_USAGE cumulative counter
  // (presumably accounts the time spent in this handler - TODO confirm).
  328. NPersQueue::TCounterTimeKeeper keeper(TabletCounters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]);
  329. ALOG_TRACE(NKikimrServices::PERSQUEUE, EventStr("StateInit", ev));
  330. TRACE_EVENT(NKikimrServices::PERSQUEUE);
  331. switch (ev->GetTypeRewrite()) {
  332. CFunc(TEvents::TSystem::Wakeup, HandleWakeup);
  333. HFuncTraced(TEvents::TEvPoisonPill, Handle);
  334. HFuncTraced(TEvPQ::TEvMonRequest, HandleMonitoring);
  335. HFuncTraced(TEvPQ::TEvChangePartitionConfig, Handle);
  336. HFuncTraced(TEvPQ::TEvPartitionOffsets, HandleOnInit);
  337. HFuncTraced(TEvPQ::TEvPartitionStatus, HandleOnInit);
  338. HFuncTraced(TEvPersQueue::TEvReportPartitionError, Handle);
  339. HFuncTraced(TEvPersQueue::TEvHasDataInfo, Handle);
  340. HFuncTraced(TEvPQ::TEvMirrorerCounters, Handle);
  341. HFuncTraced(TEvPQ::TEvGetPartitionClientInfo, Handle);
  342. HFuncTraced(TEvPQ::TEvTxCalcPredicate, HandleOnInit);
  343. HFuncTraced(TEvPQ::TEvProposePartitionConfig, HandleOnInit);
  344. HFuncTraced(TEvPQ::TEvTxCommit, HandleOnInit);
  345. HFuncTraced(TEvPQ::TEvTxRollback, HandleOnInit);
  346. HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
  347. HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
  348. HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
  349. HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
  350. default:
  // Not one of the events above: let the initializer try it before reporting it as unexpected.
  351. if (!Initializer.Handle(ev)) {
  352. ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateInit", ev));
  353. }
  354. break;
  355. };
  356. }
  // State function for the idle state.
  // Dispatches on ev->GetTypeRewrite() to the member handler named next to each event type;
  // write-related events use the HandleOnIdle variants and TEvSourceIdResponse is routed to
  // SourceManager.Handle. Any event type without an entry is logged as unexpected.
  357. STFUNC(StateIdle)
  358. {
  // Scoped keeper bound to the COUNTER_PQ_TABLET_CPU_USAGE cumulative counter
  // (presumably accounts the time spent in this handler - TODO confirm).
  359. NPersQueue::TCounterTimeKeeper keeper(TabletCounters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]);
  360. ALOG_TRACE(NKikimrServices::PERSQUEUE, EventStr("StateIdle", ev));
  361. TRACE_EVENT(NKikimrServices::PERSQUEUE);
  362. switch (ev->GetTypeRewrite()) {
  363. CFunc(TEvents::TSystem::Wakeup, HandleWakeup);
  364. HFuncTraced(TEvKeyValue::TEvResponse, Handle);
  365. HFuncTraced(TEvPQ::TEvBlobResponse, Handle);
  366. HFuncTraced(TEvPQ::TEvWrite, HandleOnIdle);
  367. HFuncTraced(TEvPQ::TEvRead, Handle);
  368. HFuncTraced(TEvPQ::TEvApproveQuota, Handle);
  369. HFuncTraced(TEvPQ::TEvReadTimeout, Handle);
  370. HFuncTraced(TEvents::TEvPoisonPill, Handle);
  371. HFuncTraced(TEvPQ::TEvMonRequest, HandleMonitoring);
  372. HFuncTraced(TEvPQ::TEvGetMaxSeqNoRequest, Handle);
  373. HFuncTraced(TEvPQ::TEvChangePartitionConfig, Handle);
  374. HFuncTraced(TEvPQ::TEvGetClientOffset, Handle);
  375. HFuncTraced(TEvPQ::TEvUpdateWriteTimestamp, Handle);
  376. HFuncTraced(TEvPQ::TEvSetClientInfo, Handle);
  377. HFuncTraced(TEvPQ::TEvPartitionOffsets, Handle);
  378. HFuncTraced(TEvPQ::TEvPartitionStatus, Handle);
  379. HFuncTraced(TEvPersQueue::TEvReportPartitionError, Handle);
  380. HFuncTraced(TEvPQ::TEvChangeOwner, Handle);
  381. HFuncTraced(TEvPersQueue::TEvHasDataInfo, Handle);
  382. HFuncTraced(TEvPQ::TEvMirrorerCounters, Handle);
  383. HFuncTraced(TEvPQ::TEvProxyResponse, Handle);
  384. HFuncTraced(TEvPQ::TEvError, Handle);
  385. HFuncTraced(TEvPQ::TEvGetPartitionClientInfo, Handle);
  386. HFuncTraced(TEvPQ::TEvUpdateAvailableSize, HandleOnIdle);
  387. HFuncTraced(TEvPQ::TEvReserveBytes, Handle);
  388. HFuncTraced(TEvPQ::TEvPipeDisconnected, Handle);
  389. HFuncTraced(TEvQuota::TEvClearance, Handle);
  390. HFuncTraced(TEvPQ::TEvQuotaDeadlineCheck, Handle);
  391. HFuncTraced(TEvPQ::TEvRegisterMessageGroup, HandleOnIdle);
  392. HFuncTraced(TEvPQ::TEvDeregisterMessageGroup, HandleOnIdle);
  393. HFuncTraced(TEvPQ::TEvSplitMessageGroup, HandleOnIdle);
  394. HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle);
  395. HFuncTraced(TEvPQ::TEvTxCalcPredicate, Handle);
  396. HFuncTraced(TEvPQ::TEvProposePartitionConfig, Handle);
  397. HFuncTraced(TEvPQ::TEvTxCommit, Handle);
  398. HFuncTraced(TEvPQ::TEvTxRollback, Handle);
  399. HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
  400. HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
  401. HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
  402. HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
  403. HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
  404. HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
  405. default:
  // No handler registered for this event type in the idle state: log it and drop it.
  406. ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateIdle", ev));
  407. break;
  408. };
  409. }
  // State function for the write state.
  // Dispatches on ev->GetTypeRewrite() to the member handler named next to each event type.
  // Compared with StateIdle it additionally handles TEvPQ::TEvHandleWriteResponse and uses the
  // HandleOnWrite variants for write-related events. Any event type without an entry is logged
  // as unexpected.
  410. STFUNC(StateWrite)
  411. {
  // Scoped keeper bound to the COUNTER_PQ_TABLET_CPU_USAGE cumulative counter
  // (presumably accounts the time spent in this handler - TODO confirm).
  412. NPersQueue::TCounterTimeKeeper keeper(TabletCounters.Cumulative()[COUNTER_PQ_TABLET_CPU_USAGE]);
  413. ALOG_TRACE(NKikimrServices::PERSQUEUE, EventStr("StateWrite", ev));
  414. TRACE_EVENT(NKikimrServices::PERSQUEUE);
  415. switch (ev->GetTypeRewrite()) {
  416. CFunc(TEvents::TSystem::Wakeup, HandleWakeup);
  417. HFuncTraced(TEvKeyValue::TEvResponse, Handle);
  418. HFuncTraced(TEvPQ::TEvHandleWriteResponse, Handle);
  419. HFuncTraced(TEvPQ::TEvBlobResponse, Handle);
  420. HFuncTraced(TEvPQ::TEvWrite, HandleOnWrite);
  421. HFuncTraced(TEvPQ::TEvRead, Handle);
  422. HFuncTraced(TEvPQ::TEvApproveQuota, Handle);
  423. HFuncTraced(TEvPQ::TEvReadTimeout, Handle);
  424. HFuncTraced(TEvents::TEvPoisonPill, Handle);
  425. HFuncTraced(TEvPQ::TEvMonRequest, HandleMonitoring);
  426. HFuncTraced(TEvPQ::TEvGetMaxSeqNoRequest, Handle);
  427. HFuncTraced(TEvPQ::TEvGetClientOffset, Handle);
  428. HFuncTraced(TEvPQ::TEvUpdateWriteTimestamp, Handle);
  429. HFuncTraced(TEvPQ::TEvSetClientInfo, Handle);
  430. HFuncTraced(TEvPQ::TEvPartitionOffsets, Handle);
  431. HFuncTraced(TEvPQ::TEvPartitionStatus, Handle);
  432. HFuncTraced(TEvPersQueue::TEvReportPartitionError, Handle);
  433. HFuncTraced(TEvPQ::TEvChangeOwner, Handle);
  434. HFuncTraced(TEvPQ::TEvChangePartitionConfig, Handle);
  435. HFuncTraced(TEvPersQueue::TEvHasDataInfo, Handle);
  436. HFuncTraced(TEvPQ::TEvMirrorerCounters, Handle);
  437. HFuncTraced(TEvPQ::TEvProxyResponse, Handle);
  438. HFuncTraced(TEvPQ::TEvError, Handle);
  439. HFuncTraced(TEvPQ::TEvReserveBytes, Handle);
  440. HFuncTraced(TEvPQ::TEvGetPartitionClientInfo, Handle);
  441. HFuncTraced(TEvPQ::TEvPipeDisconnected, Handle);
  442. HFuncTraced(TEvPQ::TEvUpdateAvailableSize, HandleOnWrite);
  443. HFuncTraced(TEvPQ::TEvQuotaDeadlineCheck, Handle);
  444. HFuncTraced(TEvQuota::TEvClearance, Handle);
  445. HFuncTraced(TEvPQ::TEvRegisterMessageGroup, HandleOnWrite);
  446. HFuncTraced(TEvPQ::TEvDeregisterMessageGroup, HandleOnWrite);
  447. HFuncTraced(TEvPQ::TEvSplitMessageGroup, HandleOnWrite);
  448. HFuncTraced(TEvPersQueue::TEvProposeTransaction, Handle);
  449. HFuncTraced(TEvPQ::TEvTxCalcPredicate, Handle);
  450. HFuncTraced(TEvPQ::TEvProposePartitionConfig, Handle);
  451. HFuncTraced(TEvPQ::TEvTxCommit, Handle);
  452. HFuncTraced(TEvPQ::TEvTxRollback, Handle);
  453. HFuncTraced(TEvPQ::TEvSubDomainStatus, Handle);
  454. HFuncTraced(TEvPQ::TEvSourceIdRequest, Handle);
  455. HFuncTraced(TEvPQ::TEvSourceIdResponse, SourceManager.Handle);
  456. HFuncTraced(NReadQuoterEvents::TEvQuotaUpdated, Handle);
  457. HFuncTraced(NReadQuoterEvents::TEvAccountQuotaCountersUpdated, Handle);
  458. HFuncTraced(NReadQuoterEvents::TEvQuotaCountersUpdated, Handle);
  459. default:
  // No handler registered for this event type in the write state: log it and drop it.
  460. ALOG_ERROR(NKikimrServices::PERSQUEUE, "Unexpected " << EventStr("StateWrite", ev));
  461. break;
  462. };
  463. }
  464. private:
  // Result type returned by the ProcessRequest overloads.
  enum class ProcessResult {
      Continue = 0,
      Abort = 1,
      Break = 2,
  };
  470. struct ProcessParameters {
  471. ProcessParameters(TPartitionSourceManager::TModificationBatch& sourceIdBatch)
  472. : SourceIdBatch(sourceIdBatch) {
  473. }
  474. TPartitionSourceManager::TModificationBatch& SourceIdBatch;
  475. ui64 CurOffset;
  476. bool OldPartsCleared;
  477. bool HeadCleared;
  478. };
  479. ProcessResult ProcessRequest(TRegisterMessageGroupMsg& msg, ProcessParameters& parameters);
  480. ProcessResult ProcessRequest(TDeregisterMessageGroupMsg& msg, ProcessParameters& parameters);
  481. ProcessResult ProcessRequest(TSplitMessageGroupMsg& msg, ProcessParameters& parameters);
  482. ProcessResult ProcessRequest(TWriteMsg& msg, ProcessParameters& parameters, TEvKeyValue::TEvRequest* request, const TActorContext& ctx);
  483. private:
  484. ui64 TabletID;
  485. ui32 Partition;
  486. NKikimrPQ::TPQTabletConfig Config;
  487. NKikimrPQ::TPQTabletConfig TabletConfig;
  488. const NKikimrPQ::TPQTabletConfig::TPartition* PartitionConfig = nullptr;
  489. const NKikimrPQ::TPQTabletConfig::TPartition* PendingPartitionConfig = nullptr;
  490. const TTabletCountersBase& Counters;
  491. NPersQueue::TTopicConverterPtr TopicConverter;
  492. bool IsLocalDC;
  493. TString DCId;
  494. TPartitionGraph PartitionGraph;
  495. TPartitionSourceManager SourceManager;
  496. ui32 MaxBlobSize;
  497. const ui32 TotalLevels = 4;
  498. TVector<ui32> CompactLevelBorder;
  499. ui32 TotalMaxCount;
  500. ui32 MaxSizeCheck;
  501. // [ 8+Mb][ 8+Mb ][not compacted data ] [ data sent to KV but not yet confirmed]
  502. //offsets in partition: 101 102|103 104|105 106 107 108 109 110|111 112 113
  503. // ^ ^ ^
  504. // StartOffset HeadOffset EndOffset
  505. // [DataKeysBody ][DataKeysHead ]
  506. ui64 StartOffset;
  507. ui64 EndOffset;
  508. ui64 WriteInflightSize;
  509. TActorId Tablet;
  510. TActorId BlobCache;
  511. std::deque<TMessage> Requests;
  512. std::deque<TMessage> Responses;
  513. std::deque<TEvPQ::TEvGetMaxSeqNoRequest::TPtr> MaxSeqNoRequests;
  514. THead Head;
  515. THead NewHead;
  516. TPartitionedBlob PartitionedBlob;
  517. std::deque<std::pair<TKey, ui32>> CompactedKeys; //key and blob size
  518. TDataKey NewHeadKey;
  519. ui64 BodySize;
  520. ui32 MaxWriteResponsesSize;
  521. std::deque<TDataKey> DataKeysBody;
  522. TVector<TKeyLevel> DataKeysHead;
  523. std::deque<TDataKey> HeadKeys;
  524. std::deque<std::pair<ui64,ui64>> GapOffsets;
  525. ui64 GapSize;
  526. TString CloudId;
  527. TString DbId;
  528. TString DbPath;
  529. bool IsServerless;
  530. TString FolderId;
  531. TMaybe<TUsersInfoStorage> UsersInfoStorage;
  532. //
  533. // user actions and transactions
  534. //
  535. std::deque<TSimpleSharedPtr<TEvPQ::TEvSetClientInfo>> UserActs;
  536. std::deque<TSimpleSharedPtr<TEvPersQueue::TEvProposeTransaction>> ImmediateTxs;
  537. std::deque<TTransaction> DistrTxs;
  538. THashMap<TString, size_t> UserActCount;
  539. THashMap<TString, TUserInfoBase> PendingUsersInfo;
  540. TVector<std::pair<TActorId, std::unique_ptr<IEventBase>>> Replies;
  541. THashSet<TString> AffectedUsers;
  542. bool UsersInfoWriteInProgress = false;
  543. bool TxInProgress = false;
  544. TMaybe<ui64> PlanStep;
  545. TMaybe<ui64> TxId;
  546. bool TxIdHasChanged = false;
  547. TSimpleSharedPtr<TEvPQ::TEvChangePartitionConfig> ChangeConfig;
  548. bool SendChangeConfigReply = true;
  549. //
  550. //
  551. //
  552. std::deque<std::pair<TString, ui64>> UpdateUserInfoTimestamp;
  553. bool ReadingTimestamp;
  554. TString ReadingForUser;
  555. ui64 ReadingForUserReadRuleGeneration;
  556. ui64 ReadingForOffset; // log only
  557. THashMap<ui64, TReadInfo> ReadInfo; // cookie -> {...}
  558. ui64 Cookie;
  559. TInstant CreationTime;
  560. TDuration InitDuration;
  561. bool InitDone;
  562. const bool NewPartition;
  563. THashMap<TString, NKikimr::NPQ::TOwnerInfo> Owners;
  564. THashSet<TActorId> OwnerPipes;
  565. TSourceIdStorage SourceIdStorage;
  566. std::deque<THolder<TEvPQ::TEvChangeOwner>> WaitToChangeOwner;
  567. TTabletCountersBase TabletCounters;
  568. THolder<TPartitionLabeledCounters> PartitionCountersLabeled;
  569. TInstant LastCountersUpdate;
  570. TSubscriber Subscriber;
  571. TInstant WriteCycleStartTime;
  572. ui32 WriteCycleSize;
  573. ui32 WriteNewSize;
  574. ui32 WriteNewSizeInternal;
  575. ui64 WriteNewSizeUncompressed;
  576. ui32 WriteNewMessages;
  577. ui32 WriteNewMessagesInternal;
  578. TInstant CurrentTimestamp;
  579. bool DiskIsFull;
  580. bool SubDomainOutOfSpace;
  581. TSet<THasDataReq> HasDataRequests;
  582. TSet<THasDataDeadline> HasDataDeadlines;
  583. ui64 HasDataReqNum;
  584. TMaybe<TQuotaTracker> WriteQuota;
  585. TActorId ReadQuotaTrackerActor;
  586. THolder<TPercentileCounter> PartitionWriteQuotaWaitCounter;
  587. TInstant QuotaDeadline = TInstant::Zero();
  588. TVector<NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>> AvgWriteBytes;
  589. NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>> AvgReadBytes;
  590. TVector<NSlidingWindow::TSlidingWindow<NSlidingWindow::TSumOperation<ui64>>> AvgQuotaBytes;
  591. ui64 ReservedSize;
  592. std::deque<THolder<TEvPQ::TEvReserveBytes>> ReserveRequests;
  593. ui32 Channel;
  594. ui32 NumChannels;
  595. TVector<ui32> TotalChannelWritesByHead;
  596. TWorkingTimeCounter WriteBufferIsFullCounter;
  597. TInstant WriteTimestamp;
  598. TInstant WriteTimestampEstimate;
  599. bool ManageWriteTimestampEstimate = true;
  600. NSlidingWindow::TSlidingWindow<NSlidingWindow::TMaxOperation<ui64>> WriteLagMs;
  601. THolder<TPercentileCounter> InputTimeLag;
  602. THolder<TPercentileCounter> MessageSize;
  603. TPercentileCounter WriteLatency;
  604. NKikimr::NPQ::TMultiCounter SLIBigLatency;
  605. NKikimr::NPQ::TMultiCounter WritesTotal;
  606. NKikimr::NPQ::TMultiCounter BytesWrittenTotal;
  607. NKikimr::NPQ::TMultiCounter BytesWrittenGrpc;
  608. NKikimr::NPQ::TMultiCounter BytesWrittenUncompressed;
  609. NKikimr::NPQ::TMultiCounter BytesWrittenComp;
  610. NKikimr::NPQ::TMultiCounter MsgsWrittenTotal;
  611. NKikimr::NPQ::TMultiCounter MsgsWrittenGrpc;;
  612. // Writing blob with topic quota variables
  613. ui64 TopicQuotaRequestCookie = 0;
  614. // Wait topic quota metrics
  615. THolder<TPercentileCounter> TopicWriteQuotaWaitCounter;
  616. TInstant StartTopicQuotaWaitTimeForCurrentBlob;
  617. TInstant WriteStartTime;
  618. TDuration TopicQuotaWaitTimeForCurrentBlob;
  619. // Topic quota parameters
  620. TString TopicWriteQuoterPath;
  621. TString TopicWriteQuotaResourcePath;
  622. ui64 NextTopicWriteQuotaRequestCookie = 1;
  623. TDeque<NKikimrPQ::TStatusResponse::TErrorMessage> Errors;
  624. THolder<TMirrorerInfo> Mirrorer;
  625. TInstant LastUsedStorageMeterTimestamp;
  626. TDeque<std::unique_ptr<IEventBase>> PendingEvents;
  627. };
  628. } // namespace NKikimr::NPQ