|
@@ -2139,7 +2139,8 @@ Y_UNIT_TEST_SUITE(TClientTest) {
|
|
|
TPortManager tp;
|
|
|
ui16 port = tp.GetPort(2134);
|
|
|
|
|
|
- const auto settings = TServerSettings(port);
|
|
|
+ const auto settings = TServerSettings(port)
|
|
|
+ .SetUseRealThreads(false);
|
|
|
TServer server(settings);
|
|
|
TClient client(settings);
|
|
|
SetupLogging(server);
|
|
@@ -2153,32 +2154,129 @@ Y_UNIT_TEST_SUITE(TClientTest) {
|
|
|
const TActorId edge = runtime.AllocateEdgeActor();
|
|
|
|
|
|
{
|
|
|
+ ui64 confirmationsCount = 0;
|
|
|
+ auto observeConfirmations = [&](TAutoPtr<IEventHandle>& ev) {
|
|
|
+ switch (ev->GetTypeRewrite()) {
|
|
|
+ case TEvBlobStorage::TEvPut::EventType: {
|
|
|
+ const auto* msg = ev->Get<TEvBlobStorage::TEvPut>();
|
|
|
+ // step 1 is snapshot
|
|
|
+ // step 2 is schema alter
|
|
|
+ // step 3 is expected write below
|
|
|
+ if (msg->Id.TabletID() == tabletId &&
|
|
|
+ msg->Id.Channel() == 0 &&
|
|
|
+ msg->Id.Cookie() == 1 &&
|
|
|
+ msg->Id.Step() > 2)
|
|
|
+ {
|
|
|
+ ++confirmationsCount;
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return TTestActorRuntime::EEventAction::PROCESS;
|
|
|
+ };
|
|
|
+ runtime.SetObserverFunc(observeConfirmations);
|
|
|
+
|
|
|
const TActorId leaderTablet = runtime.Register(CreateTablet(edge, tabletInfo.Get(), setupInfo.Get(), 0, nullptr, nullptr));
|
|
|
const TActorId leaderId = runtime.GrabEdgeEvent<TEvTablet::TEvRestored>(edge)->Get()->UserTabletActor;
|
|
|
- Y_UNUSED(leaderId);
|
|
|
+
|
|
|
+ // we use it to kill leader only when it has sent the write to the follower and it is confirmed
|
|
|
+ const TActorId followerTablet = runtime.Register(CreateTabletFollower(edge, tabletInfo.Get(), setupInfo.Get(), 1, nullptr, nullptr));
|
|
|
+
|
|
|
+ auto doLeaderWrite = [&](ui64 key, ui64 value) {
|
|
|
+ const char *writeQuery = R"__((
|
|
|
+ (let row_ '('('key (Uint64 '%lu))))
|
|
|
+ (let update_ '('('v_ui64 (Uint64 '%lu))))
|
|
|
+ (let result_ (UpdateRow 't_by_ui64 row_ update_))
|
|
|
+ (return (AsList result_))
|
|
|
+ ))__";
|
|
|
+
|
|
|
+ THolder<TEvTablet::TEvLocalMKQL> reqWrite = MakeHolder<TEvTablet::TEvLocalMKQL>();
|
|
|
+ reqWrite->Record.MutableProgram()->MutableProgram()->SetText(Sprintf(writeQuery, key, value));
|
|
|
+ runtime.Send(new IEventHandle(leaderId, edge, reqWrite.Release()));
|
|
|
+
|
|
|
+ auto reply = runtime.GrabEdgeEvent<TEvTablet::TEvLocalMKQLResponse>(edge);
|
|
|
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetStatus(), 0);
|
|
|
+ };
|
|
|
+
|
|
|
+ doLeaderWrite(42, 51);
|
|
|
+
|
|
|
+ auto waitFor = [&](const auto& condition, const TString& description) {
|
|
|
+ if (!condition()) {
|
|
|
+ Cerr << "... waiting for " << description << Endl;
|
|
|
+ TDispatchOptions options;
|
|
|
+ options.CustomFinalCondition = [&]() {
|
|
|
+ return condition();
|
|
|
+ };
|
|
|
+ runtime.DispatchEvents(options);
|
|
|
+ UNIT_ASSERT_C(condition(), "... failed to wait for " << description);
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ waitFor([&](){ return confirmationsCount > 0; }, "Write confirmed");
|
|
|
|
|
|
runtime.Send(new IEventHandle(leaderTablet, edge, new TEvents::TEvPoisonPill()));
|
|
|
auto reply = runtime.GrabEdgeEvent<TEvTablet::TEvTabletDead>(edge);
|
|
|
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->TabletID, tabletId);
|
|
|
+
|
|
|
+ runtime.Send(new IEventHandle(followerTablet, edge, new TEvents::TEvPoisonPill()));
|
|
|
+ reply = runtime.GrabEdgeEvent<TEvTablet::TEvTabletDead>(edge);
|
|
|
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->TabletID, tabletId);
|
|
|
}
|
|
|
|
|
|
- const TActorId followerTablet = runtime.Register(CreateTabletFollower(edge, tabletInfo.Get(), setupInfo.Get(), 1, nullptr, nullptr));
|
|
|
- Y_UNUSED(followerTablet);
|
|
|
+ // now we start follower without its leader
|
|
|
|
|
|
- const TActorId followerId = runtime.GrabEdgeEvent<TEvTablet::TEvRestored>(edge)->Get()->UserTabletActor;
|
|
|
- Y_UNUSED(followerId);
|
|
|
+ const TActorId followerEdge = runtime.AllocateEdgeActor();
|
|
|
+ const TActorId followerTablet = runtime.Register(CreateTabletFollower(followerEdge, tabletInfo.Get(), setupInfo.Get(), 1, nullptr, nullptr));
|
|
|
+ Y_UNUSED(followerTablet);
|
|
|
+ const TActorId followerId = runtime.GrabEdgeEvent<TEvTablet::TEvRestored>(followerEdge)->Get()->UserTabletActor;
|
|
|
|
|
|
{
|
|
|
NTabletPipe::TClientConfig pipeClientConfig;
|
|
|
pipeClientConfig.AllowFollower = true;
|
|
|
pipeClientConfig.ForceFollower = true;
|
|
|
pipeClientConfig.RetryPolicy = {.RetryLimitCount = 2};
|
|
|
- runtime.Register(NTabletPipe::CreateClient(edge, tabletId, pipeClientConfig));
|
|
|
+ runtime.Register(NTabletPipe::CreateClient(followerEdge, tabletId, pipeClientConfig));
|
|
|
|
|
|
- auto reply = runtime.GrabEdgeEvent<TEvTabletPipe::TEvClientConnected>(edge);
|
|
|
+ auto reply = runtime.GrabEdgeEvent<TEvTabletPipe::TEvClientConnected>(followerEdge);
|
|
|
|
|
|
UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Status, NKikimrProto::OK);
|
|
|
}
|
|
|
+
|
|
|
+ auto doFollowerRead = [&](ui64 key) -> TMaybe<ui64> {
|
|
|
+ const char *readQuery = R"__((
|
|
|
+ (let row_ '('('key (Uint64 '%lu))))
|
|
|
+ (let select_ '('v_ui64))
|
|
|
+ (let pgmReturn (AsList
|
|
|
+ (SetResult 'res (SelectRow 't_by_ui64 row_ select_))
|
|
|
+ ))
|
|
|
+ (return pgmReturn)
|
|
|
+ ))__";
|
|
|
+
|
|
|
+ THolder<TEvTablet::TEvLocalMKQL> reqRead = MakeHolder<TEvTablet::TEvLocalMKQL>();
|
|
|
+ reqRead->Record.MutableProgram()->MutableProgram()->SetText(Sprintf(readQuery, key));
|
|
|
+ runtime.Send(new IEventHandle(followerId, followerEdge, reqRead.Release()));
|
|
|
+
|
|
|
+ auto reply = runtime.GrabEdgeEvent<TEvTablet::TEvLocalMKQLResponse>(followerEdge);
|
|
|
+ UNIT_ASSERT_VALUES_EQUAL(reply->Get()->Record.GetStatus(), 0);
|
|
|
+ const auto res = reply->Get()->Record
|
|
|
+ .GetExecutionEngineEvaluatedResponse()
|
|
|
+ .GetValue()
|
|
|
+ .GetStruct(0)
|
|
|
+ .GetOptional();
|
|
|
+ if (!res.HasOptional()) {
|
|
|
+ return Nothing();
|
|
|
+ }
|
|
|
+
|
|
|
+ return res
|
|
|
+ .GetOptional()
|
|
|
+ .GetStruct(0)
|
|
|
+ .GetOptional()
|
|
|
+ .GetUint64();
|
|
|
+ };
|
|
|
+
|
|
|
+ // Perform basic sanity checks
|
|
|
+ UNIT_ASSERT_VALUES_EQUAL(doFollowerRead(41), Nothing());
|
|
|
+ UNIT_ASSERT_VALUES_EQUAL(doFollowerRead(42), 51u);
|
|
|
}
|
|
|
|
|
|
Y_UNIT_TEST(FollowerOfflineBoot) {
|