datashard.cpp

#include "datashard_impl.h"
#include "datashard_txs.h"
#include "probes.h"

#include <ydb/core/base/interconnect_channels.h>
#include <ydb/core/engine/minikql/flat_local_tx_factory.h>
#include <ydb/core/formats/arrow/arrow_batch_builder.h>
#include <ydb/core/scheme/scheme_tablecell.h>
#include <ydb/core/tablet/tablet_counters_protobuf.h>
#include <ydb/core/tx/long_tx_service/public/events.h>
#include <ydb/core/protos/datashard_config.pb.h>
#include <ydb/library/actors/core/monotonic_provider.h>
#include <library/cpp/monlib/service/pages/templates.h>
#include <contrib/libs/apache/arrow/cpp/src/arrow/api.h>

LWTRACE_USING(DATASHARD_PROVIDER)

namespace NKikimr {

IActor* CreateDataShard(const TActorId &tablet, TTabletStorageInfo *info) {
    return new NDataShard::TDataShard(tablet, info);
}

namespace NDataShard {

using namespace NSchemeShard;
using namespace NTabletFlatExecutor;

// NOTE: We really want to batch log records by default in datashards!
// But in unittests we want to test both scenarios
bool gAllowLogBatchingDefaultValue = true;

TDuration gDbStatsReportInterval = TDuration::Seconds(10);
ui64 gDbStatsDataSizeResolution = 10*1024*1024;
ui64 gDbStatsRowCountResolution = 100000;

// The first byte is 0x01 so it would fail to parse as an internal tablet protobuf
TStringBuf SnapshotTransferReadSetMagic("\x01SRS", 4);

/**
 * A special subclass of TMiniKQLFactory that uses correct row versions for writes
 */
class TDataShardMiniKQLFactory : public NMiniKQL::TMiniKQLFactory {
public:
    TDataShardMiniKQLFactory(TDataShard* self)
        : Self(self)
    { }

    TRowVersion GetWriteVersion(const TTableId& tableId) const override {
        using Schema = TDataShard::Schema;

        Y_VERIFY_S(tableId.PathId.OwnerId == Self->TabletID(),
            "Unexpected table " << tableId.PathId.OwnerId << ":" << tableId.PathId.LocalPathId
            << " for datashard " << Self->TabletID()
            << " in a local minikql tx");

        if (tableId.PathId.LocalPathId < Schema::MinLocalTid) {
            // System tables are not versioned
            return TRowVersion::Min();
        }

        // Write user tables with a minimal safe version (avoiding snapshots)
        return Self->GetLocalReadWriteVersions().WriteVersion;
    }

    TRowVersion GetReadVersion(const TTableId& tableId) const override {
        using Schema = TDataShard::Schema;

        Y_VERIFY_S(tableId.PathId.OwnerId == Self->TabletID(),
            "Unexpected table " << tableId.PathId.OwnerId << ":" << tableId.PathId.LocalPathId
            << " for datashard " << Self->TabletID()
            << " in a local minikql tx");

        if (tableId.PathId.LocalPathId < Schema::MinLocalTid) {
            // System tables are not versioned
            return TRowVersion::Max();
        }

        return Self->GetLocalReadWriteVersions().ReadVersion;
    }

private:
    TDataShard* const Self;
};

class TDatashardKeySampler : public NMiniKQL::IKeyAccessSampler {
    TDataShard& Self;
public:
    TDatashardKeySampler(TDataShard& self) : Self(self)
    {}

    void AddSample(const TTableId& tableId, const TArrayRef<const TCell>& key) override {
        Self.SampleKeyAccess(tableId, key);
    }
};

TDataShard::TDataShard(const TActorId &tablet, TTabletStorageInfo *info)
    : TActor(&TThis::StateInit)
    , TTabletExecutedFlat(info, tablet, new TDataShardMiniKQLFactory(this))
    , PipeClientCacheConfig(new NTabletPipe::TBoundedClientCacheConfig())
    , PipeClientCache(NTabletPipe::CreateBoundedClientCache(PipeClientCacheConfig, GetPipeClientConfig()))
    , ResendReadSetPipeTracker(*PipeClientCache)
    , SchemeShardPipeRetryPolicy({})
    , PathOwnerId(INVALID_TABLET_ID)
    , CurrentSchemeShardId(INVALID_TABLET_ID)
    , LastKnownMediator(INVALID_TABLET_ID)
    , RegistrationSended(false)
    , LoanReturnTracker(info->TabletID)
    , MvccSwitchState(TSwitchState::READY)
    , SplitSnapshotStarted(false)
    , SplitSrcSnapshotSender(this)
    , DstSplitOpId(0)
    , SrcSplitOpId(0)
    , State(TShardState::Uninitialized)
    , LastLocalTid(Schema::MinLocalTid)
    , NextSeqno(1)
    , NextChangeRecordOrder(1)
    , LastChangeRecordGroup(1)
    , TxReadSizeLimit(0)
    , StatisticsDisabled(0)
    , DisabledKeySampler(new NMiniKQL::TNoopKeySampler())
    , EnabledKeySampler(new TDatashardKeySampler(*this))
    , CurrentKeySampler(DisabledKeySampler)
    , TransQueue(this)
    , OutReadSets(this)
    , Pipeline(this)
    , SysLocks(this)
    , SnapshotManager(this)
    , SchemaSnapshotManager(this)
    , VolatileTxManager(this)
    , ConflictsCache(this)
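    // The shared controls below are assumed to follow the TControlWrapper
    // (defaultValue, minValue, maxValue) convention; they become tunable at
    // runtime once registered on the immediate control board in IcbRegister().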
    , DisableByKeyFilter(0, 0, 1)
    , MaxTxInFly(15000, 0, 100000)
    , MaxTxLagMilliseconds(5*60*1000, 0, 30*24*3600*1000ll)
    , CanCancelROWithReadSets(0, 0, 1)
    , PerShardReadSizeLimit(5368709120, 0, 107374182400)
    , CpuUsageReportThreshlodPercent(60, -1, 146)
    , CpuUsageReportIntervalSeconds(60, 0, 365*86400)
    , HighDataSizeReportThreshlodBytes(10ull<<30, -1, Max<i64>())
    , HighDataSizeReportIntervalSeconds(60, 0, 365*86400)
    , DataTxProfileLogThresholdMs(0, 0, 86400000)
    , DataTxProfileBufferThresholdMs(0, 0, 86400000)
    , DataTxProfileBufferSize(0, 1000, 100)
    , ReadColumnsScanEnabled(1, 0, 1)
    , ReadColumnsScanInUserPool(0, 0, 1)
    , BackupReadAheadLo(0, 0, 64*1024*1024)
    , BackupReadAheadHi(0, 0, 128*1024*1024)
    , TtlReadAheadLo(0, 0, 64*1024*1024)
    , TtlReadAheadHi(0, 0, 128*1024*1024)
    , EnablePrioritizedMvccSnapshotReads(1, 0, 1)
    , EnableUnprotectedMvccSnapshotReads(1, 0, 1)
    , EnableLockedWrites(1, 0, 1)
    , MaxLockedWritesPerKey(1000, 0, 1000000)
    , EnableLeaderLeases(1, 0, 1)
    , MinLeaderLeaseDurationUs(250000, 1000, 5000000)
    , DataShardSysTables(InitDataShardSysTables(this))
    , ChangeSenderActivator(info->TabletID)
    , ChangeExchangeSplitter(this)
{
    TabletCountersPtr.Reset(new TProtobufTabletCounters<
        ESimpleCounters_descriptor,
        ECumulativeCounters_descriptor,
        EPercentileCounters_descriptor,
        ETxTypes_descriptor
    >());
    TabletCounters = TabletCountersPtr.Get();

    RegisterDataShardProbes();
}

NTabletPipe::TClientConfig TDataShard::GetPipeClientConfig() {
    NTabletPipe::TClientConfig config;
    config.CheckAliveness = true;
    config.RetryPolicy = {
        .RetryLimitCount = 30,
        .MinRetryTime = TDuration::MilliSeconds(10),
        .MaxRetryTime = TDuration::MilliSeconds(500),
        .BackoffMultiplier = 2,
    };
    return config;
}
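// With the policy above, retry delays start at 10ms and roughly double
// (10, 20, 40, 80, 160, 320ms, then capped at 500ms per attempt) for at most
// 30 attempts, so an unreachable tablet keeps being retried for about 12-13s.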
void TDataShard::OnDetach(const TActorContext &ctx) {
    Cleanup(ctx);
    LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "OnDetach: " << TabletID());
    return Die(ctx);
}

void TDataShard::OnTabletStop(TEvTablet::TEvTabletStop::TPtr &ev, const TActorContext &ctx) {
    const auto* msg = ev->Get();

    LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "OnTabletStop: " << TabletID() << " reason = " << msg->GetReason());

    if (!IsFollower() && GetState() == TShardState::Ready) {
        if (!Stopping) {
            Stopping = true;
            OnStopGuardStarting(ctx);
            Execute(new TTxStopGuard(this), ctx);
        }

        switch (msg->GetReason()) {
            case TEvTablet::TEvTabletStop::ReasonStop:
            case TEvTablet::TEvTabletStop::ReasonDemoted:
            case TEvTablet::TEvTabletStop::ReasonIsolated:
                // Keep trying to stop gracefully
                return;

            case TEvTablet::TEvTabletStop::ReasonUnknown:
            case TEvTablet::TEvTabletStop::ReasonStorageBlocked:
            case TEvTablet::TEvTabletStop::ReasonStorageFailure:
                // New commits are impossible, stop immediately
                break;
        }
    } else {
        Stopping = true;
    }

    return TTabletExecutedFlat::OnTabletStop(ev, ctx);
}

void TDataShard::TTxStopGuard::Complete(const TActorContext &ctx) {
    Self->OnStopGuardComplete(ctx);
}

void TDataShard::OnStopGuardStarting(const TActorContext &ctx) {
    // Handle immediate ops that have completed BuildAndWaitDependencies
    for (const auto &kv : Pipeline.GetImmediateOps()) {
        const auto &op = kv.second;

        // Send reject result immediately, because we cannot control when
        // a new datashard tablet may start and block us from committing
        // anything new. The usual progress queue is too slow for that.
        if (!op->Result() && !op->HasResultSentFlag()) {
            auto kind = static_cast<NKikimrTxDataShard::ETransactionKind>(op->GetKind());
            auto rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED;
            TString rejectReason = TStringBuilder()
                << "Rejecting immediate tx "
                << op->GetTxId()
                << " because datashard "
                << TabletID()
                << " is restarting";
            auto result = MakeHolder<TEvDataShard::TEvProposeTransactionResult>(
                kind, TabletID(), op->GetTxId(), rejectStatus);
            result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectReason);
            LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectReason);

            ctx.Send(op->GetTarget(), result.Release(), 0, op->GetCookie());

            IncCounter(COUNTER_PREPARE_OVERLOADED);
            IncCounter(COUNTER_PREPARE_COMPLETE);
            op->SetResultSentFlag();
        }

        // Add op to candidates because IsReadyToExecute just became true
        Pipeline.AddCandidateOp(op);
        PlanQueue.Progress(ctx);
    }

    // Handle prepared ops by notifying about imminent shutdown
    for (const auto &kv : TransQueue.GetTxsInFly()) {
        const auto &op = kv.second;
        if (op->GetTarget() && !op->HasCompletedFlag()) {
            auto notify = MakeHolder<TEvDataShard::TEvProposeTransactionRestart>(
                TabletID(), op->GetTxId());
            ctx.Send(op->GetTarget(), notify.Release(), 0, op->GetCookie());
        }
    }
}

void TDataShard::OnStopGuardComplete(const TActorContext &ctx) {
    // We have cleanly completed the last commit
    ctx.Send(Tablet(), new TEvTablet::TEvTabletStopped());
}

void TDataShard::OnTabletDead(TEvTablet::TEvTabletDead::TPtr &ev, const TActorContext &ctx) {
    Y_UNUSED(ev);
    LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "OnTabletDead: " << TabletID());
    Cleanup(ctx);
    return Die(ctx);
}

void TDataShard::Cleanup(const TActorContext& ctx) {
    //PipeClientCache->Detach(ctx);
    if (RegistrationSended) {
        ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvUnsubscribeReadStep());
        ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvUnregisterTablet(TabletID()));
    }

    if (Pipeline.HasRestore()) {
        auto op = Pipeline.FindOp(Pipeline.CurrentSchemaTxId());
        if (op && op->IsWaitingForAsyncJob()) {
            TActiveTransaction* tx = dynamic_cast<TActiveTransaction*>(op.Get());
            Y_ABORT_UNLESS(tx);
            tx->KillAsyncJobActor(ctx);
        }
    }
}

void TDataShard::IcbRegister() {
    if (!IcbRegistered) {
        auto* appData = AppData();

        appData->Icb->RegisterSharedControl(DisableByKeyFilter, "DataShardControls.DisableByKeyFilter");
        appData->Icb->RegisterSharedControl(MaxTxInFly, "DataShardControls.MaxTxInFly");
        appData->Icb->RegisterSharedControl(MaxTxLagMilliseconds, "DataShardControls.MaxTxLagMilliseconds");
        appData->Icb->RegisterSharedControl(DataTxProfileLogThresholdMs, "DataShardControls.DataTxProfile.LogThresholdMs");
        appData->Icb->RegisterSharedControl(DataTxProfileBufferThresholdMs, "DataShardControls.DataTxProfile.BufferThresholdMs");
        appData->Icb->RegisterSharedControl(DataTxProfileBufferSize, "DataShardControls.DataTxProfile.BufferSize");
        appData->Icb->RegisterSharedControl(CanCancelROWithReadSets, "DataShardControls.CanCancelROWithReadSets");
        appData->Icb->RegisterSharedControl(PerShardReadSizeLimit, "TxLimitControls.PerShardReadSizeLimit");
        appData->Icb->RegisterSharedControl(CpuUsageReportThreshlodPercent, "DataShardControls.CpuUsageReportThreshlodPercent");
        appData->Icb->RegisterSharedControl(CpuUsageReportIntervalSeconds, "DataShardControls.CpuUsageReportIntervalSeconds");
        appData->Icb->RegisterSharedControl(HighDataSizeReportThreshlodBytes, "DataShardControls.HighDataSizeReportThreshlodBytes");
        appData->Icb->RegisterSharedControl(HighDataSizeReportIntervalSeconds, "DataShardControls.HighDataSizeReportIntervalSeconds");
        appData->Icb->RegisterSharedControl(ReadColumnsScanEnabled, "DataShardControls.ReadColumnsScanEnabled");
        appData->Icb->RegisterSharedControl(ReadColumnsScanInUserPool, "DataShardControls.ReadColumnsScanInUserPool");
        appData->Icb->RegisterSharedControl(BackupReadAheadLo, "DataShardControls.BackupReadAheadLo");
        appData->Icb->RegisterSharedControl(BackupReadAheadHi, "DataShardControls.BackupReadAheadHi");
        appData->Icb->RegisterSharedControl(TtlReadAheadLo, "DataShardControls.TtlReadAheadLo");
        appData->Icb->RegisterSharedControl(TtlReadAheadHi, "DataShardControls.TtlReadAheadHi");
        appData->Icb->RegisterSharedControl(EnablePrioritizedMvccSnapshotReads, "DataShardControls.PrioritizedMvccSnapshotReads");
        appData->Icb->RegisterSharedControl(EnableUnprotectedMvccSnapshotReads, "DataShardControls.UnprotectedMvccSnapshotReads");
        appData->Icb->RegisterSharedControl(EnableLockedWrites, "DataShardControls.EnableLockedWrites");
        appData->Icb->RegisterSharedControl(MaxLockedWritesPerKey, "DataShardControls.MaxLockedWritesPerKey");
        appData->Icb->RegisterSharedControl(EnableLeaderLeases, "DataShardControls.EnableLeaderLeases");
        appData->Icb->RegisterSharedControl(MinLeaderLeaseDurationUs, "DataShardControls.MinLeaderLeaseDurationUs");

        IcbRegistered = true;
    }
}

bool TDataShard::ReadOnlyLeaseEnabled() {
    IcbRegister();
    ui64 value = EnableLeaderLeases;
    return value != 0;
}

TDuration TDataShard::ReadOnlyLeaseDuration() {
    IcbRegister();
    ui64 value = MinLeaderLeaseDurationUs;
    return TDuration::MicroSeconds(value);
}

void TDataShard::OnActivateExecutor(const TActorContext& ctx) {
    LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "TDataShard::OnActivateExecutor: tablet " << TabletID() << " actor " << ctx.SelfID);

    IcbRegister();

    // OnActivateExecutor might be called multiple times for a follower
    // but the counters should be initialized only once
    if (TabletCountersPtr) {
        Executor()->RegisterExternalTabletCounters(TabletCountersPtr);
    }
    Y_ABORT_UNLESS(TabletCounters);

    AllocCounters = TAlignedPagePoolCounters(AppData(ctx)->Counters, "datashard");

    if (!Executor()->GetStats().IsFollower) {
        Execute(CreateTxInitSchema(), ctx);
        Become(&TThis::StateInactive);
    } else {
        SyncConfig();
        State = TShardState::Readonly;
        FollowerState = { };
        Become(&TThis::StateWorkAsFollower);
        SignalTabletActive(ctx);
        LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Follower switched to work state: " << TabletID());
    }
}

void TDataShard::SwitchToWork(const TActorContext &ctx) {
    if (IsMvccEnabled() && (
        SnapshotManager.GetPerformedUnprotectedReads() ||
        SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step))
    {
        // We will need to wait until mediator state is fully restored before
        // processing new immediate transactions.
        MediatorStateWaiting = true;
        CheckMediatorStateRestored();
    }

    SyncConfig();
    PlanQueue.Progress(ctx);
    OutReadSets.ResendAll(ctx);

    Become(&TThis::StateWork);
    LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Switched to work state "
        << DatashardStateName(State) << " tabletId " << TabletID());

    if (State == TShardState::Ready && DstSplitDescription) {
        // This shard was created as a result of split/merge (and not e.g. copy table)
        // Signal executor that it should compact borrowed garbage even if this
        // shard has no private data.
        for (const auto& pr : TableInfos) {
            Executor()->AllowBorrowedGarbageCompaction(pr.second->LocalTid);
        }
    }

    // Cleanup any removed snapshots from the previous generation
    Execute(new TTxCleanupRemovedSnapshots(this), ctx);

    if (State != TShardState::Offline) {
        VolatileTxManager.Start(ctx);
    }

    SignalTabletActive(ctx);
    DoPeriodicTasks(ctx);

    NotifySchemeshard(ctx);
    CheckInitiateBorrowedPartsReturn(ctx);
    CheckStateChange(ctx);
}

void TDataShard::SyncConfig() {
    PipeClientCacheConfig->ClientPoolLimit = PipeClientCachePoolLimit();
    PipeClientCache->PopWhileOverflow();
    // TODO[serxa]: dynamic prepared in fly
    //SetDynamicPreparedInFly(Config.GetFlowControl().GetPreparedInFlyMax());
}

void TDataShard::SendRegistrationRequestTimeCast(const TActorContext &ctx) {
    if (RegistrationSended)
        return;

    if (!ProcessingParams)
        return;

    LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Send registration request to time cast "
        << DatashardStateName(State) << " tabletId " << TabletID()
        << " mediators count is " << ProcessingParams->MediatorsSize()
        << " coordinators count is " << ProcessingParams->CoordinatorsSize()
        << " buckets per mediator " << ProcessingParams->GetTimeCastBucketsPerMediator());

    RegistrationSended = true;
    ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvRegisterTablet(TabletID(), *ProcessingParams));

    // Subscribe to all known coordinators
    for (ui64 coordinatorId : ProcessingParams->GetCoordinators()) {
        size_t index = CoordinatorSubscriptions.size();
        auto res = CoordinatorSubscriptionById.emplace(coordinatorId, index);
        if (res.second) {
            auto& subscription = CoordinatorSubscriptions.emplace_back();
            subscription.CoordinatorId = coordinatorId;
            ctx.Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvSubscribeReadStep(coordinatorId));
            ++CoordinatorSubscriptionsPending;
        }
    }
}

void TDataShard::PrepareAndSaveOutReadSets(ui64 step,
                                           ui64 txId,
                                           const TMap<std::pair<ui64, ui64>, TString>& txOutReadSets,
                                           TVector<THolder<TEvTxProcessing::TEvReadSet>> &preparedRS,
                                           TTransactionContext &txc,
                                           const TActorContext& ctx)
{
    NIceDb::TNiceDb db(txc.DB);
    OutReadSets.Cleanup(db, ctx);
    if (txOutReadSets.empty())
        return;

    ui64 prevSeqno = NextSeqno;
    for (auto& kv : txOutReadSets) {
        ui64 source = kv.first.first;
        ui64 target = kv.first.second;
        TReadSetKey rsKey(txId, TabletID(), source, target);
        if (!OutReadSets.Has(rsKey)) {
            ui64 seqno = NextSeqno++;
            OutReadSets.SaveReadSet(db, seqno, step, rsKey, kv.second);
            preparedRS.push_back(PrepareReadSet(step, txId, source, target, kv.second, seqno));
        }
    }

    if (NextSeqno != prevSeqno) {
        PersistSys(db, Schema::Sys_NextSeqno, NextSeqno);
    }
}
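// Note: Sys_NextSeqno is persisted only when the counter actually advanced,
// so a restarted shard never reuses a sequence number for outgoing readsets.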
void TDataShard::SendDelayedAcks(const TActorContext& ctx, TVector<THolder<IEventHandle>>& delayedAcks) const {
    for (auto& x : delayedAcks) {
        LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD,
                  "Send delayed Ack RS Ack at %" PRIu64 " %s",
                  TabletID(), x->ToString().data());
        ctx.ExecutorThread.Send(x.Release());
        IncCounter(COUNTER_ACK_SENT_DELAYED);
    }
    delayedAcks.clear();
}

void TDataShard::GetCleanupReplies(const TOperation::TPtr& op, std::vector<std::unique_ptr<IEventHandle>>& cleanupReplies) {
    if (!op->HasOutputData()) {
        // There are no replies
        return;
    }

    auto& delayedAcks = op->DelayedAcks();
    for (auto& x : delayedAcks) {
        LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
            "Cleanup TxId# " << op->GetTxId() << " at " << TabletID() << " Ack RS " << x->ToString());
        cleanupReplies.emplace_back(x.Release());
        IncCounter(COUNTER_ACK_SENT_DELAYED);
    }
    delayedAcks.clear();

    auto& expectedReadSets = op->ExpectedReadSets();
    for (auto& x : expectedReadSets) {
        for (const auto& recipient : x.second) {
            cleanupReplies.push_back(GenerateReadSetNoData(recipient, op->GetStep(), op->GetTxId(), x.first.first, x.first.second));
        }
    }
    expectedReadSets.clear();
}

void TDataShard::SendConfirmedReplies(TMonotonic ts, std::vector<std::unique_ptr<IEventHandle>>&& replies) {
    if (replies.empty()) {
        return;
    }

    struct TState : public TThrRefBase {
        std::vector<std::unique_ptr<IEventHandle>> Replies;

        TState(std::vector<std::unique_ptr<IEventHandle>>&& replies)
            : Replies(std::move(replies))
        {}
    };

    Executor()->ConfirmReadOnlyLease(ts,
        [state = MakeIntrusive<TState>(std::move(replies))] {
            for (auto& ev : state->Replies) {
                TActivationContext::Send(std::move(ev));
            }
        });
}
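// Assumption: ConfirmReadOnlyLease invokes the callback only once the
// read-only lease is confirmed for timestamp ts, i.e. this generation is
// known to still be the leader, so the replies can be sent safely without
// an extra log commit.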
void TDataShard::SendCommittedReplies(std::vector<std::unique_ptr<IEventHandle>>&& replies) {
    for (auto& ev : replies) {
        TActivationContext::Send(std::move(ev));
    }
}

class TDataShard::TWaitVolatileDependencies final : public IVolatileTxCallback {
public:
    TWaitVolatileDependencies(
            TDataShard* self, const absl::flat_hash_set<ui64>& dependencies,
            const TActorId& target,
            std::unique_ptr<IEventBase> event,
            ui64 cookie)
        : Self(self)
        , Dependencies(dependencies)
        , Target(target)
        , Event(std::move(event))
        , Cookie(cookie)
    { }

    void OnCommit(ui64 txId) override {
        Dependencies.erase(txId);
        if (Dependencies.empty()) {
            Finish();
        }
    }

    void OnAbort(ui64 txId) override {
        Dependencies.erase(txId);
        if (Dependencies.empty()) {
            Finish();
        }
    }

    void Finish() {
        Self->Send(Target, Event.release(), 0, Cookie);
    }

private:
    TDataShard* Self;
    absl::flat_hash_set<ui64> Dependencies;
    TActorId Target;
    std::unique_ptr<IEventBase> Event;
    ui64 Cookie;
};

void TDataShard::WaitVolatileDependenciesThenSend(
        const absl::flat_hash_set<ui64>& dependencies,
        const TActorId& target, std::unique_ptr<IEventBase> event,
        ui64 cookie)
{
    Y_ABORT_UNLESS(!dependencies.empty(), "Unexpected empty dependencies");
    auto callback = MakeIntrusive<TWaitVolatileDependencies>(this, dependencies, target, std::move(event), cookie);
    for (ui64 txId : dependencies) {
        bool ok = VolatileTxManager.AttachVolatileTxCallback(txId, callback);
        Y_VERIFY_S(ok, "Unexpected failure to attach callback to volatile tx " << txId);
    }
}

class TDataShard::TSendVolatileResult final : public IVolatileTxCallback {
public:
    TSendVolatileResult(
            TDataShard* self, TOutputOpData::TResultPtr result,
            const TActorId& target,
            ui64 step, ui64 txId)
        : Self(self)
        , Result(std::move(result))
        , Target(target)
        , Step(step)
        , TxId(txId)
    { }

    void OnCommit(ui64) override {
        LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD,
                    "Complete [" << Step << " : " << TxId << "] from " << Self->TabletID()
                    << " at tablet " << Self->TabletID() << " send result to client "
                    << Target << ", exec latency: " << Result->Record.GetExecLatency()
                    << " ms, propose latency: " << Result->Record.GetProposeLatency() << " ms");

        ui64 resultSize = Result->GetTxResult().size();
        ui32 flags = IEventHandle::MakeFlags(TInterconnectChannels::GetTabletChannel(resultSize), 0);
        LWTRACK(ProposeTransactionSendResult, Result->Orbit);
        Self->Send(Target, Result.Release(), flags);
    }

    void OnAbort(ui64 txId) override {
        Result->Record.ClearTxResult();
        Result->Record.SetStatus(NKikimrTxDataShard::TEvProposeTransactionResult::ABORTED);
        Result->AddError(NKikimrTxDataShard::TError::EXECUTION_CANCELLED, "Distributed transaction aborted due to commit failure");
        OnCommit(txId);
    }

private:
    TDataShard* Self;
    TOutputOpData::TResultPtr Result;
    TActorId Target;
    ui64 Step;
    ui64 TxId;
};

void TDataShard::SendResult(const TActorContext &ctx,
                            TOutputOpData::TResultPtr &res,
                            const TActorId &target,
                            ui64 step,
                            ui64 txId)
{
    Y_ABORT_UNLESS(txId == res->GetTxId(), "%" PRIu64 " vs %" PRIu64, txId, res->GetTxId());

    if (VolatileTxManager.FindByTxId(txId)) {
        // This is a volatile transaction, and we need to wait until it is resolved
        bool ok = VolatileTxManager.AttachVolatileTxCallback(txId,
            new TSendVolatileResult(this, std::move(res), target, step, txId));
        Y_ABORT_UNLESS(ok);
        return;
    }

    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                "Complete [" << step << " : " << txId << "] from " << TabletID()
                << " at tablet " << TabletID() << " send result to client "
                << target << ", exec latency: " << res->Record.GetExecLatency()
                << " ms, propose latency: " << res->Record.GetProposeLatency() << " ms");

    ui64 resultSize = res->GetTxResult().size();
    ui32 flags = IEventHandle::MakeFlags(TInterconnectChannels::GetTabletChannel(resultSize), 0);
    LWTRACK(ProposeTransactionSendResult, res->Orbit);
    ctx.Send(target, res.Release(), flags);
}

void TDataShard::FillExecutionStats(const TExecutionProfile& execProfile, TEvDataShard::TEvProposeTransactionResult& result) const {
    TDuration totalCpuTime;
    for (const auto& unit : execProfile.UnitProfiles) {
        totalCpuTime += unit.second.ExecuteTime;
        totalCpuTime += unit.second.CompleteTime;
    }
    result.Record.MutableTxStats()->MutablePerShardStats()->Clear();
    auto& stats = *result.Record.MutableTxStats()->AddPerShardStats();
    stats.SetShardId(TabletID());
    stats.SetCpuTimeUsec(totalCpuTime.MicroSeconds());
}

ui64 TDataShard::AllocateChangeRecordOrder(NIceDb::TNiceDb& db, ui64 count) {
    const ui64 result = NextChangeRecordOrder;
    NextChangeRecordOrder = result + count;
    PersistSys(db, Schema::Sys_NextChangeRecordOrder, NextChangeRecordOrder);
    return result;
}

ui64 TDataShard::AllocateChangeRecordGroup(NIceDb::TNiceDb& db) {
    const ui64 now = TInstant::Now().MicroSeconds();
    const ui64 result = now > LastChangeRecordGroup ? now : (LastChangeRecordGroup + 1);

    LastChangeRecordGroup = result;
    PersistSys(db, Schema::Sys_LastChangeRecordGroup, LastChangeRecordGroup);

    return result;
}
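// Group ids are derived from wall-clock microseconds but kept strictly
// increasing: a second allocation within the same microsecond (or after a
// clock step back) simply gets LastChangeRecordGroup + 1.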
ui64 TDataShard::GetNextChangeRecordLockOffset(ui64 lockId) {
    auto it = LockChangeRecords.find(lockId);
    if (it == LockChangeRecords.end() || it->second.Changes.empty()) {
        return 0;
    }
    return it->second.Changes.back().LockOffset + 1;
}

void TDataShard::PersistChangeRecord(NIceDb::TNiceDb& db, const TChangeRecord& record) {
    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "PersistChangeRecord"
        << ": record: " << record
        << ", at tablet: " << TabletID());

    ui64 lockId = record.GetLockId();
    if (lockId == 0) {
        db.Table<Schema::ChangeRecords>().Key(record.GetOrder()).Update(
            NIceDb::TUpdate<Schema::ChangeRecords::Group>(record.GetGroup()),
            NIceDb::TUpdate<Schema::ChangeRecords::PlanStep>(record.GetStep()),
            NIceDb::TUpdate<Schema::ChangeRecords::TxId>(record.GetTxId()),
            NIceDb::TUpdate<Schema::ChangeRecords::PathOwnerId>(record.GetPathId().OwnerId),
            NIceDb::TUpdate<Schema::ChangeRecords::LocalPathId>(record.GetPathId().LocalPathId),
            NIceDb::TUpdate<Schema::ChangeRecords::BodySize>(record.GetBody().size()),
            NIceDb::TUpdate<Schema::ChangeRecords::SchemaVersion>(record.GetSchemaVersion()),
            NIceDb::TUpdate<Schema::ChangeRecords::TableOwnerId>(record.GetTableId().OwnerId),
            NIceDb::TUpdate<Schema::ChangeRecords::TablePathId>(record.GetTableId().LocalPathId));
        db.Table<Schema::ChangeRecordDetails>().Key(record.GetOrder()).Update(
            NIceDb::TUpdate<Schema::ChangeRecordDetails::Kind>(record.GetKind()),
            NIceDb::TUpdate<Schema::ChangeRecordDetails::Body>(record.GetBody()),
            NIceDb::TUpdate<Schema::ChangeRecordDetails::Source>(record.GetSource()));
    } else {
        auto& state = LockChangeRecords[lockId];
        Y_ABORT_UNLESS(state.Changes.empty() || state.Changes.back().LockOffset < record.GetLockOffset(),
            "Lock records must be added in their lock offset order");
        if (state.Changes.size() == state.PersistentCount) {
            db.GetDatabase().OnCommit([this, lockId] {
                // We mark all added records as persistent
                auto it = LockChangeRecords.find(lockId);
                Y_ABORT_UNLESS(it != LockChangeRecords.end());
                it->second.PersistentCount = it->second.Changes.size();
            });
            db.GetDatabase().OnRollback([this, lockId] {
                // We remove all change records that have not been committed
                auto it = LockChangeRecords.find(lockId);
                Y_ABORT_UNLESS(it != LockChangeRecords.end());
                it->second.Changes.erase(
                    it->second.Changes.begin() + it->second.PersistentCount,
                    it->second.Changes.end());
                if (it->second.Changes.empty()) {
                    LockChangeRecords.erase(it);
                }
            });
        }

        state.Changes.push_back(IDataShardChangeCollector::TChange{
            .Order = record.GetOrder(),
            .Group = record.GetGroup(),
            .Step = record.GetStep(),
            .TxId = record.GetTxId(),
            .PathId = record.GetPathId(),
            .BodySize = record.GetBody().size(),
            .TableId = record.GetTableId(),
            .SchemaVersion = record.GetSchemaVersion(),
            .LockId = record.GetLockId(),
            .LockOffset = record.GetLockOffset(),
        });

        db.Table<Schema::LockChangeRecords>().Key(record.GetLockId(), record.GetLockOffset()).Update(
            NIceDb::TUpdate<Schema::LockChangeRecords::PathOwnerId>(record.GetPathId().OwnerId),
            NIceDb::TUpdate<Schema::LockChangeRecords::LocalPathId>(record.GetPathId().LocalPathId),
            NIceDb::TUpdate<Schema::LockChangeRecords::BodySize>(record.GetBody().size()),
            NIceDb::TUpdate<Schema::LockChangeRecords::SchemaVersion>(record.GetSchemaVersion()),
            NIceDb::TUpdate<Schema::LockChangeRecords::TableOwnerId>(record.GetTableId().OwnerId),
            NIceDb::TUpdate<Schema::LockChangeRecords::TablePathId>(record.GetTableId().LocalPathId));
        db.Table<Schema::LockChangeRecordDetails>().Key(record.GetLockId(), record.GetLockOffset()).Update(
            NIceDb::TUpdate<Schema::LockChangeRecordDetails::Kind>(record.GetKind()),
            NIceDb::TUpdate<Schema::LockChangeRecordDetails::Body>(record.GetBody()),
            NIceDb::TUpdate<Schema::LockChangeRecordDetails::Source>(record.GetSource()));
    }
}

bool TDataShard::HasLockChangeRecords(ui64 lockId) const {
    auto it = LockChangeRecords.find(lockId);
    return it != LockChangeRecords.end() && !it->second.Changes.empty();
}

void TDataShard::CommitLockChangeRecords(NIceDb::TNiceDb& db, ui64 lockId, ui64 group, const TRowVersion& rowVersion, TVector<IDataShardChangeCollector::TChange>& collected) {
    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CommitLockChangeRecords"
        << ": lockId# " << lockId
        << ", group# " << group
        << ", version# " << rowVersion
        << ", at tablet: " << TabletID());

    auto it = LockChangeRecords.find(lockId);
    Y_VERIFY_S(it != LockChangeRecords.end() && !it->second.Changes.empty(), "Cannot commit lock " << lockId << " change records: there are no pending change records");

    ui64 count = it->second.Changes.back().LockOffset + 1;
    ui64 order = AllocateChangeRecordOrder(db, count);
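    // Note: count is the last LockOffset + 1, which may exceed Changes.size()
    // when offsets have gaps; each record still maps to the unique slot
    // order + LockOffset within the reserved range.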
    // Transform uncommitted changes into their committed form
    collected.reserve(collected.size() + it->second.Changes.size());
    for (const auto& change : it->second.Changes) {
        auto committed = change;
        committed.Order = order + change.LockOffset;
        committed.Group = group;
        committed.Step = rowVersion.Step;
        committed.TxId = rowVersion.TxId;
        collected.push_back(committed);
    }

    Y_VERIFY_S(!CommittedLockChangeRecords.contains(lockId), "Cannot commit lock " << lockId << " more than once");

    auto& entry = CommittedLockChangeRecords[lockId];
    Y_VERIFY_S(entry.Order == Max<ui64>(), "Cannot commit lock " << lockId << " change records multiple times");
    entry.Order = order;
    entry.Group = group;
    entry.Step = rowVersion.Step;
    entry.TxId = rowVersion.TxId;
    entry.Count = it->second.Changes.size();

    db.Table<Schema::ChangeRecordCommits>().Key(order).Update(
        NIceDb::TUpdate<Schema::ChangeRecordCommits::LockId>(lockId),
        NIceDb::TUpdate<Schema::ChangeRecordCommits::Group>(group),
        NIceDb::TUpdate<Schema::ChangeRecordCommits::PlanStep>(rowVersion.Step),
        NIceDb::TUpdate<Schema::ChangeRecordCommits::TxId>(rowVersion.TxId));

    db.GetDatabase().OnCommit([this, lockId]() {
        // We expect operation to enqueue transformed change records,
        // so we no longer need original uncommitted records.
        auto it = LockChangeRecords.find(lockId);
        Y_VERIFY_S(it != LockChangeRecords.end(), "Unexpected failure to find lockId# " << lockId);
        LockChangeRecords.erase(it);
    });
    db.GetDatabase().OnRollback([this, lockId]() {
        CommittedLockChangeRecords.erase(lockId);
    });
}

void TDataShard::MoveChangeRecord(NIceDb::TNiceDb& db, ui64 order, const TPathId& pathId) {
    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "MoveChangeRecord"
        << ": order: " << order
        << ": pathId: " << pathId
        << ", at tablet: " << TabletID());

    db.Table<Schema::ChangeRecords>().Key(order).Update(
        NIceDb::TUpdate<Schema::ChangeRecords::PathOwnerId>(pathId.OwnerId),
        NIceDb::TUpdate<Schema::ChangeRecords::LocalPathId>(pathId.LocalPathId));
}

void TDataShard::MoveChangeRecord(NIceDb::TNiceDb& db, ui64 lockId, ui64 lockOffset, const TPathId& pathId) {
    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "MoveChangeRecord"
        << ": lockId: " << lockId
        << ", lockOffset: " << lockOffset
        << ": pathId: " << pathId
        << ", at tablet: " << TabletID());

    db.Table<Schema::LockChangeRecords>().Key(lockId, lockOffset).Update(
        NIceDb::TUpdate<Schema::LockChangeRecords::PathOwnerId>(pathId.OwnerId),
        NIceDb::TUpdate<Schema::LockChangeRecords::LocalPathId>(pathId.LocalPathId));
}

void TDataShard::RemoveChangeRecord(NIceDb::TNiceDb& db, ui64 order) {
    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "RemoveChangeRecord"
        << ": order: " << order
        << ", at tablet: " << TabletID());

    auto it = ChangesQueue.find(order);
    if (it == ChangesQueue.end()) {
        Y_VERIFY_DEBUG_S(false, "Trying to remove non-enqueued record: " << order);
        return;
    }
    const auto& record = it->second;

    if (record.LockId) {
        db.Table<Schema::LockChangeRecords>().Key(record.LockId, record.LockOffset).Delete();
        db.Table<Schema::LockChangeRecordDetails>().Key(record.LockId, record.LockOffset).Delete();
        // Delete ChangeRecordCommits row when the last record is removed
        auto it = CommittedLockChangeRecords.find(record.LockId);
        if (it != CommittedLockChangeRecords.end()) {
            Y_DEBUG_ABORT_UNLESS(it->second.Count > 0);
            if (it->second.Count > 0 && 0 == --it->second.Count) {
                db.Table<Schema::ChangeRecordCommits>().Key(it->second.Order).Delete();
                CommittedLockChangeRecords.erase(it);
                LockChangeRecords.erase(record.LockId);
            }
        }
    } else {
        db.Table<Schema::ChangeRecords>().Key(order).Delete();
        db.Table<Schema::ChangeRecordDetails>().Key(order).Delete();
    }

    Y_ABORT_UNLESS(record.BodySize <= ChangesQueueBytes);
    ChangesQueueBytes -= record.BodySize;

    if (record.SchemaSnapshotAcquired) {
        Y_ABORT_UNLESS(record.TableId);
        auto tableIt = TableInfos.find(record.TableId.LocalPathId);

        if (tableIt != TableInfos.end()) {
            const auto snapshotKey = TSchemaSnapshotKey(record.TableId, record.SchemaVersion);
            const bool last = SchemaSnapshotManager.ReleaseReference(snapshotKey);

            if (last) {
                const auto* snapshot = SchemaSnapshotManager.FindSnapshot(snapshotKey);
                Y_ABORT_UNLESS(snapshot);

                if (snapshot->Schema->GetTableSchemaVersion() < tableIt->second->GetTableSchemaVersion()) {
                    SchemaSnapshotManager.RemoveShapshot(db, snapshotKey);
                }
            }
        } else {
            Y_DEBUG_ABORT_UNLESS(State == TShardState::PreOffline);
        }
    }

    UpdateChangeExchangeLag(AppData()->TimeProvider->Now());
    ChangesQueue.erase(it);

    IncCounter(COUNTER_CHANGE_RECORDS_REMOVED);
    SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size());

    CheckChangesQueueNoOverflow();
}

void TDataShard::EnqueueChangeRecords(TVector<IDataShardChangeCollector::TChange>&& records) {
    if (!records) {
        return;
    }

    if (OutChangeSenderSuspended) {
        LOG_NOTICE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Cannot enqueue change records"
            << ": change sender suspended"
            << ", at tablet: " << TabletID()
            << ", records: " << JoinSeq(", ", records));
        return;
    }

    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "EnqueueChangeRecords"
        << ": at tablet: " << TabletID()
        << ", records: " << JoinSeq(", ", records));

    const auto now = AppData()->TimeProvider->Now();
    TVector<TEvChangeExchange::TEvEnqueueRecords::TRecordInfo> forward(Reserve(records.size()));
    for (const auto& record : records) {
        forward.emplace_back(record.Order, record.PathId, record.BodySize);

        auto res = ChangesQueue.emplace(
            std::piecewise_construct,
            std::forward_as_tuple(record.Order),
            std::forward_as_tuple(record, now)
        );
        if (res.second) {
            ChangesList.PushBack(&res.first->second);

            Y_ABORT_UNLESS(ChangesQueueBytes <= (Max<ui64>() - record.BodySize));
            ChangesQueueBytes += record.BodySize;

            if (record.SchemaVersion) {
                res.first->second.SchemaSnapshotAcquired = SchemaSnapshotManager.AcquireReference(
                    TSchemaSnapshotKey(record.TableId, record.SchemaVersion));
            }
        }
    }

    UpdateChangeExchangeLag(now);
    IncCounter(COUNTER_CHANGE_RECORDS_ENQUEUED, forward.size());
    SetCounter(COUNTER_CHANGE_QUEUE_SIZE, ChangesQueue.size());

    Y_ABORT_UNLESS(OutChangeSender);
    Send(OutChangeSender, new TEvChangeExchange::TEvEnqueueRecords(std::move(forward)));
}

void TDataShard::UpdateChangeExchangeLag(TInstant now) {
    if (!ChangesList.Empty()) {
        const auto* front = ChangesList.Front();
        SetCounter(COUNTER_CHANGE_DATA_LAG, Max(now - front->CreatedAt, TDuration::Zero()).MilliSeconds());
        SetCounter(COUNTER_CHANGE_DELIVERY_LAG, (now - front->EnqueuedAt).MilliSeconds());
    } else {
        SetCounter(COUNTER_CHANGE_DATA_LAG, 0);
        SetCounter(COUNTER_CHANGE_DELIVERY_LAG, 0);
    }
}
void TDataShard::CreateChangeSender(const TActorContext& ctx) {
    Y_ABORT_UNLESS(!OutChangeSender);
    OutChangeSender = Register(NDataShard::CreateChangeSender(this));

    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Change sender created"
        << ": at tablet: " << TabletID()
        << ", actorId: " << OutChangeSender);
}

void TDataShard::MaybeActivateChangeSender(const TActorContext& ctx) {
    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Trying to activate change sender"
        << ": at tablet: " << TabletID());

    OutChangeSenderSuspended = false;

    if (ReceiveActivationsFrom) {
        LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, "Cannot activate change sender"
            << ": at tablet: " << TabletID()
            << ", waiting for activation from: " << JoinSeq(", ", ReceiveActivationsFrom));
        return;
    }

    switch (State) {
    case TShardState::WaitScheme:
    case TShardState::SplitDstReceivingSnapshot:
    case TShardState::Offline:
        LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Cannot activate change sender"
            << ": at tablet: " << TabletID()
            << ", state: " << DatashardStateName(State));
        return;

    case TShardState::SplitSrcMakeSnapshot:
    case TShardState::SplitSrcSendingSnapshot:
    case TShardState::SplitSrcWaitForPartitioningChanged:
    case TShardState::PreOffline:
        if (!ChangesQueue) {
            LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Cannot activate change sender"
                << ": at tablet: " << TabletID()
                << ", state: " << DatashardStateName(State)
                << ", queue size: " << ChangesQueue.size());
            return;
        }
        break;
    }

    Y_ABORT_UNLESS(OutChangeSender);
    Send(OutChangeSender, new TEvChangeExchange::TEvActivateSender());

    LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Change sender activated"
        << ": at tablet: " << TabletID());
}

void TDataShard::KillChangeSender(const TActorContext& ctx) {
    if (OutChangeSender) {
        Send(std::exchange(OutChangeSender, TActorId()), new TEvents::TEvPoison());

        LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD, "Change sender killed"
            << ": at tablet: " << TabletID());
    }
}

void TDataShard::SuspendChangeSender(const TActorContext& ctx) {
    KillChangeSender(ctx);
    OutChangeSenderSuspended = true;
}

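// The Load* helpers below restore the in-memory change-record state from the
// local database on tablet restart. Each returns false when the underlying
// rowset is not yet ready (data still needs to be paged in), in which case
// the calling transaction is expected to restart and retry.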
bool TDataShard::LoadChangeRecords(NIceDb::TNiceDb& db, TVector<IDataShardChangeCollector::TChange>& records) {
    using Schema = TDataShard::Schema;

    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "LoadChangeRecords"
        << ": QueueSize: " << ChangesQueue.size()
        << ", at tablet: " << TabletID());

    records.reserve(ChangesQueue.size());

    auto rowset = db.Table<Schema::ChangeRecords>().Range().Select();
    if (!rowset.IsReady()) {
        return false;
    }

    while (!rowset.EndOfSet()) {
        const ui64 order = rowset.GetValue<Schema::ChangeRecords::Order>();
        const ui64 group = rowset.GetValue<Schema::ChangeRecords::Group>();
        const ui64 step = rowset.GetValue<Schema::ChangeRecords::PlanStep>();
        const ui64 txId = rowset.GetValue<Schema::ChangeRecords::TxId>();
        const ui64 bodySize = rowset.GetValue<Schema::ChangeRecords::BodySize>();
        const ui64 schemaVersion = rowset.GetValue<Schema::ChangeRecords::SchemaVersion>();
        const auto pathId = TPathId(
            rowset.GetValue<Schema::ChangeRecords::PathOwnerId>(),
            rowset.GetValue<Schema::ChangeRecords::LocalPathId>()
        );
        const auto tableId = TPathId(
            rowset.GetValue<Schema::ChangeRecords::TableOwnerId>(),
            rowset.GetValue<Schema::ChangeRecords::TablePathId>()
        );

        records.push_back(IDataShardChangeCollector::TChange{
            .Order = order,
            .Group = group,
            .Step = step,
            .TxId = txId,
            .PathId = pathId,
            .BodySize = bodySize,
            .TableId = tableId,
            .SchemaVersion = schemaVersion,
        });

        if (!rowset.Next()) {
            return false;
        }
    }

    return true;
}

bool TDataShard::LoadLockChangeRecords(NIceDb::TNiceDb& db) {
    using Schema = TDataShard::Schema;

    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "LoadLockChangeRecords"
        << " at tablet: " << TabletID());

    auto rowset = db.Table<Schema::LockChangeRecords>().Range().Select();
    if (!rowset.IsReady()) {
        return false;
    }

    while (!rowset.EndOfSet()) {
        const ui64 lockId = rowset.GetValue<Schema::LockChangeRecords::LockId>();
        const ui64 lockOffset = rowset.GetValue<Schema::LockChangeRecords::LockOffset>();
        const ui64 bodySize = rowset.GetValue<Schema::LockChangeRecords::BodySize>();
        const ui64 schemaVersion = rowset.GetValue<Schema::LockChangeRecords::SchemaVersion>();
        const auto pathId = TPathId(
            rowset.GetValue<Schema::LockChangeRecords::PathOwnerId>(),
            rowset.GetValue<Schema::LockChangeRecords::LocalPathId>()
        );
        const auto tableId = TPathId(
            rowset.GetValue<Schema::LockChangeRecords::TableOwnerId>(),
            rowset.GetValue<Schema::LockChangeRecords::TablePathId>()
        );

        auto& state = LockChangeRecords[lockId];
        state.Changes.push_back(IDataShardChangeCollector::TChange{
            .Order = Max<ui64>(),
            .Group = 0,
            .Step = 0,
            .TxId = 0,
            .PathId = pathId,
            .BodySize = bodySize,
            .TableId = tableId,
            .SchemaVersion = schemaVersion,
            .LockId = lockId,
            .LockOffset = lockOffset,
        });
        state.PersistentCount = state.Changes.size();

        if (!rowset.Next()) {
            return false;
        }
    }

    return true;
}

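// Committed lock changes are materialized into regular change records: each
// record gets Order = commit order + its offset within the lock. Since
// several committed locks may interleave, the combined vector is re-sorted
// by Order afterwards.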
bool TDataShard::LoadChangeRecordCommits(NIceDb::TNiceDb& db, TVector<IDataShardChangeCollector::TChange>& records) {
    using Schema = TDataShard::Schema;

    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "LoadChangeRecordCommits"
        << " at tablet: " << TabletID());

    bool needSort = false;

    auto rowset = db.Table<Schema::ChangeRecordCommits>().Range().Select();
    if (!rowset.IsReady()) {
        return false;
    }

    while (!rowset.EndOfSet()) {
        const ui64 order = rowset.GetValue<Schema::ChangeRecordCommits::Order>();
        const ui64 lockId = rowset.GetValue<Schema::ChangeRecordCommits::LockId>();
        const ui64 group = rowset.GetValue<Schema::ChangeRecordCommits::Group>();
        const ui64 step = rowset.GetValue<Schema::ChangeRecordCommits::PlanStep>();
        const ui64 txId = rowset.GetValue<Schema::ChangeRecordCommits::TxId>();

        auto& entry = CommittedLockChangeRecords[lockId];
        entry.Order = order;
        entry.Group = group;
        entry.Step = step;
        entry.TxId = txId;

        for (auto& record : LockChangeRecords[lockId].Changes) {
            records.push_back(IDataShardChangeCollector::TChange{
                .Order = order + record.LockOffset,
                .Group = group,
                .Step = step,
                .TxId = txId,
                .PathId = record.PathId,
                .BodySize = record.BodySize,
                .TableId = record.TableId,
                .SchemaVersion = record.SchemaVersion,
                .LockId = record.LockId,
                .LockOffset = record.LockOffset,
            });
            entry.Count++;
            needSort = true;
        }

        LockChangeRecords.erase(lockId);

        if (!rowset.Next()) {
            return false;
        }
    }

    if (needSort) {
        std::sort(records.begin(), records.end(), [](const auto& a, const auto& b) -> bool {
            return a.Order < b.Order;
        });
    }

    return true;
}

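// Removal of lock change records is performed asynchronously: lock ids are
// batched into PendingLockChangeRecordsToRemove and a single
// TEvRemoveLockChangeRecords message is self-sent to kick off processing
// when the batch transitions from empty to non-empty.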
void TDataShard::ScheduleRemoveLockChanges(ui64 lockId) {
    if (LockChangeRecords.contains(lockId) && !CommittedLockChangeRecords.contains(lockId)) {
        bool wasEmpty = PendingLockChangeRecordsToRemove.empty();
        PendingLockChangeRecordsToRemove.push_back(lockId);
        if (wasEmpty) {
            Send(SelfId(), new TEvPrivate::TEvRemoveLockChangeRecords);
        }
    }
}

void TDataShard::ScheduleRemoveAbandonedLockChanges() {
    bool wasEmpty = PendingLockChangeRecordsToRemove.empty();

    for (const auto& pr : LockChangeRecords) {
        ui64 lockId = pr.first;

        if (CommittedLockChangeRecords.contains(lockId)) {
            // Skip committed lock changes
            continue;
        }

        auto lock = SysLocksTable().GetRawLock(lockId);
        if (lock && lock->IsPersistent()) {
            // Skip lock changes attached to persistent locks
            continue;
        }

        if (VolatileTxManager.FindByCommitTxId(lockId)) {
            // Skip lock changes attached to volatile transactions
            continue;
        }

        PendingLockChangeRecordsToRemove.push_back(lockId);
    }

    if (wasEmpty && !PendingLockChangeRecordsToRemove.empty()) {
        Send(SelfId(), new TEvPrivate::TEvRemoveLockChangeRecords);
    }
}

void TDataShard::PersistSchemeTxResult(NIceDb::TNiceDb& db, const TSchemaOperation& op) {
    db.Table<Schema::SchemaOperations>().Key(op.TxId).Update(
        NIceDb::TUpdate<Schema::SchemaOperations::Success>(op.Success),
        NIceDb::TUpdate<Schema::SchemaOperations::Error>(op.Error),
        NIceDb::TUpdate<Schema::SchemaOperations::DataSize>(op.BytesProcessed),
        NIceDb::TUpdate<Schema::SchemaOperations::Rows>(op.RowsProcessed)
    );
}

void TDataShard::NotifySchemeshard(const TActorContext& ctx, ui64 txId) {
    if (!txId) {
        for (const auto& op : TransQueue.GetSchemaOperations())
            NotifySchemeshard(ctx, op.first);
        return;
    }

    TSchemaOperation* op = TransQueue.FindSchemaTx(txId);
    if (!op || !op->Done)
        return;

    LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
               TabletID() << " Sending notify to schemeshard " << op->TabletId
               << " txId " << txId << " state " << DatashardStateName(State) << " TxInFly " << TxInFly());

    if (op->IsDrop()) {
        Y_VERIFY_S(State == TShardState::PreOffline,
                   TabletID() << " is in wrong state (" << State << ") for drop");
        Y_VERIFY_S(!TxInFly(),
                   TabletID() << " has " << TxInFly() << " txs in-fly "
                   << TransQueue.TxInFlyToString());
    }

    THolder<TEvDataShard::TEvSchemaChanged> event =
        MakeHolder<TEvDataShard::TEvSchemaChanged>(ctx.SelfID, TabletID(), State, op->TxId, op->PlanStep, Generation());

    switch (op->Type) {
    case TSchemaOperation::ETypeBackup:
    case TSchemaOperation::ETypeRestore: {
        auto* result = event->Record.MutableOpResult();
        result->SetSuccess(op->Success);
        result->SetExplain(op->Error);
        result->SetBytesProcessed(op->BytesProcessed);
        result->SetRowsProcessed(op->RowsProcessed);
        break;
    }
    default:
        break;
    }

    SendViaSchemeshardPipe(ctx, op->TabletId, std::move(event));
}

bool TDataShard::CheckMediatorAuthorisation(ui64 mediatorId) {
    if (!ProcessingParams || 0 == ProcessingParams->MediatorsSize()) {
        return true;
    }

    auto it = std::find(ProcessingParams->GetMediators().begin(),
                        ProcessingParams->GetMediators().end(),
                        mediatorId);
    return it != ProcessingParams->GetMediators().end();
}

void TDataShard::PersistSys(NIceDb::TNiceDb& db, ui64 key, const TString& value) const {
    db.Table<Schema::Sys>().Key(key).Update(NIceDb::TUpdate<Schema::Sys::Bytes>(value));
}

void TDataShard::PersistSys(NIceDb::TNiceDb& db, ui64 key, ui64 value) const {
    db.Table<Schema::Sys>().Key(key).Update(NIceDb::TUpdate<Schema::Sys::Uint64>(value));
}

void TDataShard::PersistSys(NIceDb::TNiceDb& db, ui64 key, ui32 value) const {
    db.Table<Schema::Sys>().Key(key).Update(NIceDb::TUpdate<Schema::Sys::Uint64>(value));
}

void TDataShard::PersistSys(NIceDb::TNiceDb& db, ui64 key, bool value) const {
    db.Table<Schema::Sys>().Key(key).Update(NIceDb::TUpdate<Schema::Sys::Uint64>(value ? 1 : 0));
}

void TDataShard::PersistUserTable(NIceDb::TNiceDb& db, ui64 tableId, const TUserTable& tableInfo) {
    db.Table<Schema::UserTables>().Key(tableId).Update(
        NIceDb::TUpdate<Schema::UserTables::LocalTid>(tableInfo.LocalTid),
        NIceDb::TUpdate<Schema::UserTables::ShadowTid>(tableInfo.ShadowTid),
        NIceDb::TUpdate<Schema::UserTables::Schema>(tableInfo.GetSchema()));
}

void TDataShard::PersistUserTableFullCompactionTs(NIceDb::TNiceDb& db, ui64 tableId, ui64 ts) {
    db.Table<Schema::UserTablesStats>().Key(tableId).Update<Schema::UserTablesStats::FullCompactionTs>(ts);
}

void TDataShard::PersistMoveUserTable(NIceDb::TNiceDb& db, ui64 prevTableId, ui64 tableId, const TUserTable& tableInfo) {
    db.Table<Schema::UserTables>().Key(prevTableId).Delete();
    PersistUserTable(db, tableId, tableInfo);

    db.Table<Schema::UserTablesStats>().Key(prevTableId).Delete();
    if (tableInfo.Stats.LastFullCompaction) {
        PersistUserTableFullCompactionTs(db, tableId, tableInfo.Stats.LastFullCompaction.Seconds());
    }
}

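// The table alters below follow a copy-on-write pattern: the current
// TUserTable is cloned, mutated, and (optionally) persisted, so readers
// holding the previous table info snapshot are unaffected.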
TUserTable::TPtr TDataShard::AlterTableSchemaVersion(
    const TActorContext&, TTransactionContext& txc,
    const TPathId& pathId, const ui64 tableSchemaVersion, bool persist)
{
    Y_ABORT_UNLESS(GetPathOwnerId() == pathId.OwnerId);
    ui64 tableId = pathId.LocalPathId;

    Y_ABORT_UNLESS(TableInfos.contains(tableId));
    auto oldTableInfo = TableInfos[tableId];
    Y_ABORT_UNLESS(oldTableInfo);

    TUserTable::TPtr newTableInfo = new TUserTable(*oldTableInfo);
    newTableInfo->SetTableSchemaVersion(tableSchemaVersion);

    Y_VERIFY_DEBUG_S(oldTableInfo->GetTableSchemaVersion() < newTableInfo->GetTableSchemaVersion(),
        "pathId " << pathId
        << ", old version " << oldTableInfo->GetTableSchemaVersion()
        << ", new version " << newTableInfo->GetTableSchemaVersion());

    if (persist) {
        NIceDb::TNiceDb db(txc.DB);
        PersistUserTable(db, tableId, *newTableInfo);
    }

    return newTableInfo;
}

TUserTable::TPtr TDataShard::AlterTableAddIndex(
    const TActorContext& ctx, TTransactionContext& txc,
    const TPathId& pathId, ui64 tableSchemaVersion,
    const NKikimrSchemeOp::TIndexDescription& indexDesc)
{
    auto tableInfo = AlterTableSchemaVersion(ctx, txc, pathId, tableSchemaVersion, false);
    tableInfo->AddIndex(indexDesc);

    NIceDb::TNiceDb db(txc.DB);
    PersistUserTable(db, pathId.LocalPathId, *tableInfo);

    return tableInfo;
}

TUserTable::TPtr TDataShard::AlterTableDropIndex(
    const TActorContext& ctx, TTransactionContext& txc,
    const TPathId& pathId, ui64 tableSchemaVersion,
    const TPathId& indexPathId)
{
    auto tableInfo = AlterTableSchemaVersion(ctx, txc, pathId, tableSchemaVersion, false);
    tableInfo->DropIndex(indexPathId);

    NIceDb::TNiceDb db(txc.DB);
    PersistUserTable(db, pathId.LocalPathId, *tableInfo);

    return tableInfo;
}

TUserTable::TPtr TDataShard::AlterTableAddCdcStream(
    const TActorContext& ctx, TTransactionContext& txc,
    const TPathId& pathId, ui64 tableSchemaVersion,
    const NKikimrSchemeOp::TCdcStreamDescription& streamDesc)
{
    auto tableInfo = AlterTableSchemaVersion(ctx, txc, pathId, tableSchemaVersion, false);
    tableInfo->AddCdcStream(streamDesc);

    NIceDb::TNiceDb db(txc.DB);
    PersistUserTable(db, pathId.LocalPathId, *tableInfo);

    return tableInfo;
}

TUserTable::TPtr TDataShard::AlterTableSwitchCdcStreamState(
    const TActorContext& ctx, TTransactionContext& txc,
    const TPathId& pathId, ui64 tableSchemaVersion,
    const TPathId& streamPathId, NKikimrSchemeOp::ECdcStreamState state)
{
    auto tableInfo = AlterTableSchemaVersion(ctx, txc, pathId, tableSchemaVersion, false);
    tableInfo->SwitchCdcStreamState(streamPathId, state);

    NIceDb::TNiceDb db(txc.DB);
    PersistUserTable(db, pathId.LocalPathId, *tableInfo);

    return tableInfo;
}

TUserTable::TPtr TDataShard::AlterTableDropCdcStream(
    const TActorContext& ctx, TTransactionContext& txc,
    const TPathId& pathId, ui64 tableSchemaVersion,
    const TPathId& streamPathId)
{
    auto tableInfo = AlterTableSchemaVersion(ctx, txc, pathId, tableSchemaVersion, false);
    tableInfo->DropCdcStream(streamPathId);

    NIceDb::TNiceDb db(txc.DB);
    PersistUserTable(db, pathId.LocalPathId, *tableInfo);

    return tableInfo;
}

void TDataShard::AddSchemaSnapshot(const TPathId& pathId, ui64 tableSchemaVersion, ui64 step, ui64 txId,
        TTransactionContext& txc, const TActorContext& ctx)
{
    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Add schema snapshot"
        << ": pathId# " << pathId
        << ", version# " << tableSchemaVersion
        << ", step# " << step
        << ", txId# " << txId
        << ", at tablet# " << TabletID());

    Y_ABORT_UNLESS(GetPathOwnerId() == pathId.OwnerId);
    Y_ABORT_UNLESS(TableInfos.contains(pathId.LocalPathId));
    auto tableInfo = TableInfos[pathId.LocalPathId];

    const auto key = TSchemaSnapshotKey(pathId.OwnerId, pathId.LocalPathId, tableSchemaVersion);
    SchemaSnapshotManager.AddSnapshot(txc.DB, key, TSchemaSnapshot(tableInfo, step, txId));
}

void TDataShard::PersistLastLoanTableTid(NIceDb::TNiceDb& db, ui32 localTid) {
    LastLoanTableTid = localTid;
    PersistSys(db, Schema::Sys_LastLoanTableTid, LastLoanTableTid);
}

TUserTable::TPtr TDataShard::CreateUserTable(TTransactionContext& txc,
    const NKikimrSchemeOp::TTableDescription& tableScheme)
{
    const TString mainTableName = TDataShard::Schema::UserTablePrefix + tableScheme.GetName();
    ui64 tableId = tableScheme.GetId_Deprecated();
    if (tableScheme.HasPathId()) {
        Y_ABORT_UNLESS(GetPathOwnerId() == tableScheme.GetPathId().GetOwnerId() || GetPathOwnerId() == INVALID_TABLET_ID);
        tableId = tableScheme.GetPathId().GetLocalId();
    }
    ui32 localTid = ++LastLocalTid;
    ui32 shadowTid = tableScheme.GetPartitionConfig().GetShadowData() ? ++LastLocalTid : 0;
    TUserTable::TPtr tableInfo = new TUserTable(localTid, tableScheme, shadowTid);

    tableInfo->ApplyCreate(txc, mainTableName, tableScheme.GetPartitionConfig());

    if (shadowTid) {
        const TString shadowTableName = TDataShard::Schema::ShadowTablePrefix + tableScheme.GetName();
        tableInfo->ApplyCreateShadow(txc, shadowTableName, tableScheme.GetPartitionConfig());
    }

    NIceDb::TNiceDb db(txc.DB);

    auto& partConfig = tableScheme.GetPartitionConfig();
    if (partConfig.HasTxReadSizeLimit()) {
        TxReadSizeLimit = partConfig.GetTxReadSizeLimit();
        PersistSys(db, Schema::Sys_TxReadSizeLimit, TxReadSizeLimit);
    }
    if (partConfig.HasDisableStatisticsCalculation()) {
        StatisticsDisabled = partConfig.GetDisableStatisticsCalculation() ? 1 : 0;
        PersistSys(db, Schema::Sys_StatisticsDisabled, StatisticsDisabled);
    }

    Pipeline.UpdateConfig(db, partConfig.GetPipelineConfig());

    if (partConfig.HasKeepSnapshotTimeout())
        SnapshotManager.SetKeepSnapshotTimeout(db, partConfig.GetKeepSnapshotTimeout());

    PersistSys(db, Schema::Sys_LastLocalTid, LastLocalTid);
    PersistUserTable(db, tableId, *tableInfo);

    return tableInfo;
}

THashMap<TPathId, TPathId> TDataShard::GetRemapIndexes(const NKikimrTxDataShard::TMoveTable& move) {
    THashMap<TPathId, TPathId> remap;
    for (const auto& item : move.GetReMapIndexes()) {
        const auto prevId = PathIdFromPathId(item.GetSrcPathId());
        const auto newId = PathIdFromPathId(item.GetDstPathId());
        remap[prevId] = newId;
    }
    return remap;
}

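// Moving a table re-keys all dependent state (indexes, snapshots, queued
// change records) from the old path id to the new one; every index path id
// must be covered by the remap built in GetRemapIndexes above.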
TUserTable::TPtr TDataShard::MoveUserTable(TOperation::TPtr op, const NKikimrTxDataShard::TMoveTable& move,
    const TActorContext& ctx, TTransactionContext& txc)
{
    const auto prevId = PathIdFromPathId(move.GetPathId());
    const auto newId = PathIdFromPathId(move.GetDstPathId());

    Y_ABORT_UNLESS(GetPathOwnerId() == prevId.OwnerId);
    Y_ABORT_UNLESS(TableInfos.contains(prevId.LocalPathId));

    const auto version = move.GetTableSchemaVersion();
    Y_ABORT_UNLESS(version);

    auto newTableInfo = AlterTableSchemaVersion(ctx, txc, prevId, version, false);
    newTableInfo->SetPath(move.GetDstPath());

    const THashMap<TPathId, TPathId> remap = GetRemapIndexes(move);

    NKikimrSchemeOp::TTableDescription schema;
    newTableInfo->GetSchema(schema);
    for (auto& indexDesc : *schema.MutableTableIndexes()) {
        Y_ABORT_UNLESS(indexDesc.HasPathOwnerId() && indexDesc.HasLocalPathId());
        auto prevPathId = TPathId(indexDesc.GetPathOwnerId(), indexDesc.GetLocalPathId());
        Y_VERIFY_S(remap.contains(prevPathId), "no rule how to move index with pathId " << prevPathId); // we should remap all indexes
        auto newPathId = remap.at(prevPathId);

        indexDesc.SetPathOwnerId(newPathId.OwnerId);
        indexDesc.SetLocalPathId(newPathId.LocalPathId);

        newTableInfo->Indexes[newPathId] = newTableInfo->Indexes[prevPathId];
        newTableInfo->Indexes.erase(prevPathId);
    }
    newTableInfo->SetSchema(schema);

    // NOTE: Stats building is bound to the table id, but a move changes that id,
    // so stats that have already been built cannot be inherited by the moved
    // table and have to be rebuilt from the ground up.
    newTableInfo->StatsUpdateInProgress = false;
    newTableInfo->StatsNeedUpdate = true;

    RemoveUserTable(prevId);
    AddUserTable(newId, newTableInfo);

    for (auto& [_, record] : ChangesQueue) {
        if (record.TableId == prevId) {
            record.TableId = newId;
        }
    }

    SnapshotManager.RenameSnapshots(txc.DB, prevId, newId);
    SchemaSnapshotManager.RenameSnapshots(txc.DB, prevId, newId);
    if (newTableInfo->NeedSchemaSnapshots()) {
        AddSchemaSnapshot(newId, version, op->GetStep(), op->GetTxId(), txc, ctx);
    }

    NIceDb::TNiceDb db(txc.DB);
    PersistMoveUserTable(db, prevId.LocalPathId, newId.LocalPathId, *newTableInfo);
    PersistOwnerPathId(newId.OwnerId, txc);

    return newTableInfo;
}

TUserTable::TPtr TDataShard::MoveUserIndex(TOperation::TPtr op, const NKikimrTxDataShard::TMoveIndex& move,
    const TActorContext& ctx, TTransactionContext& txc)
{
    const auto pathId = PathIdFromPathId(move.GetPathId());

    Y_ABORT_UNLESS(GetPathOwnerId() == pathId.OwnerId);
    Y_ABORT_UNLESS(TableInfos.contains(pathId.LocalPathId));

    const auto version = move.GetTableSchemaVersion();
    Y_ABORT_UNLESS(version);

    auto newTableInfo = AlterTableSchemaVersion(ctx, txc, pathId, version, false);

    NKikimrSchemeOp::TTableDescription schema;
    newTableInfo->GetSchema(schema);

    if (move.GetReMapIndex().HasReplacedPathId()) {
        const auto oldPathId = PathIdFromPathId(move.GetReMapIndex().GetReplacedPathId());
        newTableInfo->Indexes.erase(oldPathId);

        size_t id = 0;
        bool found = false;
        for (auto& indexDesc : *schema.MutableTableIndexes()) {
            Y_ABORT_UNLESS(indexDesc.HasPathOwnerId() && indexDesc.HasLocalPathId());
            auto indexPathId = TPathId(indexDesc.GetPathOwnerId(), indexDesc.GetLocalPathId());
            if (oldPathId == indexPathId) {
                found = true;
                break;
            } else {
                id++;
            }
        }

        if (found) {
            schema.MutableTableIndexes()->DeleteSubrange(id, 1);
        }
    }

    const auto remapPrevId = PathIdFromPathId(move.GetReMapIndex().GetSrcPathId());
    const auto remapNewId = PathIdFromPathId(move.GetReMapIndex().GetDstPathId());
    Y_ABORT_UNLESS(move.GetReMapIndex().HasDstName());
    const auto dstIndexName = move.GetReMapIndex().GetDstName();

    for (auto& indexDesc : *schema.MutableTableIndexes()) {
        Y_ABORT_UNLESS(indexDesc.HasPathOwnerId() && indexDesc.HasLocalPathId());
        auto prevPathId = TPathId(indexDesc.GetPathOwnerId(), indexDesc.GetLocalPathId());
        if (remapPrevId != prevPathId) {
            continue;
        }

        indexDesc.SetPathOwnerId(remapNewId.OwnerId);
        indexDesc.SetLocalPathId(remapNewId.LocalPathId);

        newTableInfo->Indexes[remapNewId] = newTableInfo->Indexes[prevPathId];
        newTableInfo->Indexes.erase(prevPathId);

        indexDesc.SetName(dstIndexName);
        newTableInfo->Indexes[remapNewId].Name = dstIndexName;
    }

    newTableInfo->SetSchema(schema);
    AddUserTable(pathId, newTableInfo);

    if (newTableInfo->NeedSchemaSnapshots()) {
        AddSchemaSnapshot(pathId, version, op->GetStep(), op->GetTxId(), txc, ctx);
    }

    NIceDb::TNiceDb db(txc.DB);
    PersistUserTable(db, pathId.LocalPathId, *newTableInfo);

    return newTableInfo;
}

TUserTable::TPtr TDataShard::AlterUserTable(const TActorContext& ctx, TTransactionContext& txc,
                                            const NKikimrSchemeOp::TTableDescription& alter)
{
    ui64 tableId = alter.GetId_Deprecated();
    if (alter.HasPathId()) {
        Y_ABORT_UNLESS(GetPathOwnerId() == alter.GetPathId().GetOwnerId());
        tableId = alter.GetPathId().GetLocalId();
    }
    TUserTable::TCPtr oldTable = TableInfos[tableId];
    Y_ABORT_UNLESS(oldTable);

    TUserTable::TPtr tableInfo = new TUserTable(*oldTable, alter);
    TString strError;
    tableInfo->ApplyAlter(txc, *oldTable, alter, strError);
    if (strError) {
        LOG_ERROR(ctx, NKikimrServices::TX_DATASHARD,
            "Cannot alter datashard %" PRIu64 " for table %" PRIu64 ": %s",
            TabletID(), tableId, strError.data());
    }

    NIceDb::TNiceDb db(txc.DB);

    if (alter.HasPartitionConfig()) {
        // We are going to update the table schema and save it
        NKikimrSchemeOp::TTableDescription tableDescr;
        tableInfo->GetSchema(tableDescr);

        const auto& configDelta = alter.GetPartitionConfig();
        auto& config = *tableDescr.MutablePartitionConfig();

        if (configDelta.HasFreezeState()) {
            auto cmd = configDelta.GetFreezeState();
            State = cmd == NKikimrSchemeOp::EFreezeState::Freeze ? TShardState::Frozen : TShardState::Ready;
            PersistSys(db, Schema::Sys_State, State);
        }

        if (configDelta.HasTxReadSizeLimit()) {
            config.SetTxReadSizeLimit(configDelta.GetTxReadSizeLimit());
            TxReadSizeLimit = configDelta.GetTxReadSizeLimit();
            PersistSys(db, Schema::Sys_TxReadSizeLimit, TxReadSizeLimit);
        }

        if (configDelta.HasDisableStatisticsCalculation()) {
            StatisticsDisabled = configDelta.GetDisableStatisticsCalculation() ? 1 : 0;
            PersistSys(db, Schema::Sys_StatisticsDisabled, StatisticsDisabled);
        }

        if (configDelta.HasPipelineConfig()) {
            config.ClearPipelineConfig();
            config.MutablePipelineConfig()->CopyFrom(configDelta.GetPipelineConfig());
            Pipeline.UpdateConfig(db, configDelta.GetPipelineConfig());
        }

        tableInfo->SetSchema(tableDescr);

        if (configDelta.HasKeepSnapshotTimeout())
            SnapshotManager.SetKeepSnapshotTimeout(db, configDelta.GetKeepSnapshotTimeout());
    }

    PersistUserTable(db, tableId, *tableInfo);

    return tableInfo;
}

void TDataShard::DropUserTable(TTransactionContext& txc, ui64 tableId) {
    auto ti = TableInfos.find(tableId);
    Y_ABORT_UNLESS(ti != TableInfos.end(), "Table with id %" PRIu64 " doesn't exist on this datashard", tableId);

    NIceDb::TNiceDb db(txc.DB);
    txc.DB.NoMoreReadsForTx();
    txc.DB.Alter().DropTable(ti->second->LocalTid);
    if (ti->second->ShadowTid) {
        txc.DB.Alter().DropTable(ti->second->ShadowTid);
    }
    db.Table<Schema::UserTables>().Key(ti->first).Delete();
    db.Table<Schema::UserTablesStats>().Key(ti->first).Delete();

    TableInfos.erase(ti);
}

void TDataShard::DropAllUserTables(TTransactionContext& txc) {
    NIceDb::TNiceDb db(txc.DB);
    txc.DB.NoMoreReadsForTx();

    // All scheme changes must happen first
    for (const auto& ti : TableInfos) {
        txc.DB.Alter().DropTable(ti.second->LocalTid);
        if (ti.second->ShadowTid) {
            txc.DB.Alter().DropTable(ti.second->ShadowTid);
        }
    }

    // Now remove all snapshots and their info
    SnapshotManager.PersistRemoveAllSnapshots(db);
    for (const auto& ti : TableInfos) {
        db.Table<Schema::UserTables>().Key(ti.first).Delete();
        db.Table<Schema::UserTablesStats>().Key(ti.first).Delete();
    }

    TableInfos.clear();
}

// Deletes the user tables and all system tables that are transferred during split/merge.
// This allows their borrowed parts to be returned to the owner tablet.
void TDataShard::PurgeTxTables(TTransactionContext& txc) {
    TVector<ui32> tablesToDrop = {
        Schema::TxMain::TableId,
        Schema::TxDetails::TableId,
        Schema::InReadSets::TableId,
        Schema::PlanQueue::TableId,
        Schema::DeadlineQueue::TableId
    };
    for (ui32 ti : tablesToDrop) {
        txc.DB.Alter().DropTable(ti);
    }

    DropAllUserTables(txc);
}

void TDataShard::SnapshotComplete(TIntrusivePtr<NTabletFlatExecutor::TTableSnapshotContext> snapContext, const TActorContext& ctx) {
    if (auto txSnapContext = dynamic_cast<TTxTableSnapshotContext*>(snapContext.Get())) {
        auto stepOrder = txSnapContext->GetStepOrder();
        auto op = Pipeline.GetActiveOp(stepOrder.TxId);

        Y_DEBUG_ABORT_UNLESS(op, "The Tx that requested snapshot must be active!");
        if (!op) {
            LOG_CRIT_S(ctx, NKikimrServices::TX_DATASHARD,
                       "Got snapshot for missing operation " << stepOrder
                       << " at " << TabletID());
            return;
        }

        Y_ABORT_UNLESS(txSnapContext->TablesToSnapshot().size() == 1,
                       "Currently only 1 table can be snapshotted");
        ui32 tableId = txSnapContext->TablesToSnapshot()[0];

        LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD,
                  "Got snapshot in active state at %" PRIu64 " for table %" PRIu32 " txId %" PRIu64,
                  TabletID(), tableId, stepOrder.TxId);

        op->AddInputSnapshot(snapContext);
        Pipeline.AddCandidateOp(op);
        PlanQueue.Progress(ctx);
        return;
    }

    if (auto splitSnapContext = dynamic_cast<TSplitSnapshotContext*>(snapContext.Get())) {
        Execute(CreateTxSplitSnapshotComplete(splitSnapContext), ctx);
        return;
    }

    Y_ABORT("Unexpected table snapshot context");
}

TUserTable::TSpecialUpdate TDataShard::SpecialUpdates(const NTable::TDatabase& db, const TTableId& tableId) const {
    Y_ABORT_UNLESS(tableId.PathId.OwnerId == PathOwnerId, "%" PRIu64 " vs %" PRIu64,
                   tableId.PathId.OwnerId, PathOwnerId);

    auto it = TableInfos.find(tableId.PathId.LocalPathId);
    Y_ABORT_UNLESS(it != TableInfos.end());
    const TUserTable& tableInfo = *it->second;
    Y_ABORT_UNLESS(tableInfo.LocalTid != Max<ui32>());

    TUserTable::TSpecialUpdate ret;

    if (tableInfo.SpecialColTablet != Max<ui32>()) {
        ret.ColIdTablet = tableInfo.SpecialColTablet;
        ret.Tablet = TabletID();
        ret.HasUpdates = true;
    }

    if (tableInfo.SpecialColEpoch != Max<ui32>() || tableInfo.SpecialColUpdateNo != Max<ui32>()) {
        auto dbChange = db.Head(tableInfo.LocalTid);
        ret.ColIdEpoch = tableInfo.SpecialColEpoch;
        ret.ColIdUpdateNo = tableInfo.SpecialColUpdateNo;
        ret.Epoch = dbChange.Epoch.ToCounter();
        ret.UpdateNo = dbChange.Serial;
        ret.HasUpdates = true;
    }

    return ret;
}

void TDataShard::SetTableAccessTime(const TTableId& tableId, TInstant ts) {
    Y_ABORT_UNLESS(!TSysTables::IsSystemTable(tableId));
    auto iter = TableInfos.find(tableId.PathId.LocalPathId);
    Y_ABORT_UNLESS(iter != TableInfos.end());
    iter->second->Stats.AccessTime = ts;
}

void TDataShard::SetTableUpdateTime(const TTableId& tableId, TInstant ts) {
    Y_ABORT_UNLESS(!TSysTables::IsSystemTable(tableId));
    auto iter = TableInfos.find(tableId.PathId.LocalPathId);
    Y_ABORT_UNLESS(iter != TableInfos.end());
    iter->second->Stats.AccessTime = ts;
    iter->second->Stats.UpdateTime = ts;
}

void TDataShard::SampleKeyAccess(const TTableId& tableId, const TArrayRef<const TCell>& row) {
    Y_ABORT_UNLESS(!TSysTables::IsSystemTable(tableId));

    auto iter = TableInfos.find(tableId.PathId.LocalPathId);
    Y_ABORT_UNLESS(iter != TableInfos.end());

    const ui64 samplingKeyPrefixSize = row.size();
    TArrayRef<const TCell> key(row.data(), samplingKeyPrefixSize);
    iter->second->Stats.AccessStats.Add(key);
}

NMiniKQL::IKeyAccessSampler::TPtr TDataShard::GetKeyAccessSampler() {
    return CurrentKeySampler;
}

void TDataShard::EnableKeyAccessSampling(const TActorContext& ctx, TInstant until) {
    if (CurrentKeySampler == DisabledKeySampler) {
        for (auto& table : TableInfos) {
            table.second->Stats.AccessStats.Clear();
        }
        CurrentKeySampler = EnabledKeySampler;
        StartedKeyAccessSamplingAt = AppData(ctx)->TimeProvider->Now();
        LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, "Started key access sampling at datashard: " << TabletID());
    } else {
        LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, "Extended key access sampling at datashard: " << TabletID());
    }
    StopKeyAccessSamplingAt = until;
}

bool TDataShard::OnRenderAppHtmlPage(NMon::TEvRemoteHttpInfo::TPtr ev, const TActorContext& ctx) {
    if (!Executor() || !Executor()->GetStats().IsActive)
        return false;

    if (!ev)
        return true;

    LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Handle TEvRemoteHttpInfo: %s", ev->Get()->Query.data());

    auto cgi = ev->Get()->Cgi();

    if (const auto& action = cgi.Get("action")) {
        if (action == "cleanup-borrowed-parts") {
            Execute(CreateTxMonitoringCleanupBorrowedParts(this, ev), ctx);
            return true;
        }

        if (action == "reset-schema-version") {
            Execute(CreateTxMonitoringResetSchemaVersion(this, ev), ctx);
            return true;
        }

        if (action == "key-access-sample") {
            TDuration duration = TDuration::Seconds(120);
            EnableKeyAccessSampling(ctx, ctx.Now() + duration);
            ctx.Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes("Enabled key access sampling for " + duration.ToString()));
            return true;
        }

        ctx.Send(ev->Sender, new NMon::TEvRemoteBinaryInfoRes(NMonitoring::HTTPNOTFOUND));
        return true;
    }

    if (const auto& page = cgi.Get("page")) {
        if (page == "main") {
            // fallthrough
        } else if (page == "change-sender") {
            if (OutChangeSender) {
                ctx.Send(ev->Forward(OutChangeSender));
                return true;
            } else {
                ctx.Send(ev->Sender, new NMon::TEvRemoteHttpInfoRes("Change sender is not running"));
                return true;
            }
        } else {
            ctx.Send(ev->Sender, new NMon::TEvRemoteBinaryInfoRes(NMonitoring::HTTPNOTFOUND));
            return true;
        }
    }

    Execute(CreateTxMonitoring(this, ev), ctx);
    return true;
}

ui64 TDataShard::GetMemoryUsage() const {
    ui64 res = sizeof(TDataShard) + (20 << 10); // basic value
    res += Pipeline.GetInactiveTxSize();
    return res;
}

bool TDataShard::ByKeyFilterDisabled() const {
    return DisableByKeyFilter;
}

bool TDataShard::AllowCancelROwithReadsets() const {
    return CanCancelROWithReadSets;
}

bool TDataShard::IsMvccEnabled() const {
    return SnapshotManager.IsMvccEnabled();
}

TReadWriteVersions TDataShard::GetLocalReadWriteVersions() const {
    if (IsFollower())
        return {TRowVersion::Max(), TRowVersion::Max()};

    if (!IsMvccEnabled())
        return {TRowVersion::Max(), SnapshotManager.GetMinWriteVersion()};

    TRowVersion edge = Max(
        SnapshotManager.GetCompleteEdge(),
        SnapshotManager.GetIncompleteEdge(),
        SnapshotManager.GetUnprotectedReadEdge());

    if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
        return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());

    TRowVersion maxEdge(edge.Step, ::Max<ui64>());

    return Max(maxEdge, edge.Next(), SnapshotManager.GetImmediateWriteEdge());
}

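// Picks the MVCC version for a transaction. Planned operations carry their
// own (step, txId); immediate operations are assigned a version derived from
// the complete/incomplete/unprotected-read edges and the current mediator
// step, as explained in the comments below.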
TRowVersion TDataShard::GetMvccTxVersion(EMvccTxMode mode, TOperation* op) const {
    Y_DEBUG_ABORT_UNLESS(IsMvccEnabled());

    if (op) {
        if (op->IsMvccSnapshotRead()) {
            return op->GetMvccSnapshot();
        }

        if (op->GetStep()) {
            return TRowVersion(op->GetStep(), op->GetTxId());
        }
    }

    TRowVersion edge;
    TRowVersion readEdge = Max(
        SnapshotManager.GetCompleteEdge(),
        SnapshotManager.GetUnprotectedReadEdge());
    TRowVersion writeEdge = Max(readEdge, SnapshotManager.GetIncompleteEdge());
    switch (mode) {
        case EMvccTxMode::ReadOnly:
            // With read-only transactions we don't need reads to include
            // changes made at the incomplete edge, as that is a point where
            // distributed transactions performed some reads, not writes.
            // Since incomplete transactions are still in-flight, the actual
            // version will stick to the first incomplete transaction in the
            // queue, effectively reading a non-repeatable state before that
            // transaction.
            edge = readEdge;
            break;
        case EMvccTxMode::ReadWrite:
            // With read-write transactions we must choose a point that is
            // greater than both the complete and incomplete edges. The reason
            // is that incomplete transactions performed some reads at that
            // point and those snapshot points must be repeatable.
            // Note that as soon as the first write past the incomplete edge
            // happens it cements all distributed transactions up to that point
            // as complete, so all future reads and writes are guaranteed to
            // include that point as well.
            edge = writeEdge;
            break;
    }

    // If there's any planned operation that is above our edge, it would be a
    // suitable version for a new immediate operation. We effectively try to
    // execute "before" that point if possible.
    if (auto nextOp = Pipeline.GetNextPlannedOp(edge.Step, edge.TxId))
        return TRowVersion(nextOp->GetStep(), nextOp->GetTxId());

    // Normally we stick transactions to the end of the last known mediator step.
    // Note these calculations only happen when we don't have distributed
    // transactions left in the queue, and we won't have any more transactions
    // up to the current mediator time. The mediator time itself may be stale,
    // in which case we may have evidence of its higher value via the complete
    // and incomplete edges above.
    const ui64 mediatorStep = Max(MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0, writeEdge.Step);
    TRowVersion mediatorEdge(mediatorStep, ::Max<ui64>());

    switch (mode) {
        case EMvccTxMode::ReadOnly: {
            // We want to include everything that was potentially confirmed to
            // users, but we don't want to include anything that is not replied
            // at the start of this read.
            // Note it's only possible to have ImmediateWriteEdge > mediatorEdge
            // when ImmediateWriteEdge == mediatorEdge + 1
            return Max(mediatorEdge, SnapshotManager.GetImmediateWriteEdgeReplied());
        }

        case EMvccTxMode::ReadWrite: {
            // We must use at least the previously used immediate write edge,
            // but we must also avoid trampling over any unprotected mvcc
            // snapshot reads that have occurred.
            // Note it's only possible to go past the last known mediator step
            // when we had an unprotected read, which itself happens at the
            // last mediator step. So we may only ever have a +1 step, never
            // anything more.
            return Max(mediatorEdge, writeEdge.Next(), SnapshotManager.GetImmediateWriteEdge());
        }
    }

    Y_ABORT("unreachable");
}

TReadWriteVersions TDataShard::GetReadWriteVersions(TOperation* op) const {
    if (IsFollower()) {
        return {TRowVersion::Max(), TRowVersion::Max()};
    }

    if (!IsMvccEnabled())
        return {TRowVersion::Max(), SnapshotManager.GetMinWriteVersion()};

    if (op) {
        if (!op->MvccReadWriteVersion) {
            op->MvccReadWriteVersion = GetMvccTxVersion(op->IsReadOnly() ? EMvccTxMode::ReadOnly : EMvccTxMode::ReadWrite, op);
        }

        return *op->MvccReadWriteVersion;
    }

    return GetMvccTxVersion(EMvccTxMode::ReadWrite, nullptr);
}

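// Advances the MVCC edges after an immediate operation has executed. The
// returned flags tell the caller whether anything was written (HadWrites)
// and whether the reply must be delayed until those writes are committed
// (WaitCompletion).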
TDataShard::TPromotePostExecuteEdges TDataShard::PromoteImmediatePostExecuteEdges(
        const TRowVersion& version, EPromotePostExecuteEdges mode, TTransactionContext& txc)
{
    Y_ABORT_UNLESS(!IsFollower(), "Unexpected attempt to promote edges on a follower");

    TPromotePostExecuteEdges res;

    res.HadWrites |= Pipeline.MarkPlannedLogicallyCompleteUpTo(version, txc);

    switch (mode) {
        case EPromotePostExecuteEdges::ReadOnly:
            // We want read-only immediate transactions to stay read-only, thus
            // we don't promote the complete edge unnecessarily. On restarts we
            // will assume anything written is potentially replied anyway,
            // even if it has never been read.
            break;

        case EPromotePostExecuteEdges::RepeatableRead: {
            bool unprotectedReads = GetEnableUnprotectedMvccSnapshotReads();
            if (unprotectedReads) {
                // We want to use unprotected reads, but we need to make sure it's properly marked first
                if (!SnapshotManager.GetPerformedUnprotectedReads()) {
                    SnapshotManager.SetPerformedUnprotectedReads(true, txc);
                    res.HadWrites = true;
                }
                if (!res.HadWrites && !SnapshotManager.IsPerformedUnprotectedReadsCommitted()) {
                    // We need to wait for completion until the flag is committed
                    res.WaitCompletion = true;
                }
                SnapshotManager.PromoteUnprotectedReadEdge(version);
            } else if (SnapshotManager.GetPerformedUnprotectedReads()) {
                // We want to drop the flag as soon as possible
                SnapshotManager.SetPerformedUnprotectedReads(false, txc);
                res.HadWrites = true;
            }

            // We want to promote the complete edge when protected reads are
            // used or when we're already writing something anyway.
            if (res.HadWrites || !unprotectedReads) {
                res.HadWrites |= SnapshotManager.PromoteCompleteEdge(version, txc);
                if (!res.HadWrites && SnapshotManager.GetCommittedCompleteEdge() < version) {
                    // We need to wait for completion because some other transaction
                    // has moved the complete edge, but it's not committed yet.
                    res.WaitCompletion = true;
                }
            }

            break;
        }

        case EPromotePostExecuteEdges::ReadWrite: {
            if (version.Step <= GetMaxObservedStep()) {
                res.HadWrites |= SnapshotManager.PromoteCompleteEdge(version.Step, txc);
            }
            res.HadWrites |= SnapshotManager.PromoteImmediateWriteEdge(version, txc);
            if (res.HadWrites) {
                // Promoting write edges may promote the read edge
                PromoteFollowerReadEdge(txc);
            }
            break;
        }
    }

    return res;
}

ui64 TDataShard::GetMaxObservedStep() const {
    return Max(
        Pipeline.GetLastPlannedTx().Step,
        SnapshotManager.GetCompleteEdge().Step,
        SnapshotManager.GetIncompleteEdge().Step,
        SnapshotManager.GetUnprotectedReadEdge().Step,
        MediatorTimeCastEntry ? MediatorTimeCastEntry->Get(TabletID()) : 0);
}

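// Replies to an immediate write only once its step is already observable via
// the mediator time cast; otherwise the reply is parked in
// MediatorDelayedReplies and later flushed by SendAfterMediatorStepActivate.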
void TDataShard::SendImmediateWriteResult(
        const TRowVersion& version, const TActorId& target, IEventBase* event, ui64 cookie)
{
    const ui64 step = version.Step;
    const ui64 observedStep = GetMaxObservedStep();
    if (step <= observedStep) {
        SnapshotManager.PromoteImmediateWriteEdgeReplied(version);
        Send(target, event, 0, cookie);
        return;
    }

    MediatorDelayedReplies.emplace(
        std::piecewise_construct,
        std::forward_as_tuple(version),
        std::forward_as_tuple(target, THolder<IEventBase>(event), cookie));

    // Try to subscribe to the next step, when needed
    if (MediatorTimeCastEntry && (MediatorTimeCastWaitingSteps.empty() || step < *MediatorTimeCastWaitingSteps.begin())) {
        MediatorTimeCastWaitingSteps.insert(step);
        Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step));
        LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast");
    }
}

TMonotonic TDataShard::ConfirmReadOnlyLease() {
    if (IsFollower() || !ReadOnlyLeaseEnabled()) {
        // Do nothing and return an empty timestamp
        return {};
    }

    TMonotonic ts = AppData()->MonotonicTimeProvider->Now();
    Executor()->ConfirmReadOnlyLease(ts);
    return ts;
}

void TDataShard::ConfirmReadOnlyLease(TMonotonic ts) {
    if (IsFollower() || !ReadOnlyLeaseEnabled()) {
        // Do nothing
        return;
    }

    Executor()->ConfirmReadOnlyLease(ts);
}

void TDataShard::SendWithConfirmedReadOnlyLease(
    TMonotonic ts,
    const TActorId& target,
    IEventBase* event,
    ui64 cookie,
    const TActorId& sessionId)
{
    if (IsFollower() || !ReadOnlyLeaseEnabled()) {
        // Send a possibly stale result (legacy behavior)
        if (!sessionId) {
            Send(target, event, 0, cookie);
        } else {
            SendViaSession(sessionId, target, SelfId(), event);
        }
        return;
    }

    struct TSendState : public TThrRefBase {
        THolder<IEventHandle> Ev;

        TSendState(const TActorId& sessionId, const TActorId& target, const TActorId& src, IEventBase* event, ui64 cookie)
        {
            const ui32 flags = 0;
            Ev = MakeHolder<IEventHandle>(target, src, event, flags, cookie);

            if (sessionId) {
                Ev->Rewrite(TEvInterconnect::EvForward, sessionId);
            }
        }
    };

    if (!ts) {
        ts = AppData()->MonotonicTimeProvider->Now();
    }

    Executor()->ConfirmReadOnlyLease(ts,
        [state = MakeIntrusive<TSendState>(sessionId, target, SelfId(), event, cookie)] {
            TActivationContext::Send(state->Ev.Release());
        });
}

void TDataShard::SendWithConfirmedReadOnlyLease(
    const TActorId& target,
    IEventBase* event,
    ui64 cookie,
    const TActorId& sessionId)
{
    SendWithConfirmedReadOnlyLease(TMonotonic::Zero(), target, event, cookie, sessionId);
}

void TDataShard::Handle(TEvPrivate::TEvConfirmReadonlyLease::TPtr& ev, const TActorContext&) {
    SendWithConfirmedReadOnlyLease(ev->Get()->Timestamp, ev->Sender, new TEvPrivate::TEvReadonlyLeaseConfirmation, ev->Cookie);
}

void TDataShard::SendImmediateReadResult(
    TMonotonic readTime,
    const TActorId& target,
    IEventBase* event,
    ui64 cookie,
    const TActorId& sessionId)
{
    SendWithConfirmedReadOnlyLease(readTime, target, event, cookie, sessionId);
}

void TDataShard::SendImmediateReadResult(
    const TActorId& target,
    IEventBase* event,
    ui64 cookie,
    const TActorId& sessionId)
{
    SendWithConfirmedReadOnlyLease(TMonotonic::Zero(), target, event, cookie, sessionId);
}

void TDataShard::SendAfterMediatorStepActivate(ui64 mediatorStep, const TActorContext& ctx) {
    for (auto it = MediatorDelayedReplies.begin(); it != MediatorDelayedReplies.end();) {
        const ui64 step = it->first.Step;

        if (step <= mediatorStep) {
            SnapshotManager.PromoteImmediateWriteEdgeReplied(it->first);
            Send(it->second.Target, it->second.Event.Release(), 0, it->second.Cookie);
            it = MediatorDelayedReplies.erase(it);
            continue;
        }

        // Try to subscribe to the next step, when needed
        if (MediatorTimeCastEntry && (MediatorTimeCastWaitingSteps.empty() || step < *MediatorTimeCastWaitingSteps.begin())) {
            MediatorTimeCastWaitingSteps.insert(step);
            Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step));
            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast");
        }
        break;
    }

    if (Pipeline.HasPredictedPlan()) {
        ui64 nextStep = Pipeline.NextPredictedPlanStep();
        if (nextStep <= mediatorStep) {
            SchedulePlanPredictedTxs();
        } else {
            WaitPredictedPlanStep(nextStep);
        }
    }

    if (IsMvccEnabled()) {
        PromoteFollowerReadEdge();
    }

    EmitHeartbeats(ctx);
}

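// Called while restoring mediator state after a restart; may be invoked
// multiple times until all prerequisites (time cast entry, coordinator
// subscriptions) are available and the observed mediator step catches up to
// the inferred read step.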
void TDataShard::CheckMediatorStateRestored() {
    if (!MediatorStateWaiting ||
        !RegistrationSended ||
        !MediatorTimeCastEntry ||
        (CoordinatorSubscriptionsPending > 0 && CoordinatorPrevReadStepMax == Max<ui64>()))
    {
        // We are not waiting or not ready to make a decision
        if (MediatorStateWaiting &&
            MediatorTimeCastEntry &&
            CoordinatorPrevReadStepMax == Max<ui64>() &&
            !MediatorStateBackupInitiated)
        {
            // It is possible we don't have coordinators with new protocol support.
            // Use a backup plan of acquiring a read snapshot for restoring the read step.
            Schedule(TDuration::MilliSeconds(50), new TEvPrivate::TEvMediatorRestoreBackup);
            MediatorStateBackupInitiated = true;
        }
        return;
    }

    // CoordinatorPrevReadStepMax is the next minimum step that may be acquired
    // as a snapshot. This tells us that no previous read could have happened
    // after this step, even if it was acquired, since it would have been
    // waiting until mediator time advances to that step.
    // CoordinatorPrevReadStepMin is the maximum step that could have been
    // acquired at coordinators before we subscribed; however, this does not
    // include possible local snapshots that could have been acquired by a
    // previous generation during iterator reads, so we always have to use
    // CoordinatorPrevReadStepMax as the worst-case possible readStep.
    // Note we always need to wait for CoordinatorPrevReadStepMax even without
    // local snapshots, because the previous generation may have observed it and
    // may have replied to immediate writes at that step, and new immediate
    // HEAD reads must include that in their results.
    const ui64 waitStep = CoordinatorPrevReadStepMax;
    const ui64 readStep = CoordinatorPrevReadStepMax;

    LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "CheckMediatorStateRestored: waitStep# " << waitStep << " readStep# " << readStep);

    // WARNING: we must perform this check BEFORE we update the unprotected read edge.
    // We may enter this code path multiple times, and we expect that the above
    // read step may be refined while we wait based on the pessimistic backup step.
    if (GetMaxObservedStep() < waitStep) {
        // We need to wait until we observe a mediator step that is at least
        // as large as the step we found.
        if (MediatorTimeCastWaitingSteps.insert(waitStep).second) {
            Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), waitStep));
            LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << waitStep << " from mediator time cast");
        }
        return;
    }

    // Using the inferred last read step we restore the pessimistic unprotected
    // read edge. Note we only need to do so if there have actually been any
    // unprotected reads in this datashard's history. We also need to make sure
    // this edge is at least one smaller than ImmediateWriteEdge when we know
    // we started unconfirmed immediate writes in the last generation.
    if (SnapshotManager.GetPerformedUnprotectedReads()) {
        const TRowVersion lastReadEdge(readStep, Max<ui64>());
        const TRowVersion preImmediateWriteEdge =
            SnapshotManager.GetImmediateWriteEdge().Step > SnapshotManager.GetCompleteEdge().Step
            ? SnapshotManager.GetImmediateWriteEdge().Prev()
            : TRowVersion::Min();
        SnapshotManager.PromoteUnprotectedReadEdge(Max(lastReadEdge, preImmediateWriteEdge));
    }

    // Promote the replied immediate write edge up to the currently observed step.
    // This is needed to make sure we read any immediate writes potentially
    // replied before the restart, and conversely don't accidentally read any
    // data that is definitely not replied yet.
    if (SnapshotManager.GetImmediateWriteEdgeReplied() < SnapshotManager.GetImmediateWriteEdge()) {
        const ui64 writeStep = SnapshotManager.GetImmediateWriteEdge().Step;
        const TRowVersion edge(GetMaxObservedStep(), Max<ui64>());
        SnapshotManager.PromoteImmediateWriteEdgeReplied(
            Min(edge, SnapshotManager.GetImmediateWriteEdge()));
        // Try to ensure writes become visible sooner rather than later
        if (edge.Step < writeStep) {
            if (MediatorTimeCastWaitingSteps.insert(writeStep).second) {
                Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), writeStep));
                LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << writeStep << " from mediator time cast");
            }
        }
    }

    MediatorStateWaiting = false;

    // Resend all waiting messages
    TVector<THolder<IEventHandle>> msgs;
    msgs.swap(MediatorStateWaitingMsgs);
    for (auto& ev : msgs) {
        TActivationContext::Send(ev.Release());
    }
}

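// Maps MiniKQL engine results to datashard error kinds; anything that is not
// explicitly listed degrades to UNKNOWN rather than aborting.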
NKikimrTxDataShard::TError::EKind ConvertErrCode(NMiniKQL::IEngineFlat::EResult code) {
    using EResult = NMiniKQL::IEngineFlat::EResult;

    switch (code) {
        case EResult::Ok:
            return NKikimrTxDataShard::TError::OK;
        case EResult::SnapshotNotReady:
            return NKikimrTxDataShard::TError::SNAPSHOT_NOT_READY_YET;
        case EResult::SchemeChanged:
            return NKikimrTxDataShard::TError::SCHEME_CHANGED;
        case EResult::IsReadonly:
            return NKikimrTxDataShard::TError::READONLY;
        case EResult::KeyError:
            return NKikimrTxDataShard::TError::SCHEME_ERROR;
        case EResult::ProgramError:
            return NKikimrTxDataShard::TError::PROGRAM_ERROR;
        case EResult::TooManyData:
            return NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED;
        case EResult::SnapshotNotExist:
            return NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST;
        case EResult::ResultTooBig:
            return NKikimrTxDataShard::TError::REPLY_SIZE_EXCEEDED;
        case EResult::Cancelled:
            return NKikimrTxDataShard::TError::EXECUTION_CANCELLED;
        default:
            return NKikimrTxDataShard::TError::UNKNOWN;
    }
}
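
// Collapses the fine-grained shard error kinds into the public Ydb status codes:
// shard-internal failures become INTERNAL_ERROR, request-shaped problems become
// BAD_REQUEST, a missing snapshot becomes NOT_FOUND, and anything unrecognized
// falls back to GENERIC_ERROR. A hedged sketch of chaining the two conversions
// (the variables are illustrative):
//
//   auto kind = ConvertErrCode(engineResult);
//   Ydb::StatusIds::StatusCode status = ConvertToYdbStatusCode(kind);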
Ydb::StatusIds::StatusCode ConvertToYdbStatusCode(NKikimrTxDataShard::TError::EKind code) {
    switch (code) {
        case NKikimrTxDataShard::TError::OK:
            return Ydb::StatusIds::SUCCESS;
        case NKikimrTxDataShard::TError::BAD_TX_KIND:
        case NKikimrTxDataShard::TError::SCHEME_ERROR:
        case NKikimrTxDataShard::TError::WRONG_PAYLOAD_TYPE:
        case NKikimrTxDataShard::TError::LEAF_REQUIRED:
        case NKikimrTxDataShard::TError::WRONG_SHARD_STATE:
        case NKikimrTxDataShard::TError::PROGRAM_ERROR:
        case NKikimrTxDataShard::TError::OUT_OF_SPACE:
        case NKikimrTxDataShard::TError::READ_SIZE_EXECEEDED:
        case NKikimrTxDataShard::TError::SHARD_IS_BLOCKED:
        case NKikimrTxDataShard::TError::UNKNOWN:
        case NKikimrTxDataShard::TError::REPLY_SIZE_EXCEEDED:
        case NKikimrTxDataShard::TError::EXECUTION_CANCELLED:
        case NKikimrTxDataShard::TError::DISK_SPACE_EXHAUSTED:
            return Ydb::StatusIds::INTERNAL_ERROR;
        case NKikimrTxDataShard::TError::BAD_ARGUMENT:
        case NKikimrTxDataShard::TError::READONLY:
        case NKikimrTxDataShard::TError::SNAPSHOT_NOT_READY_YET:
        case NKikimrTxDataShard::TError::SCHEME_CHANGED:
        case NKikimrTxDataShard::TError::DUPLICATED_SNAPSHOT_POLICY:
        case NKikimrTxDataShard::TError::MISSING_SNAPSHOT_POLICY:
            return Ydb::StatusIds::BAD_REQUEST;
        case NKikimrTxDataShard::TError::SNAPSHOT_NOT_EXIST:
            return Ydb::StatusIds::NOT_FOUND;
        default:
            return Ydb::StatusIds::GENERIC_ERROR;
    }
}

void TDataShard::Handle(TEvents::TEvGone::TPtr &ev) {
    Actors.erase(ev->Sender);
}

void TDataShard::Handle(TEvDataShard::TEvGetShardState::TPtr &ev, const TActorContext &ctx) {
    Execute(new TTxGetShardState(this, ev), ctx);
}

void TDataShard::Handle(TEvDataShard::TEvSchemaChangedResult::TPtr& ev, const TActorContext& ctx) {
    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                "Handle TEvSchemaChangedResult " << ev->Get()->Record.GetTxId()
                << " datashard " << TabletID()
                << " state " << DatashardStateName(State));
    Execute(CreateTxSchemaChanged(ev), ctx);
}

void TDataShard::Handle(TEvDataShard::TEvStateChangedResult::TPtr& ev, const TActorContext& ctx) {
    Y_UNUSED(ev);
    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                "Handle TEvStateChangedResult"
                << " datashard " << TabletID()
                << " state " << DatashardStateName(State));
    // TODO: implement
    NTabletPipe::CloseAndForgetClient(SelfId(), StateReportPipe);
}
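
// Decides whether an incoming data transaction must be rejected outright, based
// on shard state (splitting, dropping, offline), completion lag, in-flight load,
// an injected reject probability, restarts, and backup tables. It accumulates
// human-readable reasons and returns true when the proposal should be answered
// with an error instead of being executed. A hedged caller sketch (the txId and
// description values are illustrative):
//
//   NKikimrTxDataShard::TEvProposeTransactionResult::EStatus status;
//   ERejectReasons reasons;
//   TString description;
//   if (CheckDataTxReject("data TxId 42", ctx, status, reasons, description)) {
//       // reply with `status` and `description` instead of executing
//   }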
bool TDataShard::CheckDataTxReject(const TString& opDescr,
                                   const TActorContext &ctx,
                                   NKikimrTxDataShard::TEvProposeTransactionResult::EStatus &rejectStatus,
                                   ERejectReasons &rejectReasons,
                                   TString &rejectDescription)
{
    bool reject = false;
    rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED;
    rejectReasons = ERejectReasons::None;
    TVector<TString> rejectDescriptions;

    // In v0.5 reject all transactions on split Src after receiving EvSplit
    if (State == TShardState::SplitSrcWaitForNoTxInFlight ||
        State == TShardState::SplitSrcMakeSnapshot ||
        State == TShardState::SplitSrcSendingSnapshot ||
        State == TShardState::SplitSrcWaitForPartitioningChanged) {
        reject = true;
        rejectReasons |= ERejectReasons::WrongState;
        rejectDescriptions.push_back(TStringBuilder()
            << "is in process of split opId " << SrcSplitOpId
            << " state " << DatashardStateName(State)
            << " (wrong shard state)");
    } else if (State == TShardState::SplitDstReceivingSnapshot) {
        reject = true;
        rejectReasons |= ERejectReasons::WrongState;
        rejectDescriptions.push_back(TStringBuilder()
            << "is in process of split opId " << DstSplitOpId
            << " state " << DatashardStateName(State));
    } else if (State == TShardState::PreOffline || State == TShardState::Offline) {
        reject = true;
        rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
        rejectReasons |= ERejectReasons::WrongState;
        rejectDescriptions.push_back("is in a pre/offline state assuming this is due to a finished split (wrong shard state)");
    } else if (MvccSwitchState == TSwitchState::SWITCHING) {
        reject = true;
        rejectReasons |= ERejectReasons::WrongState;
        rejectDescriptions.push_back(TStringBuilder()
            << "is in process of mvcc state change"
            << " state " << DatashardStateName(State));
    }

    if (Pipeline.HasDrop()) {
        reject = true;
        rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
        rejectReasons |= ERejectReasons::Dropping;
        rejectDescriptions.push_back("is in process of drop");
    }

    ui64 txInfly = TxInFly();
    TDuration lag = GetDataTxCompleteLag();
    if (txInfly > 1 && lag > TDuration::MilliSeconds(MaxTxLagMilliseconds)) {
        reject = true;
        rejectReasons |= ERejectReasons::OverloadByLag;
        rejectDescriptions.push_back(TStringBuilder()
            << "lags behind, lag: " << lag
            << " in-flight tx count: " << txInfly);
    }

    const float rejectProbability = Executor()->GetRejectProbability();
    if (!reject && rejectProbability > 0) {
        float rnd = AppData(ctx)->RandomProvider->GenRandReal2();
        reject |= (rnd < rejectProbability);
        if (reject) {
            rejectReasons |= ERejectReasons::OverloadByProbability;
            rejectDescriptions.push_back("decided to reject due to given RejectProbability");
        }
    }

    size_t totalInFly =
        ReadIteratorsInFly() + TxInFly() + ImmediateInFly() + MediatorStateWaitingMsgs.size()
        + ProposeQueue.Size() + TxWaiting();
    if (totalInFly > GetMaxTxInFly()) {
        reject = true;
        rejectReasons |= ERejectReasons::OverloadByTxInFly;
        rejectDescriptions.push_back("MaxTxInFly was exceeded");
    }

    if (!reject && Stopping) {
        reject = true;
        rejectReasons |= ERejectReasons::WrongState;
        rejectDescriptions.push_back("is restarting");
    }

    if (!reject) {
        for (auto& it : TableInfos) {
            if (it.second->IsBackup) {
                reject = true;
                rejectStatus = NKikimrTxDataShard::TEvProposeTransactionResult::ERROR;
                rejectReasons |= ERejectReasons::WrongState;
                rejectDescriptions.push_back("is a backup table");
                break;
            }
        }
    }

    if (reject) {
        rejectDescription = TStringBuilder()
            << "Rejecting " << opDescr
            << " because datashard " << TabletID() << ": "
            << JoinSeq("; ", rejectDescriptions);
    }

    return reject;
}
bool TDataShard::CheckDataTxRejectAndReply(const TEvDataShard::TEvProposeTransaction::TPtr& ev, const TActorContext& ctx)
{
    auto* msg = ev->Get();
    switch (msg->GetTxKind()) {
        case NKikimrTxDataShard::TX_KIND_DATA:
        case NKikimrTxDataShard::TX_KIND_SCAN:
        case NKikimrTxDataShard::TX_KIND_SNAPSHOT:
        case NKikimrTxDataShard::TX_KIND_DISTRIBUTED_ERASE:
        case NKikimrTxDataShard::TX_KIND_COMMIT_WRITES:
            break;
        default:
            return false;
    }

    TString txDescr = TStringBuilder() << "data TxId " << msg->GetTxId();

    NKikimrTxDataShard::TEvProposeTransactionResult::EStatus rejectStatus;
    ERejectReasons rejectReasons;
    TString rejectDescription;
    bool reject = CheckDataTxReject(txDescr, ctx, rejectStatus, rejectReasons, rejectDescription);

    if (reject) {
        LWTRACK(ProposeTransactionReject, msg->Orbit);
        THolder<TEvDataShard::TEvProposeTransactionResult> result =
            THolder(new TEvDataShard::TEvProposeTransactionResult(msg->GetTxKind(),
                                                                  TabletID(),
                                                                  msg->GetTxId(),
                                                                  rejectStatus));
        result->AddError(NKikimrTxDataShard::TError::WRONG_SHARD_STATE, rejectDescription);
        LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectDescription);

        ctx.Send(ev->Sender, result.Release());
        IncCounter(COUNTER_PREPARE_OVERLOADED);
        IncCounter(COUNTER_PREPARE_COMPLETE);
        return true;
    }

    return false;
}

bool TDataShard::CheckDataTxRejectAndReply(const NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActorContext& ctx)
{
    auto* msg = ev->Get();
    TString txDescr = TStringBuilder() << "data TxId " << msg->GetTxId();

    NKikimrTxDataShard::TEvProposeTransactionResult::EStatus rejectStatus;
    ERejectReasons rejectReasons;
    TString rejectDescription;
    bool reject = CheckDataTxReject(txDescr, ctx, rejectStatus, rejectReasons, rejectDescription);

    if (reject) {
        LWTRACK(ProposeTransactionReject, msg->GetOrbit());

        NKikimrDataEvents::TEvWriteResult::EStatus status;
        switch (rejectStatus) {
            case NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED:
                status = NKikimrDataEvents::TEvWriteResult::STATUS_OVERLOADED;
                break;
            case NKikimrTxDataShard::TEvProposeTransactionResult::ERROR:
                status = NKikimrDataEvents::TEvWriteResult::STATUS_INTERNAL_ERROR;
                break;
            default:
                Y_FAIL_S("Unexpected rejectStatus " << rejectStatus);
        }

        auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), msg->GetTxId(), status, rejectDescription);
        LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, rejectDescription);

        ctx.Send(ev->Sender, result.release());
        IncCounter(COUNTER_PREPARE_OVERLOADED);
        IncCounter(COUNTER_PREPARE_COMPLETE);
        return true;
    }

    return false;
}

void TDataShard::UpdateProposeQueueSize() const {
    SetCounter(COUNTER_PROPOSE_QUEUE_SIZE, MediatorStateWaitingMsgs.size() + ProposeQueue.Size() + DelayedProposeQueue.size() + Pipeline.WaitingTxs());
    SetCounter(COUNTER_READ_ITERATORS_WAITING, Pipeline.WaitingReadIterators());
}
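
// The proposal handler below applies its gates in a fixed order: first park the
// message if the mediator state is still being restored (immediate, non-forced
// transactions only), then if the dependency graph is still loading, then if the
// transaction has to wait for a plan step or snapshot, and only after that run
// the reject checks and dispatch by transaction kind. A rough sketch of the
// decision ladder (illustrative pseudocode, not a second implementation):
//
//   if (mediator state waiting && immediate)  -> queue for later resend
//   if (propose delayers present)             -> delay until graph restored
//   if (needs to wait for plan step/snapshot) -> park in the pipeline
//   if (CheckDataTxRejectAndReply(...))       -> already answered with an error
//   otherwise                                 -> ProposeTransaction(...)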
void TDataShard::Handle(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) {
    auto* msg = ev->Get();
    LWTRACK(ProposeTransactionRequest, msg->Orbit);

    // Check if we need to delay an immediate transaction
    if (MediatorStateWaiting &&
        (ev->Get()->GetFlags() & TTxFlags::Immediate) &&
        !(ev->Get()->GetFlags() & TTxFlags::ForceOnline))
    {
        // We cannot calculate a correct version until we restore mediator state
        LWTRACK(ProposeTransactionWaitMediatorState, msg->Orbit);
        MediatorStateWaitingMsgs.emplace_back(ev.Release());
        UpdateProposeQueueSize();
        return;
    }

    if (Pipeline.HasProposeDelayers()) {
        LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                    "Handle TEvProposeTransaction delayed at " << TabletID() << " until dependency graph is restored");
        LWTRACK(ProposeTransactionWaitDelayers, msg->Orbit);
        DelayedProposeQueue.emplace_back().Reset(ev.Release());
        UpdateProposeQueueSize();
        return;
    }

    if (CheckTxNeedWait(ev)) {
        LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                    "Handle TEvProposeTransaction delayed at " << TabletID() << " until the interesting plan step arrives");
        if (Pipeline.AddWaitingTxOp(ev, ctx)) {
            UpdateProposeQueueSize();
            return;
        }
    }

    IncCounter(COUNTER_PREPARE_REQUEST);

    if (CheckDataTxRejectAndReply(ev, ctx)) {
        return;
    }

    switch (ev->Get()->GetTxKind()) {
        case NKikimrTxDataShard::TX_KIND_DATA:
        case NKikimrTxDataShard::TX_KIND_SCAN:
        case NKikimrTxDataShard::TX_KIND_SNAPSHOT:
        case NKikimrTxDataShard::TX_KIND_DISTRIBUTED_ERASE:
        case NKikimrTxDataShard::TX_KIND_COMMIT_WRITES:
            ProposeTransaction(std::move(ev), ctx);
            return;
        case NKikimrTxDataShard::TX_KIND_SCHEME:
            ProposeTransaction(std::move(ev), ctx);
            return;
        default:
            break;
    }

    THolder<TEvDataShard::TEvProposeTransactionResult> result
        = THolder(new TEvDataShard::TEvProposeTransactionResult(ev->Get()->GetTxKind(),
                                                                TabletID(),
                                                                ev->Get()->GetTxId(),
                                                                NKikimrTxDataShard::TEvProposeTransactionResult::ERROR));
    result->AddError(NKikimrTxDataShard::TError::BAD_TX_KIND, "Unknown kind of transaction");
    ctx.Send(ev->Sender, result.Release());
    IncCounter(COUNTER_PREPARE_ERROR);
    IncCounter(COUNTER_PREPARE_COMPLETE);

    // TODO[serxa]: wake up! don't sleep! maybe...
    //Executor()->WakeUp(ctx);
}
void TDataShard::Handle(TEvDataShard::TEvProposeTransactionAttach::TPtr &ev, const TActorContext &ctx) {
    const auto &record = ev->Get()->Record;
    const ui64 txId = record.GetTxId();
    NKikimrProto::EReplyStatus status = NKikimrProto::NODATA;

    auto op = TransQueue.FindTxInFly(txId);
    if (!op) {
        op = Pipeline.FindCompletingOp(txId);
    }

    if (op && op->GetTarget() == ev->Sender && !op->IsImmediate() && op->HasStoredFlag() && !op->HasResultSentFlag()) {
        // This transaction is expected to send a reply eventually
        status = NKikimrProto::OK;
    }

    ctx.Send(ev->Sender, new TEvDataShard::TEvProposeTransactionAttachResult(TabletID(), txId, status), 0, ev->Cookie);
}

void TDataShard::HandleAsFollower(TEvDataShard::TEvProposeTransaction::TPtr &ev, const TActorContext &ctx) {
    auto* msg = ev->Get();
    LWTRACK(ProposeTransactionRequest, msg->Orbit);
    IncCounter(COUNTER_PREPARE_REQUEST);

    if (TxInFly() > GetMaxTxInFly()) {
        THolder<TEvDataShard::TEvProposeTransactionResult> result =
            THolder(new TEvDataShard::TEvProposeTransactionResult(ev->Get()->GetTxKind(), TabletID(),
                ev->Get()->GetTxId(), NKikimrTxDataShard::TEvProposeTransactionResult::OVERLOADED));
        ctx.Send(ev->Sender, result.Release());
        IncCounter(COUNTER_PREPARE_OVERLOADED);
        IncCounter(COUNTER_PREPARE_COMPLETE);
        return;
    }

    if (ev->Get()->GetTxKind() == NKikimrTxDataShard::TX_KIND_DATA) {
        ProposeTransaction(std::move(ev), ctx);
        return;
    }

    THolder<TEvDataShard::TEvProposeTransactionResult> result
        = THolder(new TEvDataShard::TEvProposeTransactionResult(ev->Get()->GetTxKind(),
                                                                TabletID(),
                                                                ev->Get()->GetTxId(),
                                                                NKikimrTxDataShard::TEvProposeTransactionResult::ERROR));
    result->AddError(NKikimrTxDataShard::TError::BAD_TX_KIND, "Unsupported transaction kind");
    ctx.Send(ev->Sender, result.Release());
    IncCounter(COUNTER_PREPARE_ERROR);
    IncCounter(COUNTER_PREPARE_COMPLETE);
}

void TDataShard::CheckDelayedProposeQueue(const TActorContext &ctx) {
    if (DelayedProposeQueue && !Pipeline.HasProposeDelayers()) {
        for (auto& ev : DelayedProposeQueue) {
            ctx.ExecutorThread.Send(ev.Release());
        }
        DelayedProposeQueue.clear();
        DelayedProposeQueue.shrink_to_fit();
        UpdateProposeQueueSize();
    }
}
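
// Both ProposeTransaction overloads below split traffic the same way: immediate
// candidates go through ProposeQueue so they cannot starve operations that are
// already running, while everything else starts a prepare transaction right
// away. A hedged sketch of the effective routing (illustrative only):
//
//   if (immediate && !force-online) Enqueue -> handled by the delayed-propose event later
//   else                            Execute(TTxProposeTransactionBase / TTxWrite)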
void TDataShard::ProposeTransaction(TEvDataShard::TEvProposeTransaction::TPtr &&ev, const TActorContext &ctx) {
    auto* msg = ev->Get();
    // This transaction may run in immediate mode
    bool mayRunImmediate = (msg->GetFlags() & TTxFlags::Immediate) && !(msg->GetFlags() & TTxFlags::ForceOnline) &&
        msg->GetTxKind() == NKikimrTxDataShard::TX_KIND_DATA;

    if (mayRunImmediate) {
        // Enqueue immediate transactions so they don't starve existing operations
        LWTRACK(ProposeTransactionEnqueue, msg->Orbit);
        ProposeQueue.Enqueue(IEventHandle::Upcast<TEvDataShard::TEvProposeTransaction>(std::move(ev)), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, ctx);
        UpdateProposeQueueSize();
    } else {
        // Prepare planned transactions as soon as possible
        Execute(new TTxProposeTransactionBase(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx);
    }
}

void TDataShard::ProposeTransaction(NEvents::TDataEvents::TEvWrite::TPtr&& ev, const TActorContext& ctx) {
    auto* msg = ev->Get();
    const auto& record = msg->Record;
    // This transaction may run in immediate mode
    bool mayRunImmediate = record.txmode() == NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE;

    if (mayRunImmediate) {
        // Enqueue immediate transactions so they don't starve existing operations
        LWTRACK(ProposeTransactionEnqueue, msg->GetOrbit());
        ProposeQueue.Enqueue(IEventHandle::Upcast<NEvents::TDataEvents::TEvWrite>(std::move(ev)), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, ctx);
        UpdateProposeQueueSize();
    } else {
        // Prepare planned transactions as soon as possible
        Execute(new TTxWrite(this, std::move(ev), TAppData::TimeProvider->Now(), NextTieBreakerIndex++, /* delayed */ false), ctx);
    }
}

void TDataShard::Handle(TEvTxProcessing::TEvPlanStep::TPtr &ev, const TActorContext &ctx) {
    ui64 srcMediatorId = ev->Get()->Record.GetMediatorID();
    if (!CheckMediatorAuthorisation(srcMediatorId)) {
        LOG_CRIT_S(ctx, NKikimrServices::TX_DATASHARD, "tablet " << TabletID() <<
            " received PlanStep " << ev->Get()->Record.GetStep() <<
            " from unauthorized mediator " << srcMediatorId);
        HandlePoison(ctx);
        return;
    }

    Execute(new TTxPlanStep(this, ev), ctx);
}

void TDataShard::Handle(TEvTxProcessing::TEvReadSet::TPtr &ev, const TActorContext &ctx) {
    ui64 sender = ev->Get()->Record.GetTabletSource();
    ui64 dest = ev->Get()->Record.GetTabletDest();
    ui64 producer = ev->Get()->Record.GetTabletProducer();
    ui64 txId = ev->Get()->Record.GetTxId();
    LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Receive RS at %" PRIu64 " source %" PRIu64 " dest %" PRIu64 " producer %" PRIu64 " txId %" PRIu64,
              TabletID(), sender, dest, producer, txId);
    IncCounter(COUNTER_READSET_RECEIVED_COUNT);
    IncCounter(COUNTER_READSET_RECEIVED_SIZE, ev->Get()->Record.GetReadSet().size());
    Execute(new TTxReadSet(this, ev), ctx);
}

void TDataShard::Handle(TEvTxProcessing::TEvReadSetAck::TPtr &ev, const TActorContext &ctx) {
    OutReadSets.SaveAck(ctx, ev->Release());

    // Progress one more Tx to force delayed schema operations
    if (Pipeline.HasSchemaOperation() && OutReadSets.Empty()) {
        // TODO: wait for empty OutRS in a separate unit?
        Pipeline.AddCandidateUnit(EExecutionUnitKind::PlanQueue);
        PlanQueue.Progress(ctx);
    }

    CheckStateChange(ctx);
}

void TDataShard::Handle(TEvPrivate::TEvProgressTransaction::TPtr &ev, const TActorContext &ctx) {
    Y_UNUSED(ev);
    IncCounter(COUNTER_TX_PROGRESS_EV);
    ExecuteProgressTx(ctx);
}
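
// TEvDelayedProposeTransaction processes at most one ProposeQueue item per
// event: a live item is re-executed as a delayed TTxProposeTransactionBase or
// TTxWrite (which acks the queue on its first Execute()), while a cancelled
// item is answered with a CANCELLED status directly. The direct Ack() at the
// bottom is reached only when there was nothing to dequeue.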
void TDataShard::Handle(TEvPrivate::TEvDelayedProposeTransaction::TPtr &ev, const TActorContext &ctx) {
    Y_UNUSED(ev);
    IncCounter(COUNTER_PROPOSE_QUEUE_EV);

    if (ProposeQueue) {
        auto item = ProposeQueue.Dequeue();
        UpdateProposeQueueSize();

        TDuration latency = TAppData::TimeProvider->Now() - item.ReceivedAt;
        IncCounter(COUNTER_PROPOSE_QUEUE_LATENCY, latency);

        if (!item.Cancelled) {
            // N.B. we don't call ProposeQueue.Reset(), tx will Ack() on its first Execute()
            switch (item.Event->GetTypeRewrite()) {
                case TEvDataShard::TEvProposeTransaction::EventType: {
                    auto event = IEventHandle::Downcast<TEvDataShard::TEvProposeTransaction>(std::move(item.Event));
                    Execute(new TTxProposeTransactionBase(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx);
                    return;
                }
                case NEvents::TDataEvents::TEvWrite::EventType: {
                    auto event = IEventHandle::Downcast<NEvents::TDataEvents::TEvWrite>(std::move(item.Event));
                    Execute(new TTxWrite(this, std::move(event), item.ReceivedAt, item.TieBreakerIndex, /* delayed */ true), ctx);
                    return;
                }
                default:
                    Y_FAIL_S("Unexpected event type " << item.Event->GetTypeRewrite());
            }
        }

        TActorId target = item.Event->Sender;
        ui64 cookie = item.Event->Cookie;
        switch (item.Event->GetTypeRewrite()) {
            case TEvDataShard::TEvProposeTransaction::EventType: {
                auto* msg = item.Event->Get<TEvDataShard::TEvProposeTransaction>();
                auto kind = msg->GetTxKind();
                auto txId = msg->GetTxId();
                auto result = new TEvDataShard::TEvProposeTransactionResult(
                    kind, TabletID(), txId,
                    NKikimrTxDataShard::TEvProposeTransactionResult::CANCELLED);
                ctx.Send(target, result, 0, cookie);
                return;
            }
            case NEvents::TDataEvents::TEvWrite::EventType: {
                auto* msg = item.Event->Get<NEvents::TDataEvents::TEvWrite>();
                auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(TabletID(), msg->GetTxId(), NKikimrDataEvents::TEvWriteResult::STATUS_CANCELLED, "Canceled");
                ctx.Send(target, result.release(), 0, cookie);
                return;
            }
            default:
                Y_FAIL_S("Unexpected event type " << item.Event->GetTypeRewrite());
        }
    }

    // N.B. Ack directly since we didn't start any delayed transactions
    ProposeQueue.Ack(ctx);
}

void TDataShard::Handle(TEvPrivate::TEvProgressResendReadSet::TPtr &ev, const TActorContext &ctx) {
    ResendReadSetQueue.Reset(ctx);
    Execute(new TTxProgressResendRS(this, ev->Get()->Seqno), ctx);
}

void TDataShard::Handle(TEvPrivate::TEvRegisterScanActor::TPtr &ev, const TActorContext &ctx) {
    ui64 txId = ev->Get()->TxId;
    auto op = Pipeline.FindOp(txId);

    if (!op) {
        LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
                   "Cannot find op " << txId << " to register scan actor");
        return;
    }

    if (!op->IsReadTable()) {
        LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
                   "Cannot register scan actor for op " << txId
                   << " of kind " << op->GetKind());
        return;
    }

    TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get());
    Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());
    tx->SetScanActor(ev->Sender);
}

void TDataShard::Handle(TEvPrivate::TEvScanStats::TPtr& ev, const TActorContext &ctx) {
    Y_UNUSED(ctx);
    TabletCounters->Cumulative()[COUNTER_SCANNED_ROWS].Increment(ev->Get()->Rows);
    TabletCounters->Cumulative()[COUNTER_SCANNED_BYTES].Increment(ev->Get()->Bytes);
}

void TDataShard::Handle(TEvPrivate::TEvPersistScanState::TPtr& ev, const TActorContext &ctx) {
    TabletCounters->Cumulative()[COUNTER_SCANNED_ROWS].Increment(ev->Get()->Rows);
    TabletCounters->Cumulative()[COUNTER_SCANNED_BYTES].Increment(ev->Get()->Bytes);
    Execute(new TTxStoreScanState(this, ev), ctx);
}
void TDataShard::Handle(TEvTabletPipe::TEvClientConnected::TPtr &ev, const TActorContext &ctx) {
    Y_ABORT_UNLESS(ev->Get()->Leader, "Unexpectedly connected to follower of tablet %" PRIu64, ev->Get()->TabletId);

    if (ev->Get()->ClientId == SchemeShardPipe) {
        if (!TransQueue.HasNotAckedSchemaTx()) {
            LOG_ERROR(ctx, NKikimrServices::TX_DATASHARD,
                      "Datashard's schemeshard pipe connected while there are no messages to send at %" PRIu64, TabletID());
        }
        TEvTabletPipe::TEvClientConnected *msg = ev->Get();
        if (msg->Status != NKikimrProto::OK) {
            SchemeShardPipe = TActorId();
            NotifySchemeshard(ctx);
        }
        return;
    }

    if (ev->Get()->ClientId == StateReportPipe) {
        if (ev->Get()->Status != NKikimrProto::OK) {
            StateReportPipe = TActorId();
            ReportState(ctx, State);
        }
        return;
    }

    if (ev->Get()->ClientId == DbStatsReportPipe) {
        if (ev->Get()->Status != NKikimrProto::OK) {
            DbStatsReportPipe = TActorId();
        }
        return;
    }

    if (ev->Get()->ClientId == TableResolvePipe) {
        if (ev->Get()->Status != NKikimrProto::OK) {
            TableResolvePipe = TActorId();
            ResolveTablePath(ctx);
        }
        return;
    }

    if (LoanReturnTracker.Has(ev->Get()->TabletId, ev->Get()->ClientId)) {
        if (ev->Get()->Status != NKikimrProto::OK) {
            if (!ev->Get()->Dead) {
                LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                            "Resending loan returns from " << TabletID() << " to " << ev->Get()->TabletId);
                LoanReturnTracker.ResendLoans(ev->Get()->TabletId, ctx);
            } else {
                LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                            "Auto-Acking loan returns to dead " << ev->Get()->TabletId << " from " << TabletID());
                LoanReturnTracker.AutoAckLoans(ev->Get()->TabletId, ctx);
            }
        }
        return;
    }

    // Resend split-related messages if needed
    if (SplitSrcSnapshotSender.Has(ev->Get()->TabletId, ev->Get()->ClientId)) {
        if (ev->Get()->Status != NKikimrProto::OK) {
            SplitSrcSnapshotSender.DoSend(ev->Get()->TabletId, ctx);
        }
        return;
    }

    if (ChangeSenderActivator.Has(ev->Get()->TabletId, ev->Get()->ClientId)) {
        if (ev->Get()->Status != NKikimrProto::OK) {
            if (!ev->Get()->Dead) {
                ChangeSenderActivator.DoSend(ev->Get()->TabletId, ctx);
            } else {
                ChangeSenderActivator.AutoAck(ev->Get()->TabletId, ctx);
            }
        }
        return;
    }

    if (!PipeClientCache->OnConnect(ev)) {
        if (ev->Get()->Dead) {
            AckRSToDeletedTablet(ev->Get()->TabletId, ctx);
        } else {
            LOG_NOTICE(ctx, NKikimrServices::TX_DATASHARD, "Failed to connect to tablet %" PRIu64 " from tablet %" PRIu64, ev->Get()->TabletId, TabletID());
            RestartPipeRS(ev->Get()->TabletId, ctx);
        }
    } else {
        LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Connected to tablet %" PRIu64 " from tablet %" PRIu64, ev->Get()->TabletId, TabletID());
    }
}

void TDataShard::Handle(TEvTabletPipe::TEvClientDestroyed::TPtr &ev, const TActorContext &ctx) {
    if (ev->Get()->ClientId == SchemeShardPipe) {
        if (!TransQueue.HasNotAckedSchemaTx()) {
            LOG_ERROR(ctx, NKikimrServices::TX_DATASHARD,
                      "Datashard's schemeshard pipe destroyed while there are no messages to send at %" PRIu64, TabletID());
        }
        SchemeShardPipe = TActorId();
        NotifySchemeshard(ctx);
        return;
    }

    if (ev->Get()->ClientId == StateReportPipe) {
        StateReportPipe = TActorId();
        ReportState(ctx, State);
        return;
    }

    if (ev->Get()->ClientId == DbStatsReportPipe) {
        DbStatsReportPipe = TActorId();
        return;
    }

    // Resend loan-related messages if needed
    if (LoanReturnTracker.Has(ev->Get()->TabletId, ev->Get()->ClientId)) {
        LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                    "Resending loan returns from " << TabletID() << " to " << ev->Get()->TabletId);
        LoanReturnTracker.ResendLoans(ev->Get()->TabletId, ctx);
        return;
    }

    // Resend split-related messages if needed
    if (SplitSrcSnapshotSender.Has(ev->Get()->TabletId, ev->Get()->ClientId)) {
        SplitSrcSnapshotSender.DoSend(ev->Get()->TabletId, ctx);
        return;
    }

    if (ChangeSenderActivator.Has(ev->Get()->TabletId, ev->Get()->ClientId)) {
        ChangeSenderActivator.DoSend(ev->Get()->TabletId, ctx);
        return;
    }

    LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Client pipe to tablet %" PRIu64 " from %" PRIu64 " is reset", ev->Get()->TabletId, TabletID());
    PipeClientCache->OnDisconnect(ev);
    RestartPipeRS(ev->Get()->TabletId, ctx);
}
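
// In the readset resend tracker the sentinel seqno Max<ui64>() stands for the
// "expectations" attachment rather than a concrete readset, which is why both
// functions below special-case it: RestartPipeRS re-sends expectations, and
// AckRSToDeletedTablet aborts them for a destination that no longer exists.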
void TDataShard::RestartPipeRS(ui64 tabletId, const TActorContext& ctx) {
    for (auto seqno : ResendReadSetPipeTracker.FindTx(tabletId)) {
        if (seqno == Max<ui64>()) {
            OutReadSets.ResendExpectations(tabletId, ctx);
            continue;
        }

        LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Pipe reset to tablet %" PRIu64 " caused resend of readset %" PRIu64
                  " at tablet %" PRIu64, tabletId, seqno, TabletID());

        ResendReadSetQueue.Progress(seqno, ctx);
    }
}

void TDataShard::AckRSToDeletedTablet(ui64 tabletId, const TActorContext& ctx) {
    bool detachExpectations = false;
    for (auto seqno : ResendReadSetPipeTracker.FindTx(tabletId)) {
        if (seqno == Max<ui64>()) {
            AbortExpectationsFromDeletedTablet(tabletId, OutReadSets.RemoveExpectations(tabletId));
            detachExpectations = true;
            continue;
        }

        LOG_DEBUG(ctx, NKikimrServices::TX_DATASHARD, "Pipe reset to dead tablet %" PRIu64 " caused ack of readset %" PRIu64
                  " at tablet %" PRIu64, tabletId, seqno, TabletID());

        OutReadSets.AckForDeletedDestination(tabletId, seqno, ctx);

        // Progress one more Tx to force delayed schema operations
        if (Pipeline.HasSchemaOperation() && OutReadSets.Empty()) {
            // TODO: wait for empty OutRS in a separate unit?
            Pipeline.AddCandidateUnit(EExecutionUnitKind::PlanQueue);
            PlanQueue.Progress(ctx);
        }
    }

    if (detachExpectations) {
        ResendReadSetPipeTracker.DetachTablet(Max<ui64>(), tabletId, 0, ctx);
    }

    CheckStateChange(ctx);
}

void TDataShard::AbortExpectationsFromDeletedTablet(ui64 tabletId, THashMap<ui64, ui64>&& expectations) {
    for (auto& pr : expectations) {
        auto* info = VolatileTxManager.FindByTxId(pr.first);
        if (info && info->State == EVolatileTxState::Waiting && info->Participants.contains(tabletId)) {
            VolatileTxManager.AbortWaitingTransaction(info);
        }
    }
}

void TDataShard::Handle(TEvTabletPipe::TEvServerConnected::TPtr &ev, const TActorContext &ctx) {
    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Server connected at "
        << (Executor()->GetStats().IsFollower ? "follower " : "leader ")
        << "tablet# " << ev->Get()->TabletId
        << ", clientId# " << ev->Get()->ClientId
        << ", serverId# " << ev->Get()->ServerId
        << ", sessionId# " << ev->InterconnectSession);

    auto res = PipeServers.emplace(
        std::piecewise_construct,
        std::forward_as_tuple(ev->Get()->ServerId),
        std::forward_as_tuple());
    Y_VERIFY_DEBUG_S(res.second,
        "Unexpected TEvServerConnected for " << ev->Get()->ServerId);

    res.first->second.InterconnectSession = ev->Get()->InterconnectSession;
}

void TDataShard::Handle(TEvTabletPipe::TEvServerDisconnected::TPtr &ev, const TActorContext &ctx) {
    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Server disconnected at "
        << (Executor()->GetStats().IsFollower ? "follower " : "leader ")
        << "tablet# " << ev->Get()->TabletId
        << ", clientId# " << ev->Get()->ClientId
        << ", serverId# " << ev->Get()->ServerId
        << ", sessionId# " << ev->InterconnectSession);

    auto it = PipeServers.find(ev->Get()->ServerId);
    Y_VERIFY_DEBUG_S(it != PipeServers.end(),
        "Unexpected TEvServerDisconnected for " << ev->Get()->ServerId);

    DiscardOverloadSubscribers(it->second);

    PipeServers.erase(it);
}

void TDataShard::Handle(TEvMediatorTimecast::TEvRegisterTabletResult::TPtr& ev, const TActorContext& ctx) {
    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                "Got TEvMediatorTimecast::TEvRegisterTabletResult at " << TabletID()
                << " time " << ev->Get()->Entry->Get(TabletID()));
    Y_ABORT_UNLESS(ev->Get()->TabletId == TabletID());
    MediatorTimeCastEntry = ev->Get()->Entry;
    Y_ABORT_UNLESS(MediatorTimeCastEntry);

    SendAfterMediatorStepActivate(MediatorTimeCastEntry->Get(TabletID()), ctx);

    Pipeline.ActivateWaitingTxOps(ctx);

    CheckMediatorStateRestored();
}

void TDataShard::Handle(TEvMediatorTimecast::TEvSubscribeReadStepResult::TPtr& ev, const TActorContext& ctx) {
    auto* msg = ev->Get();
    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                "Got TEvMediatorTimecast::TEvSubscribeReadStepResult at " << TabletID()
                << " coordinator " << msg->CoordinatorId
                << " last step " << msg->LastReadStep
                << " next step " << msg->NextReadStep);
    auto it = CoordinatorSubscriptionById.find(msg->CoordinatorId);
    Y_VERIFY_S(it != CoordinatorSubscriptionById.end(),
               "Unexpected TEvSubscribeReadStepResult for coordinator " << msg->CoordinatorId);
    size_t index = it->second;
    auto& subscription = CoordinatorSubscriptions.at(index);
    subscription.ReadStep = msg->ReadStep;
    CoordinatorPrevReadStepMin = Max(CoordinatorPrevReadStepMin, msg->LastReadStep);
    CoordinatorPrevReadStepMax = Min(CoordinatorPrevReadStepMax, msg->NextReadStep);
    --CoordinatorSubscriptionsPending;
    CheckMediatorStateRestored();
}

void TDataShard::Handle(TEvMediatorTimecast::TEvNotifyPlanStep::TPtr& ev, const TActorContext& ctx) {
    const auto* msg = ev->Get();
    Y_ABORT_UNLESS(msg->TabletId == TabletID());

    Y_ABORT_UNLESS(MediatorTimeCastEntry);
    ui64 step = MediatorTimeCastEntry->Get(TabletID());
    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Notified by mediator time cast with PlanStep# " << step << " at tablet " << TabletID());

    for (auto it = MediatorTimeCastWaitingSteps.begin(); it != MediatorTimeCastWaitingSteps.end() && *it <= step;)
        it = MediatorTimeCastWaitingSteps.erase(it);

    SendAfterMediatorStepActivate(step, ctx);

    Pipeline.ActivateWaitingTxOps(ctx);

    CheckMediatorStateRestored();
}
void TDataShard::Handle(TEvPrivate::TEvMediatorRestoreBackup::TPtr&, const TActorContext&) {
    if (MediatorStateWaiting && CoordinatorPrevReadStepMax == Max<ui64>()) {
        // We are still waiting for the new protocol coordinator state
        // TODO: send an old snapshot request to coordinators
    }
}
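
// WaitPlanStep subscribes to the mediator time cast for a future plan step and
// returns true only if a new wait was actually registered; steps that are
// already planned, completed, or observed are filtered out. A hedged usage
// sketch (the step value is illustrative):
//
//   if (WaitPlanStep(snapshotStep + 1)) {
//       // a TEvNotifyPlanStep will eventually re-activate waiting operations
//   }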
bool TDataShard::WaitPlanStep(ui64 step) {
    if (step <= Pipeline.GetLastPlannedTx().Step)
        return false;

    if (step <= SnapshotManager.GetCompleteEdge().Step)
        return false;

    if (MediatorTimeCastEntry && step <= MediatorTimeCastEntry->Get(TabletID()))
        return false;

    if (!RegistrationSended)
        return false;

    if (MediatorTimeCastWaitingSteps.empty() || step < *MediatorTimeCastWaitingSteps.begin()) {
        MediatorTimeCastWaitingSteps.insert(step);
        Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step));
        LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast");
        return true;
    }

    return false;
}

void TDataShard::WaitPredictedPlanStep(ui64 step) {
    if (!MediatorTimeCastEntry) {
        return;
    }

    if (step <= MediatorTimeCastEntry->Get(TabletID())) {
        // This step is ready, schedule a transaction plan
        SchedulePlanPredictedTxs();
        return;
    }

    if (MediatorTimeCastWaitingSteps.empty() || step < *MediatorTimeCastWaitingSteps.begin()) {
        MediatorTimeCastWaitingSteps.insert(step);
        Send(MakeMediatorTimecastProxyID(), new TEvMediatorTimecast::TEvWaitPlanStep(TabletID(), step));
        LOG_DEBUG_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "Waiting for PlanStep# " << step << " from mediator time cast");
    }
}

bool TDataShard::CheckTxNeedWait() const {
    if (MvccSwitchState == TSwitchState::SWITCHING) {
        LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "New transaction needs to wait because of mvcc state switching");
        return true;
    }

    return false;
}

bool TDataShard::CheckTxNeedWait(const TEvDataShard::TEvProposeTransaction::TPtr& ev) const {
    if (CheckTxNeedWait()) {
        return true;
    }

    auto* msg = ev->Get();
    auto& rec = msg->Record;
    if (rec.HasMvccSnapshot()) {
        TRowVersion rowVersion(rec.GetMvccSnapshot().GetStep(), rec.GetMvccSnapshot().GetTxId());
        TRowVersion unreadableEdge = Pipeline.GetUnreadableEdge(GetEnablePrioritizedMvccSnapshotReads());
        if (rowVersion >= unreadableEdge) {
            LOG_TRACE_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, "New transaction reads from " << rowVersion << " which is not before unreadable edge " << unreadableEdge);
            LWTRACK(ProposeTransactionWaitSnapshot, msg->Orbit, rowVersion.Step, rowVersion.TxId);
            return true;
        }
    }

    return false;
}
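
// Both changes-queue checks below read their limits from the per-process
// DataShardConfig (item count and bytes). The overflow predicate and its
// inverse are intentionally symmetric: CheckChangesQueueOverflow() is used to
// start rejecting, while CheckChangesQueueNoOverflow() wakes up subscribers
// previously rejected with ERejectReason::ChangesQueueOverflow once the queue
// drains below both limits.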
bool TDataShard::CheckChangesQueueOverflow() const {
    const auto* appData = AppData();
    const auto sizeLimit = appData->DataShardConfig.GetChangesQueueItemsLimit();
    const auto bytesLimit = appData->DataShardConfig.GetChangesQueueBytesLimit();
    return ChangesQueue.size() >= sizeLimit || ChangesQueueBytes >= bytesLimit;
}

void TDataShard::CheckChangesQueueNoOverflow() {
    if (OverloadSubscribersByReason[RejectReasonIndex(ERejectReason::ChangesQueueOverflow)]) {
        const auto* appData = AppData();
        const auto sizeLimit = appData->DataShardConfig.GetChangesQueueItemsLimit();
        const auto bytesLimit = appData->DataShardConfig.GetChangesQueueBytesLimit();
        if (ChangesQueue.size() < sizeLimit && ChangesQueueBytes < bytesLimit) {
            NotifyOverloadSubscribers(ERejectReason::ChangesQueueOverflow);
        }
    }
}
void TDataShard::DoPeriodicTasks(const TActorContext &ctx) {
    UpdateLagCounters(ctx);
    UpdateChangeExchangeLag(ctx.Now());
    UpdateTableStats(ctx);
    SendPeriodicTableStats(ctx);
    CollectCpuUsage(ctx);

    if (CurrentKeySampler == EnabledKeySampler && ctx.Now() > StopKeyAccessSamplingAt) {
        CurrentKeySampler = DisabledKeySampler;
        LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD, "Stopped key access sampling at datashard: " << TabletID());
    }

    if (!PeriodicWakeupPending) {
        PeriodicWakeupPending = true;
        ctx.Schedule(TDuration::Seconds(5), new TEvPrivate::TEvPeriodicWakeup());
    }
}

void TDataShard::DoPeriodicTasks(TEvPrivate::TEvPeriodicWakeup::TPtr&, const TActorContext &ctx) {
    Y_ABORT_UNLESS(PeriodicWakeupPending, "Unexpected TEvPeriodicWakeup message");
    PeriodicWakeupPending = false;
    DoPeriodicTasks(ctx);
}
void TDataShard::UpdateLagCounters(const TActorContext &ctx) {
    TDuration dataTxCompleteLag = GetDataTxCompleteLag();
    TabletCounters->Simple()[COUNTER_TX_COMPLETE_LAG].Set(dataTxCompleteLag.MilliSeconds());
    if (dataTxCompleteLag > TDuration::Minutes(5)) {
        LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD,
                   "Tx completion lag (" << dataTxCompleteLag << ") is > 5 min on tablet "
                   << TabletID());
    }

    TDuration scanTxCompleteLag = GetScanTxCompleteLag();
    TabletCounters->Simple()[COUNTER_SCAN_TX_COMPLETE_LAG].Set(scanTxCompleteLag.MilliSeconds());
    if (scanTxCompleteLag > TDuration::Hours(1)) {
        LOG_WARN_S(ctx, NKikimrServices::TX_DATASHARD,
                   "Scan completion lag (" << scanTxCompleteLag << ") is > 1 hour on tablet "
                   << TabletID());
    }
}

void TDataShard::FillSplitTrajectory(ui64 origin, NKikimrTx::TBalanceTrackList& tracks) {
    Y_UNUSED(origin);
    Y_UNUSED(tracks);
}

THolder<TEvTxProcessing::TEvReadSet>
TDataShard::PrepareReadSet(ui64 step, ui64 txId, ui64 source, ui64 target,
                           const TString& body, ui64 seqno)
{
    auto ev = MakeHolder<TEvTxProcessing::TEvReadSet>(step, txId, source, target, TabletID(), body, seqno);
    if (source != TabletID())
        FillSplitTrajectory(source, *ev->Record.MutableBalanceTrackList());
    return ev;
}

THolder<TEvTxProcessing::TEvReadSet>
TDataShard::PrepareReadSetExpectation(ui64 step, ui64 txId, ui64 source, ui64 target)
{
    // We want to notify the target that we expect a readset; there's no data and
    // no ack needed, hence no seqno
    auto ev = MakeHolder<TEvTxProcessing::TEvReadSet>(step, txId, source, target, TabletID());
    ev->Record.SetFlags(
        NKikimrTx::TEvReadSet::FLAG_EXPECT_READSET |
        NKikimrTx::TEvReadSet::FLAG_NO_DATA |
        NKikimrTx::TEvReadSet::FLAG_NO_ACK);
    if (source != TabletID())
        FillSplitTrajectory(source, *ev->Record.MutableBalanceTrackList());
    return ev;
}
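
// The readset commit protocol uses three message shapes: a regular readset with
// a body and a seqno (prepared above, sent and counted below via the pipe
// cache), an "expectation" marker telling the peer a readset is wanted
// (FLAG_EXPECT_READSET, no data, no ack), and a "no data" answer for
// expectations that will never be fulfilled (FLAG_NO_DATA, no ack). A hedged
// sending sketch (the step/txId/seqno values are illustrative):
//
//   auto rs = PrepareReadSet(step, txId, source, target, body, seqno);
//   SendReadSet(ctx, std::move(rs));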
void TDataShard::SendReadSet(
        const TActorContext& ctx,
        THolder<TEvTxProcessing::TEvReadSet>&& rs)
{
    ui64 txId = rs->Record.GetTxId();
    ui64 source = rs->Record.GetTabletSource();
    ui64 target = rs->Record.GetTabletDest();

    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                "Send RS at " << TabletID() << " from " << source << " to " << target << " txId " << txId);

    IncCounter(COUNTER_READSET_SENT_COUNT);
    IncCounter(COUNTER_READSET_SENT_SIZE, rs->Record.GetReadSet().size());

    PipeClientCache->Send(ctx, target, rs.Release());
}

void TDataShard::SendReadSet(const TActorContext& ctx, ui64 step,
                             ui64 txId, ui64 source, ui64 target,
                             const TString& body, ui64 seqno)
{
    auto ev = PrepareReadSet(step, txId, source, target, body, seqno);
    SendReadSet(ctx, std::move(ev));
}

bool TDataShard::AddExpectation(ui64 target, ui64 step, ui64 txId) {
    bool hadExpectations = OutReadSets.HasExpectations(target);
    bool added = OutReadSets.AddExpectation(target, step, txId);
    if (!hadExpectations) {
        ResendReadSetPipeTracker.AttachTablet(Max<ui64>(), target);
    }
    return added;
}

bool TDataShard::RemoveExpectation(ui64 target, ui64 txId) {
    bool removed = OutReadSets.RemoveExpectation(target, txId);
    if (removed && !OutReadSets.HasExpectations(target)) {
        auto ctx = ActorContext();
        ResendReadSetPipeTracker.DetachTablet(Max<ui64>(), target, 0, ctx);
    }

    // Progress one more tx to force delayed schema operations
    if (removed && OutReadSets.Empty() && Pipeline.HasSchemaOperation()) {
        // TODO: wait for empty OutRS in a separate unit?
        auto ctx = ActorContext();
        Pipeline.AddCandidateUnit(EExecutionUnitKind::PlanQueue);
        PlanQueue.Progress(ctx);
    }

    return removed;
}

void TDataShard::SendReadSetExpectation(const TActorContext& ctx, ui64 step, ui64 txId,
                                        ui64 source, ui64 target)
{
    auto ev = PrepareReadSetExpectation(step, txId, source, target);
    PipeClientCache->Send(ctx, target, ev.Release());
}

std::unique_ptr<IEventHandle> TDataShard::GenerateReadSetNoData(const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target)
{
    auto msg = std::make_unique<TEvTxProcessing::TEvReadSet>(step, txId, source, target, TabletID());
    msg->Record.SetFlags(
        NKikimrTx::TEvReadSet::FLAG_NO_DATA |
        NKikimrTx::TEvReadSet::FLAG_NO_ACK);
    if (source != TabletID()) {
        FillSplitTrajectory(source, *msg->Record.MutableBalanceTrackList());
    }

    return std::make_unique<IEventHandle>(recipient, SelfId(), msg.release());
}

void TDataShard::SendReadSetNoData(const TActorContext& ctx, const TActorId& recipient, ui64 step, ui64 txId, ui64 source, ui64 target)
{
    Y_UNUSED(ctx);
    auto ev = GenerateReadSetNoData(recipient, step, txId, source, target);

    struct TSendState : public TThrRefBase {
        std::unique_ptr<IEventHandle> Event;

        TSendState(std::unique_ptr<IEventHandle>&& event)
            : Event(std::move(event))
        { }
    };

    // FIXME: we can probably avoid lease confirmation here
    Executor()->ConfirmReadOnlyLease(
        [state = MakeIntrusive<TSendState>(std::move(ev))] {
            TActivationContext::Send(std::move(state->Event));
        });
}

bool TDataShard::ProcessReadSetExpectation(TEvTxProcessing::TEvReadSet::TPtr& ev) {
    const auto& record = ev->Get()->Record;

    // Check if we already have a pending readset from dest to source
    TReadSetKey rsKey(record.GetTxId(), TabletID(), record.GetTabletDest(), record.GetTabletSource());
    if (OutReadSets.Has(rsKey)) {
        return true;
    }

    if (IsStateActive()) {
        // When we have a pending op, remember that a readset from dest to source is expected
        if (auto op = Pipeline.FindOp(record.GetTxId())) {
            auto key = std::make_pair(record.GetTabletDest(), record.GetTabletSource());
            op->ExpectedReadSets()[key].push_back(ev->Sender);
            return true;
        }
    }

    // In all other cases we want to reply with no data
    return false;
}

void TDataShard::SendReadSets(const TActorContext& ctx,
                              TVector<THolder<TEvTxProcessing::TEvReadSet>> &&readsets)
{
    TPendingPipeTrackerCommands pendingPipeTrackerCommands;

    for (auto &rs : readsets) {
        ui64 target = rs->Record.GetTabletDest();
        ui64 seqno = rs->Record.GetSeqno();

        pendingPipeTrackerCommands.AttachTablet(seqno, target);
        SendReadSet(ctx, std::move(rs));
    }

    pendingPipeTrackerCommands.Apply(ResendReadSetPipeTracker, ctx);
    readsets.clear();
}

void TDataShard::ResendReadSet(const TActorContext& ctx, ui64 step, ui64 txId, ui64 source, ui64 target,
                               const TString& body, ui64 seqNo)
{
    LOG_INFO_S(ctx, NKikimrServices::TX_DATASHARD,
               "Resend RS at " << TabletID() << " from " << source << " to " << target << " txId " << txId);

    SendReadSet(ctx, step, txId, source, target, body, seqNo);
    ResendReadSetPipeTracker.AttachTablet(seqNo, target);
}

void TDataShard::UpdateLastSchemeOpSeqNo(const TSchemeOpSeqNo &newSeqNo,
                                         TTransactionContext &txc)
{
    NIceDb::TNiceDb db(txc.DB);
    if (LastSchemeOpSeqNo < newSeqNo) {
        LastSchemeOpSeqNo = newSeqNo;
        PersistSys(db, Schema::Sys_LastSchemeShardGeneration, LastSchemeOpSeqNo.Generation);
        PersistSys(db, Schema::Sys_LastSchemeShardRound, LastSchemeOpSeqNo.Round);
    }
}

void TDataShard::ResetLastSchemeOpSeqNo(TTransactionContext &txc)
{
    NIceDb::TNiceDb db(txc.DB);
    LastSchemeOpSeqNo = TSchemeOpSeqNo();
    PersistSys(db, Schema::Sys_LastSchemeShardGeneration, LastSchemeOpSeqNo.Generation);
    PersistSys(db, Schema::Sys_LastSchemeShardRound, LastSchemeOpSeqNo.Round);
}
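
// The helpers below persist shard-level metadata into well-known rows of the
// Sys table inside the current local transaction and mirror the value into the
// corresponding in-memory field, so the cached state and the durable state
// always change together. A hedged sketch of the call pattern from inside a
// local transaction (the id values are illustrative):
//
//   PersistCurrentSchemeShardId(newSchemeShardId, txc);
//   PersistOwnerPathId(newOwnerPathId, txc);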
void TDataShard::PersistProcessingParams(const NKikimrSubDomains::TProcessingParams &params,
                                         NTabletFlatExecutor::TTransactionContext &txc)
{
    NIceDb::TNiceDb db(txc.DB);

    ProcessingParams.reset(new NKikimrSubDomains::TProcessingParams());
    ProcessingParams->CopyFrom(params);

    PersistSys(db, TDataShard::Schema::Sys_SubDomainInfo,
               ProcessingParams->SerializeAsString());
}

void TDataShard::PersistCurrentSchemeShardId(ui64 id,
                                             NTabletFlatExecutor::TTransactionContext &txc)
{
    NIceDb::TNiceDb db(txc.DB);
    CurrentSchemeShardId = id;
    PersistSys(db, TDataShard::Schema::Sys_CurrentSchemeShardId, id);
}

void TDataShard::PersistSubDomainPathId(ui64 ownerId, ui64 localPathId,
                                        NTabletFlatExecutor::TTransactionContext &txc)
{
    NIceDb::TNiceDb db(txc.DB);
    SubDomainPathId.emplace(ownerId, localPathId);
    PersistSys(db, Schema::Sys_SubDomainOwnerId, ownerId);
    PersistSys(db, Schema::Sys_SubDomainLocalPathId, localPathId);
}

void TDataShard::PersistOwnerPathId(ui64 id,
                                    NTabletFlatExecutor::TTransactionContext &txc)
{
    NIceDb::TNiceDb db(txc.DB);
    PathOwnerId = id;
    PersistSys(db, TDataShard::Schema::Sys_PathOwnerId, id);
}

void TDataShard::ResolveTablePath(const TActorContext &ctx)
{
    if (State != TShardState::Ready)
        return;

    for (auto& [pathId, info] : TableInfos) {
        TString reason = "empty path";

        if (info->Path) {
            NKikimrSchemeOp::TTableDescription desc;
            info->GetSchema(desc);

            if (desc.GetName() == ExtractBase(desc.GetPath())) {
                continue;
            }

            reason = "buggy path";
        }

        LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Resolve path at " << TabletID()
                    << ": reason# " << reason);

        if (!TableResolvePipe) {
            NTabletPipe::TClientConfig clientConfig;
            clientConfig.RetryPolicy = SchemeShardPipeRetryPolicy;
            TableResolvePipe = ctx.Register(NTabletPipe::CreateClient(ctx.SelfID, CurrentSchemeShardId, clientConfig));
        }

        auto event = MakeHolder<TEvSchemeShard::TEvDescribeScheme>(PathOwnerId, pathId);
        event->Record.MutableOptions()->SetReturnPartitioningInfo(false);
        event->Record.MutableOptions()->SetReturnPartitionConfig(false);
        event->Record.MutableOptions()->SetReturnChildren(false);
        NTabletPipe::SendData(ctx, TableResolvePipe, event.Release());
    }
}
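
// SerializeHistogram copies a per-table histogram into the response protobuf,
// printing each bucket's end key column by column; SerializeKeySample first
// aggregates raw key access samples into per-key counters and then emits them
// sorted by descending hit count. A hedged sketch of the aggregation step the
// second function performs (types simplified for illustration):
//
//   THashMap<TString, ui64> accessCounts;
//   for (const auto& sample : keySample.GetSample())
//       ++accessCounts[sample.first];   // sample.first is the serialized key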
void TDataShard::SerializeHistogram(const TUserTable &tinfo,
                                    const NTable::THistogram &histogram,
                                    NKikimrTxDataShard::TEvGetDataHistogramResponse::THistogram &hist)
{
    for (auto &item : histogram) {
        auto &rec = *hist.AddItems();
        rec.SetValue(item.Value);

        TSerializedCellVec key(item.EndKey);
        for (ui32 ki = 0; ki < tinfo.KeyColumnIds.size(); ++ki) {
            DbgPrintValue(*rec.AddKeyValues(), key.GetCells()[ki], tinfo.KeyColumnTypes[ki]);
        }
    }
}

void TDataShard::SerializeKeySample(const TUserTable &tinfo,
                                    const NTable::TKeyAccessSample &keySample,
                                    NKikimrTxDataShard::TEvGetDataHistogramResponse::THistogram &hist)
{
    THashMap<TString, ui64> accessCounts;

    for (auto &key : keySample.GetSample()) {
        accessCounts[key.first]++;
        // TODO: count access kinds separately
    }

    for (auto &item : accessCounts) {
        auto &rec = *hist.AddItems();
        rec.SetValue(item.second);

        TSerializedCellVec key(item.first);
        for (ui32 ki = 0; ki < tinfo.KeyColumnIds.size() && ki < key.GetCells().size(); ++ki) {
            DbgPrintValue(*rec.AddKeyValues(), key.GetCells()[ki], tinfo.KeyColumnTypes[ki]);
        }
    }

    Sort(hist.MutableItems()->begin(), hist.MutableItems()->end(),
         [] (const auto& a, const auto& b) { return a.GetValue() > b.GetValue(); });
}

void TDataShard::Handle(TEvSchemeShard::TEvDescribeSchemeResult::TPtr ev, const TActorContext &ctx) {
    const auto &rec = ev->Get()->GetRecord();
    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                "Got scheme resolve result at " << TabletID() << ": "
                << rec.ShortDebugString());

    ui64 pathId = rec.GetPathId();
    if (!TableInfos.contains(pathId)) {
        LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD,
                    "Shard " << TabletID() << " got describe result for unknown table "
                    << pathId);
        return;
    }

    if (!rec.GetPath()) {
        LOG_CRIT_S(ctx, NKikimrServices::TX_DATASHARD,
                   "Shard " << TabletID() << " couldn't get path for table "
                   << pathId << " with status " << rec.GetStatus());
        return;
    }

    Execute(new TTxStoreTablePath(this, pathId, rec.GetPath()), ctx);
}

void TDataShard::Handle(TEvDataShard::TEvGetInfoRequest::TPtr &ev,
                        const TActorContext &ctx)
{
    Execute(CreateTxGetInfo(this, ev), ctx);
}

void TDataShard::Handle(TEvDataShard::TEvListOperationsRequest::TPtr &ev,
                        const TActorContext &ctx)
{
    Execute(CreateTxListOperations(this, ev), ctx);
}

void TDataShard::Handle(TEvDataShard::TEvGetOperationRequest::TPtr &ev,
                        const TActorContext &ctx)
{
    Execute(CreateTxGetOperation(this, ev), ctx);
}

void TDataShard::Handle(TEvDataShard::TEvGetDataHistogramRequest::TPtr &ev,
                        const TActorContext &ctx)
{
    auto *response = new TEvDataShard::TEvGetDataHistogramResponse;
    response->Record.MutableStatus()->SetCode(Ydb::StatusIds::SUCCESS);
    const auto& rec = ev->Get()->Record;

    if (rec.GetCollectKeySampleMs() > 0) {
        EnableKeyAccessSampling(ctx,
            AppData(ctx)->TimeProvider->Now() + TDuration::MilliSeconds(rec.GetCollectKeySampleMs()));
    }

    if (rec.GetActualData()) {
        if (CurrentKeySampler == DisabledKeySampler) {
            // The datashard only stores expired stats
            ctx.Send(ev->Sender, response);
            return;
        }
    }

    for (const auto &pr : TableInfos) {
        const auto &tinfo = *pr.second;
        const NTable::TStats &stats = tinfo.Stats.DataStats;

        auto &hist = *response->Record.AddTableHistograms();
        hist.SetTableName(pr.second->Name);
        for (ui32 ki : tinfo.KeyColumnIds)
            hist.AddKeyNames(tinfo.Columns.FindPtr(ki)->Name);
        SerializeHistogram(tinfo, stats.DataSizeHistogram, *hist.MutableSizeHistogram());
        SerializeHistogram(tinfo, stats.RowCountHistogram, *hist.MutableCountHistogram());
        SerializeKeySample(tinfo, tinfo.Stats.AccessStats, *hist.MutableKeyAccessSample());
    }

    ctx.Send(ev->Sender, response);
}

void TDataShard::Handle(TEvDataShard::TEvGetReadTableSinkStateRequest::TPtr &ev,
                        const TActorContext &ctx)
{
    ui64 txId = ev->Get()->Record.GetTxId();
    auto op = Pipeline.FindOp(txId);
    if (!op) {
        auto *response = new TEvDataShard::TEvGetReadTableSinkStateResponse;
        SetStatusError(response->Record, Ydb::StatusIds::NOT_FOUND,
                       TStringBuilder() << "Cannot find operation "
                       << txId << " on shard " << TabletID());
        ctx.Send(ev->Sender, response);
        return;
    }

    if (op->GetKind() != EOperationKind::ReadTable) {
        auto *response = new TEvDataShard::TEvGetReadTableSinkStateResponse;
        SetStatusError(response->Record, Ydb::StatusIds::BAD_REQUEST,
                       TStringBuilder() << "Cannot get sink state for tx of kind "
                       << op->GetKind());
        ctx.Send(ev->Sender, response);
        return;
    }

    TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get());
    Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());

    ctx.Send(ev->Forward(tx->GetStreamSink()));
}
void TDataShard::Handle(TEvDataShard::TEvGetReadTableScanStateRequest::TPtr &ev,
                        const TActorContext &ctx)
{
    ui64 txId = ev->Get()->Record.GetTxId();
    auto op = Pipeline.FindOp(txId);
    if (!op) {
        auto *response = new TEvDataShard::TEvGetReadTableScanStateResponse;
        SetStatusError(response->Record, Ydb::StatusIds::NOT_FOUND,
                       TStringBuilder() << "Cannot find operation "
                       << txId << " on shard " << TabletID());
        ctx.Send(ev->Sender, response);
        return;
    }

    if (op->GetKind() != EOperationKind::ReadTable) {
        auto *response = new TEvDataShard::TEvGetReadTableScanStateResponse;
        SetStatusError(response->Record, Ydb::StatusIds::BAD_REQUEST,
                       TStringBuilder() << "Cannot get scan state for tx of kind "
                       << op->GetKind());
        ctx.Send(ev->Sender, response);
        return;
    }

    TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get());
    Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());

    if (!tx->GetScanActor()) {
        auto *response = new TEvDataShard::TEvGetReadTableScanStateResponse;
        SetStatusError(response->Record, Ydb::StatusIds::GENERIC_ERROR,
                       TStringBuilder() << "Operation has no registered scan actor");
        ctx.Send(ev->Sender, response);
        return;
    }

    ctx.Send(ev->Forward(tx->GetScanActor()));
}

void TDataShard::Handle(TEvDataShard::TEvGetReadTableStreamStateRequest::TPtr &ev,
                        const TActorContext &ctx)
{
    ui64 txId = ev->Get()->Record.GetTxId();
    auto op = Pipeline.FindOp(txId);
    if (!op) {
        auto *response = new TEvDataShard::TEvGetReadTableStreamStateResponse;
        SetStatusError(response->Record, Ydb::StatusIds::NOT_FOUND,
                       TStringBuilder() << "Cannot find operation "
                           << txId << " on shard " << TabletID());
        ctx.Send(ev->Sender, response);
        return;
    }

    if (op->GetKind() != EOperationKind::ReadTable) {
        auto *response = new TEvDataShard::TEvGetReadTableStreamStateResponse;
        SetStatusError(response->Record, Ydb::StatusIds::BAD_REQUEST,
                       TStringBuilder() << "Cannot get stream state for tx of kind "
                           << op->GetKind());
        ctx.Send(ev->Sender, response);
        return;
    }

    TActiveTransaction *tx = dynamic_cast<TActiveTransaction*>(op.Get());
    Y_VERIFY_S(tx, "cannot cast operation of kind " << op->GetKind());

    ctx.Send(ev->Forward(tx->GetStreamSink()));
}
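
// Debug snapshot of the read set machinery: outgoing read sets that have not
// been acknowledged yet, acks already received, and acks that are still
// delayed in the pipeline behind unfinished operations.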
void TDataShard::Handle(TEvDataShard::TEvGetRSInfoRequest::TPtr &ev,
                        const TActorContext &ctx)
{
    auto *response = new TEvDataShard::TEvGetRSInfoResponse;
    response->Record.MutableStatus()->SetCode(Ydb::StatusIds::SUCCESS);

    for (auto &pr : OutReadSets.CurrentReadSets) {
        auto &rs = *response->Record.AddOutReadSets();
        rs.SetTxId(pr.second.TxId);
        rs.SetOrigin(pr.second.Origin);
        rs.SetSource(pr.second.From);
        rs.SetDestination(pr.second.To);
        rs.SetSeqNo(pr.first);
    }

    for (auto &p : OutReadSets.ReadSetAcks) {
        auto &rec = p->Record;
        auto &ack = *response->Record.AddOutRSAcks();
        ack.SetTxId(rec.GetTxId());
        ack.SetStep(rec.GetStep());
        ack.SetOrigin(rec.GetTabletConsumer());
        ack.SetSource(rec.GetTabletSource());
        ack.SetDestination(rec.GetTabletDest());
        ack.SetSeqNo(rec.GetSeqno());
    }

    for (auto &pr : Pipeline.GetDelayedAcks()) {
        for (auto &delayed : pr.second) {
            // Renamed from ev/ack to avoid shadowing the handler argument
            // and the proto field reference below.
            auto *ackEv = delayed->CastAsLocal<TEvTxProcessing::TEvReadSetAck>();
            if (ackEv) {
                auto &rec = ackEv->Record;
                auto &ack = *response->Record.AddDelayedRSAcks();
                ack.SetTxId(rec.GetTxId());
                ack.SetStep(rec.GetStep());
                ack.SetOrigin(rec.GetTabletConsumer());
                ack.SetSource(rec.GetTabletSource());
                ack.SetDestination(rec.GetTabletDest());
                ack.SetSeqNo(rec.GetSeqno());
            }
        }
    }

    ctx.Send(ev->Sender, response);
}

void TDataShard::Handle(TEvDataShard::TEvGetSlowOpProfilesRequest::TPtr &ev,
                        const TActorContext &ctx)
{
    auto *response = new TEvDataShard::TEvGetSlowOpProfilesResponse;
    response->Record.MutableStatus()->SetCode(Ydb::StatusIds::SUCCESS);
    Pipeline.FillStoredExecutionProfiles(response->Record);
    ctx.Send(ev->Sender, response);
}

void TDataShard::Handle(TEvDataShard::TEvRefreshVolatileSnapshotRequest::TPtr& ev, const TActorContext& ctx) {
    Execute(new TTxRefreshVolatileSnapshot(this, std::move(ev)), ctx);
}

void TDataShard::Handle(TEvDataShard::TEvDiscardVolatileSnapshotRequest::TPtr& ev, const TActorContext& ctx) {
    Execute(new TTxDiscardVolatileSnapshot(this, std::move(ev)), ctx);
}
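
// An undelivered event whose cookie matches a known operation is fed back to
// that operation so it can handle the delivery failure itself; otherwise only
// read iterator subscriptions need cleanup here.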
void TDataShard::Handle(TEvents::TEvUndelivered::TPtr &ev,
                        const TActorContext &ctx)
{
    auto op = Pipeline.FindOp(ev->Cookie);
    if (op) {
        op->AddInputEvent(ev.Release());
        Pipeline.AddCandidateOp(op);
        PlanQueue.Progress(ctx);
        return;
    }

    switch (ev->Get()->SourceType) {
        case TEvents::TEvSubscribe::EventType:
            ReadIteratorsOnNodeDisconnected(ev->Sender, ctx);
            break;
        default:
            break;
    }
}

void TDataShard::Handle(TEvInterconnect::TEvNodeDisconnected::TPtr &ev,
                        const TActorContext &ctx)
{
    const ui32 nodeId = ev->Get()->NodeId;

    LOG_NOTICE_S(ctx, NKikimrServices::TX_DATASHARD,
                 "Shard " << TabletID() << " disconnected from node " << nodeId);

    Pipeline.ProcessDisconnected(nodeId);
    PlanQueue.Progress(ctx);

    ReadIteratorsOnNodeDisconnected(ev->Sender, ctx);
}

void TDataShard::Handle(TEvDataShard::TEvMigrateSchemeShardRequest::TPtr& ev,
                        const TActorContext& ctx)
{
    Execute(new TTxMigrateSchemeShard(this, ev), ctx);
}

void TDataShard::Handle(TEvDataShard::TEvCancelBackup::TPtr& ev, const TActorContext& ctx)
{
    TOperation::TPtr op = Pipeline.FindOp(ev->Get()->Record.GetBackupTxId());
    if (op) {
        ForwardEventToOperation(ev, op, ctx);
    }
}

void TDataShard::Handle(TEvDataShard::TEvCancelRestore::TPtr& ev, const TActorContext& ctx)
{
    TOperation::TPtr op = Pipeline.FindOp(ev->Get()->Record.GetRestoreTxId());
    if (op) {
        ForwardEventToOperation(ev, op, ctx);
    }
}

void TDataShard::Handle(TEvDataShard::TEvGetS3Upload::TPtr& ev, const TActorContext& ctx)
{
    Execute(new TTxGetS3Upload(this, ev), ctx);
}

void TDataShard::Handle(TEvDataShard::TEvStoreS3UploadId::TPtr& ev, const TActorContext& ctx)
{
    Execute(new TTxStoreS3UploadId(this, ev), ctx);
}

void TDataShard::Handle(TEvDataShard::TEvChangeS3UploadStatus::TPtr& ev, const TActorContext& ctx)
{
    Execute(new TTxChangeS3UploadStatus(this, ev), ctx);
}

void TDataShard::Handle(TEvDataShard::TEvGetS3DownloadInfo::TPtr& ev, const TActorContext& ctx)
{
    Execute(new TTxGetS3DownloadInfo(this, ev), ctx);
}

void TDataShard::Handle(TEvDataShard::TEvStoreS3DownloadInfo::TPtr& ev, const TActorContext& ctx)
{
    Execute(new TTxStoreS3DownloadInfo(this, ev), ctx);
}
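
// Restore-from-S3 row uploads obey the executor's overload protection: with a
// non-zero reject probability, a matching fraction of requests is parked in
// DelayedS3UploadRows for later processing (incrementing the same overload
// counter as bulk upserts) instead of being executed immediately.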
void TDataShard::Handle(TEvDataShard::TEvS3UploadRowsRequest::TPtr& ev, const TActorContext& ctx)
{
    const float rejectProbability = Executor()->GetRejectProbability();
    if (rejectProbability > 0) {
        const float rnd = AppData(ctx)->RandomProvider->GenRandReal2();
        if (rnd < rejectProbability) {
            DelayedS3UploadRows.emplace_back().Reset(ev.Release());
            IncCounter(COUNTER_BULK_UPSERT_OVERLOADED);
            return;
        }
    }

    Execute(new TTxS3UploadRows(this, ev), ctx);
}
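
// Called by the tablet executor when any full scan finishes. The cookie
// routes the result: a pipeline operation waiting for its scan, an in-flight
// conditional erase, a CDC stream scan, or a streaming tx; anything else is
// logged as an error.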
void TDataShard::ScanComplete(NTable::EAbort,
                              TAutoPtr<IDestructable> prod,
                              ui64 cookie,
                              const TActorContext &ctx)
{
    if (auto* noTxScan = dynamic_cast<INoTxScan*>(prod.Get())) {
        LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Non-transactional scan complete at "
            << TabletID());

        noTxScan->OnFinished(this);
        prod.Destroy();
    } else {
        LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD,
                    "FullScan complete at " << TabletID());

        auto op = Pipeline.FindOp(cookie);
        if (op) {
            LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Found op"
                << ": cookie: " << cookie
                << ", at: " << TabletID());

            if (op->IsWaitingForScan()) {
                op->SetScanResult(prod);
                Pipeline.AddCandidateOp(op);
            }
        } else {
            if (InFlightCondErase && InFlightCondErase.TxId == cookie) {
                LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Conditional erase complete"
                    << ": cookie: " << cookie
                    << ", at: " << TabletID());
                InFlightCondErase.Clear();
            } else if (CdcStreamScanManager.Has(cookie)) {
                LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Cdc stream scan complete"
                    << ": cookie: " << cookie
                    << ", at: " << TabletID());
                CdcStreamScanManager.Complete(cookie);
            } else if (!Pipeline.FinishStreamingTx(cookie)) {
                LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD,
                            "Scan complete at " << TabletID() << " for unknown tx " << cookie);
            }
        }
    }

    // Continue current Tx
    PlanQueue.Progress(ctx);
}

void TDataShard::Handle(TEvPrivate::TEvAsyncJobComplete::TPtr &ev, const TActorContext &ctx) {
    LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "AsyncJob complete"
        << " at " << TabletID());

    auto op = Pipeline.FindOp(ev->Cookie);
    if (op) {
        LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Found op"
            << " at " << TabletID()
            << " cookie " << ev->Cookie);

        if (op->IsWaitingForAsyncJob()) {
            op->SetAsyncJobResult(ev->Get()->Prod);
            Pipeline.AddCandidateOp(op);
        }
    } else {
        LOG_ERROR_S(ctx, NKikimrServices::TX_DATASHARD, "AsyncJob complete"
            << " at " << TabletID()
            << " for unknown tx " << ev->Cookie);
    }

    // Continue current Tx
    PlanQueue.Progress(ctx);
}

void TDataShard::Handle(TEvPrivate::TEvRestartOperation::TPtr &ev, const TActorContext &ctx) {
    const auto txId = ev->Get()->TxId;

    if (auto op = Pipeline.FindOp(txId)) {
        LOG_DEBUG_S(ctx, NKikimrServices::TX_DATASHARD, "Restart op: " << txId
            << " at " << TabletID());

        if (op->IsWaitingForRestart()) {
            op->ResetWaitingForRestartFlag();
            Pipeline.AddCandidateOp(op);
        }
    }

    // Continue current Tx
    PlanQueue.Progress(ctx);
}

bool TDataShard::ReassignChannelsEnabled() const {
    return true;
}

void TDataShard::ExecuteProgressTx(const TActorContext& ctx) {
    Execute(new TTxProgressTransaction(this), ctx);
}

void TDataShard::ExecuteProgressTx(TOperation::TPtr op, const TActorContext& ctx) {
    Y_ABORT_UNLESS(op->IsInProgress());
    Execute(new TTxProgressTransaction(this, std::move(op)), ctx);
}
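
// The next cleanup wakeup is the earlier of the pipeline and snapshot
// deadlines, clamped to [1s, DefaultTxStepDeadline() / 2]. Worked example
// (numbers are illustrative only): with a 60s pipeline timeout, a 5s
// snapshot timeout and a 30000ms step deadline, the result is
// Max(1s, Min(60s, 5s, 15s)) = 5s.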
TDuration TDataShard::CleanupTimeout() const {
    const TDuration pipelineTimeout = Pipeline.CleanupTimeout();
    const TDuration snapshotTimeout = SnapshotManager.CleanupTimeout();
    const TDuration minTimeout = TDuration::Seconds(1);
    const TDuration maxTimeout = TDuration::MilliSeconds(DefaultTxStepDeadline() / 2);
    return Max(minTimeout, Min(pipelineTimeout, snapshotTimeout, maxTimeout));
}

class TDataShard::TTxGetRemovedRowVersions : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
public:
    TTxGetRemovedRowVersions(TDataShard* self, TEvDataShard::TEvGetRemovedRowVersions::TPtr&& ev)
        : TTransactionBase(self)
        , Ev(std::move(ev))
    { }

    bool Execute(TTransactionContext& txc, const TActorContext&) override {
        auto pathId = Ev->Get()->PathId;
        auto it = pathId ? Self->GetUserTables().find(pathId.LocalPathId) : Self->GetUserTables().begin();
        Y_ABORT_UNLESS(it != Self->GetUserTables().end());

        Reply = MakeHolder<TEvDataShard::TEvGetRemovedRowVersionsResult>(txc.DB.GetRemovedRowVersions(it->second->LocalTid));
        return true;
    }

    void Complete(const TActorContext& ctx) override {
        ctx.Send(Ev->Sender, Reply.Release(), 0, Ev->Cookie);
    }

private:
    TEvDataShard::TEvGetRemovedRowVersions::TPtr Ev;
    THolder<TEvDataShard::TEvGetRemovedRowVersionsResult> Reply;
};

void TDataShard::Handle(TEvDataShard::TEvGetRemovedRowVersions::TPtr& ev, const TActorContext& ctx) {
    Execute(new TTxGetRemovedRowVersions(this, std::move(ev)), ctx);
}
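
// Helper that routes an event through an established interconnect session
// when one is known: rewriting the handle to TEvInterconnect::EvForward makes
// the session actor deliver it, tying delivery failures to that session's
// lifetime. With an empty sessionId the event is sent directly. A typical
// call might look like this (illustrative sketch, not taken from this file):
//
//     SendViaSession(sessionId, targetActor, SelfId(),
//                    new TEvTxProcessing::TEvReadSetAck(/* ... */),
//                    IEventHandle::FlagTrackDelivery, cookie);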
void SendViaSession(const TActorId& sessionId,
                    const TActorId& target,
                    const TActorId& src,
                    IEventBase* event,
                    ui32 flags,
                    ui64 cookie)
{
    THolder<IEventHandle> ev = MakeHolder<IEventHandle>(target, src, event, flags, cookie);

    if (sessionId) {
        ev->Rewrite(TEvInterconnect::EvForward, sessionId);
    }

    TActivationContext::Send(ev.Release());
}

class TBreakWriteConflictsTxObserver : public NTable::ITransactionObserver {
    friend class TBreakWriteConflictsTxObserverVolatileDependenciesGuard;

public:
    TBreakWriteConflictsTxObserver(TDataShard* self)
        : Self(self)
    {
    }

    void OnSkipUncommitted(ui64 txId) override {
        Y_ABORT_UNLESS(VolatileDependencies);
        Self->BreakWriteConflict(txId, *VolatileDependencies);
    }

    void OnSkipCommitted(const TRowVersion&) override {
        // nothing
    }

    void OnSkipCommitted(const TRowVersion&, ui64) override {
        // nothing
    }

    void OnApplyCommitted(const TRowVersion&) override {
        // nothing
    }

    void OnApplyCommitted(const TRowVersion&, ui64) override {
        // nothing
    }

private:
    TDataShard* Self;
    absl::flat_hash_set<ui64>* VolatileDependencies = nullptr;
};

class TBreakWriteConflictsTxObserverVolatileDependenciesGuard {
public:
    TBreakWriteConflictsTxObserverVolatileDependenciesGuard(
            TBreakWriteConflictsTxObserver* observer,
            absl::flat_hash_set<ui64>& volatileDependencies)
        : Observer(observer)
    {
        Y_ABORT_UNLESS(!Observer->VolatileDependencies);
        Observer->VolatileDependencies = &volatileDependencies;
    }

    ~TBreakWriteConflictsTxObserverVolatileDependenciesGuard() {
        Observer->VolatileDependencies = nullptr;
    }

private:
    TBreakWriteConflictsTxObserver* const Observer;
};
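
// Breaks write-write conflicts on a key before a committed write. The fast
// path consults the conflicts cache for uncommitted writes to this key; the
// slow path performs SelectRowVersion with the observer above, which reports
// every uncommitted delta it skips. Returns false when a page fault requires
// the caller to restart after the page loads.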
bool TDataShard::BreakWriteConflicts(NTable::TDatabase& db, const TTableId& tableId,
        TArrayRef<const TCell> keyCells, absl::flat_hash_set<ui64>& volatileDependencies)
{
    const auto localTid = GetLocalTableId(tableId);
    Y_ABORT_UNLESS(localTid);

    if (auto* cached = GetConflictsCache().GetTableCache(localTid).FindUncommittedWrites(keyCells)) {
        for (ui64 txId : *cached) {
            BreakWriteConflict(txId, volatileDependencies);
        }
        return true;
    }

    if (!BreakWriteConflictsTxObserver) {
        BreakWriteConflictsTxObserver = new TBreakWriteConflictsTxObserver(this);
    }

    TBreakWriteConflictsTxObserverVolatileDependenciesGuard guard(
        static_cast<TBreakWriteConflictsTxObserver*>(BreakWriteConflictsTxObserver.Get()),
        volatileDependencies);

    // We are not actually interested in the row version, we only need to
    // detect uncommitted transaction skips on the path to that version.
    auto res = db.SelectRowVersion(
        localTid, keyCells, /* readFlags */ 0,
        nullptr,
        BreakWriteConflictsTxObserver);

    if (res.Ready == NTable::EReady::Page) {
        return false;
    }

    return true;
}
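
// A conflicting change from a volatile transaction cannot simply be broken
// like a lock: unless that transaction is already aborting, it becomes a
// dependency the current write has to wait on. Ordinary uncommitted changes
// are handled by breaking the corresponding lock.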
void TDataShard::BreakWriteConflict(ui64 txId, absl::flat_hash_set<ui64>& volatileDependencies) {
    if (auto* info = GetVolatileTxManager().FindByCommitTxId(txId)) {
        if (info->State != EVolatileTxState::Aborting) {
            volatileDependencies.insert(txId);
        }
    } else {
        SysLocksTable().BreakLock(txId);
    }
}

class TDataShard::TTxGetOpenTxs : public NTabletFlatExecutor::TTransactionBase<TDataShard> {
public:
    TTxGetOpenTxs(TDataShard* self, TEvDataShard::TEvGetOpenTxs::TPtr&& ev)
        : TTransactionBase(self)
        , Ev(std::move(ev))
    { }

    bool Execute(TTransactionContext& txc, const TActorContext&) override {
        auto pathId = Ev->Get()->PathId;
        auto it = pathId ? Self->GetUserTables().find(pathId.LocalPathId) : Self->GetUserTables().begin();
        Y_ABORT_UNLESS(it != Self->GetUserTables().end());

        auto openTxs = txc.DB.GetOpenTxs(it->second->LocalTid);

        Reply = MakeHolder<TEvDataShard::TEvGetOpenTxsResult>(pathId, std::move(openTxs));
        return true;
    }

    void Complete(const TActorContext& ctx) override {
        ctx.Send(Ev->Sender, Reply.Release(), 0, Ev->Cookie);
    }

private:
    TEvDataShard::TEvGetOpenTxs::TPtr Ev;
    THolder<TEvDataShard::TEvGetOpenTxsResult> Reply;
};

void TDataShard::Handle(TEvDataShard::TEvGetOpenTxs::TPtr& ev, const TActorContext& ctx) {
    Execute(new TTxGetOpenTxs(this, std::move(ev)), ctx);
}

void TDataShard::Handle(TEvTxUserProxy::TEvAllocateTxIdResult::TPtr& ev, const TActorContext& ctx) {
    auto op = Pipeline.FindOp(ev->Cookie);
    if (op && op->HasWaitingForGlobalTxIdFlag()) {
        Pipeline.ProvideGlobalTxId(op, ev->Get()->TxId);
        Pipeline.AddCandidateOp(op);
        PlanQueue.Progress(ctx);
    }
}

} // NDataShard

TString TEvDataShard::TEvRead::ToString() const {
    TStringStream ss;
    ss << TBase::ToString();
    if (!Keys.empty()) {
        ss << " KeysSize: " << Keys.size();
    }
    if (!Ranges.empty()) {
        ss << " RangesSize: " << Ranges.size();
    }
    return ss.Str();
}
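
// Deserialization counterpart of FillRecord() below: after the base protobuf
// event is loaded, the Keys and Ranges vectors are rehydrated from the
// record so callers can use the convenient in-memory representation.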
NActors::IEventBase* TEvDataShard::TEvRead::Load(TEventSerializedData* data) {
    auto* base = TBase::Load(data);
    auto* event = static_cast<TEvRead*>(base);
    auto& record = event->Record;

    event->Keys.reserve(record.KeysSize());
    for (const auto& key: record.GetKeys()) {
        event->Keys.emplace_back(key);
    }

    event->Ranges.reserve(record.RangesSize());
    for (const auto& range: record.GetRanges()) {
        event->Ranges.emplace_back(range);
    }

    return base;
}

// A really ugly hack: Record is not mutable and the calling members are
// const, so the in-memory Keys/Ranges are moved into the record here, just
// before serialization.
void TEvDataShard::TEvRead::FillRecord() {
    if (!Keys.empty()) {
        Record.MutableKeys()->Reserve(Keys.size());
        for (auto& key: Keys) {
            Record.AddKeys(key.ReleaseBuffer());
        }
        Keys.clear();
    }

    if (!Ranges.empty()) {
        Record.MutableRanges()->Reserve(Ranges.size());
        for (auto& range: Ranges) {
            auto* pbRange = Record.AddRanges();
            range.Serialize(*pbRange);
        }
        Ranges.clear();
    }
}

TString TEvDataShard::TEvReadResult::ToString() const {
    TStringStream ss;
    ss << TBase::ToString();
    if (ArrowBatch) {
        ss << " ArrowRows: " << ArrowBatch->num_rows()
           << " ArrowCols: " << ArrowBatch->num_columns();
    }
    if (!Rows.empty()) {
        ss << " RowsSize: " << Rows.size();
    }
    return ss.Str();
}

NActors::IEventBase* TEvDataShard::TEvReadResult::Load(TEventSerializedData* data) {
    auto* base = TBase::Load(data);
    auto* event = static_cast<TEvReadResult*>(base);
    auto& record = event->Record;

    if (record.HasArrowBatch()) {
        const auto& batch = record.GetArrowBatch();
        auto schema = NArrow::DeserializeSchema(batch.GetSchema());
        event->ArrowBatch = NArrow::DeserializeBatch(batch.GetBatch(), schema);
        record.ClearArrowBatch();
    } else if (record.HasCellVec()) {
        auto& batch = *record.MutableCellVec();
        event->RowsSerialized.reserve(batch.RowsSize());
        for (auto& row: *batch.MutableRows()) {
            event->RowsSerialized.emplace_back(std::move(row));
        }
        record.ClearCellVec();
    }

    return base;
}
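
// Serialization counterpart of Load() above. Exactly one in-memory
// representation is moved into the record — an Arrow batch, the owned cell
// Batch, or the plain Rows vector — and the source container is cleared so
// the data is not kept (or sent) twice.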
void TEvDataShard::TEvReadResult::FillRecord() {
    if (ArrowBatch) {
        auto* protoBatch = Record.MutableArrowBatch();
        protoBatch->SetSchema(NArrow::SerializeSchema(*ArrowBatch->schema()));
        protoBatch->SetBatch(NArrow::SerializeBatchNoCompression(ArrowBatch));
        ArrowBatch.reset();
        return;
    }

    if (!Batch.empty()) {
        auto* protoBatch = Record.MutableCellVec();
        protoBatch->MutableRows()->Reserve(Batch.Size());
        for (const auto& row: Batch) {
            protoBatch->AddRows(TSerializedCellVec::Serialize(row));
        }
        Batch = {};
        return;
    }

    if (!Rows.empty()) {
        auto* protoBatch = Record.MutableCellVec();
        protoBatch->MutableRows()->Reserve(Rows.size());
        for (const auto& row: Rows) {
            protoBatch->AddRows(TSerializedCellVec::Serialize(row));
        }
        Rows.clear();
        return;
    }
}
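
// When a result carries only a row count (no columns were requested), the
// Arrow batch is materialized lazily as a no-columns batch of that length;
// the const overload delegates to the lazily-caching non-const one.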
std::shared_ptr<arrow::RecordBatch> TEvDataShard::TEvReadResult::GetArrowBatch() const {
    return const_cast<TEvDataShard::TEvReadResult*>(this)->GetArrowBatch();
}

std::shared_ptr<arrow::RecordBatch> TEvDataShard::TEvReadResult::GetArrowBatch() {
    if (ArrowBatch)
        return ArrowBatch;

    if (Record.GetRowCount() == 0)
        return nullptr;

    ArrowBatch = NArrow::CreateNoColumnsBatch(Record.GetRowCount());
    return ArrowBatch;
}

} // NKikimr