|
@@ -6,8 +6,37 @@
|
|
|
using namespace NSchemeShardUT_Private;
|
|
|
|
|
|
Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
|
|
|
+ class TTestWithTabletReboots: public TTestWithReboots {
|
|
|
+ public:
|
|
|
+ void Run(std::function<void(TTestActorRuntime& runtime, bool& activeZone)> testScenario) {
|
|
|
+ TDatashardLogBatchingSwitch logBatchingSwitch(false /* without batching */);
|
|
|
+ RunWithTabletReboots(testScenario);
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ class TTestWithPipeResets: public TTestWithReboots {
|
|
|
+ public:
|
|
|
+ void Run(std::function<void(TTestActorRuntime& runtime, bool& activeZone)> testScenario) {
|
|
|
+ TDatashardLogBatchingSwitch logBatchingSwitch(false /* without batching */);
|
|
|
+ RunWithPipeResets(testScenario);
|
|
|
+ }
|
|
|
+ };
|
|
|
+
|
|
|
+ #define Y_UNIT_TEST_WITH_REBOOTS(N) \
|
|
|
+ template <typename T> void N(NUnitTest::TTestContext&); \
|
|
|
+ struct TTestRegistration##N { \
|
|
|
+ TTestRegistration##N() { \
|
|
|
+ TCurrentTest::AddTest(#N "[TabletReboots]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<TTestWithTabletReboots>), false); \
|
|
|
+ TCurrentTest::AddTest(#N "[PipeResets]", static_cast<void (*)(NUnitTest::TTestContext&)>(&N<TTestWithPipeResets>), false); \
|
|
|
+ } \
|
|
|
+ }; \
|
|
|
+ static TTestRegistration##N testRegistration##N; \
|
|
|
+ template <typename T> \
|
|
|
+ void N(NUnitTest::TTestContext&)
|
|
|
+
|
|
|
+ template <typename T>
|
|
|
void CreateStream(const TMaybe<NKikimrSchemeOp::ECdcStreamState>& state = Nothing(), bool vt = false) {
|
|
|
- TTestWithReboots t;
|
|
|
+ T t;
|
|
|
t.GetTestEnvOptions().EnableChangefeedInitialScan(true);
|
|
|
|
|
|
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
|
|
@@ -51,24 +80,24 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(CreateStream) {
|
|
|
- CreateStream();
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(CreateStream) {
|
|
|
+ CreateStream<T>();
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(CreateStreamExplicitReady) {
|
|
|
- CreateStream(NKikimrSchemeOp::ECdcStreamStateReady);
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(CreateStreamExplicitReady) {
|
|
|
+ CreateStream<T>(NKikimrSchemeOp::ECdcStreamStateReady);
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(CreateStreamWithInitialScan) {
|
|
|
- CreateStream(NKikimrSchemeOp::ECdcStreamStateScan);
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithInitialScan) {
|
|
|
+ CreateStream<T>(NKikimrSchemeOp::ECdcStreamStateScan);
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(CreateStreamWithVirtualTimestamps) {
|
|
|
- CreateStream({}, true);
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithVirtualTimestamps) {
|
|
|
+ CreateStream<T>({}, true);
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(CreateStreamWithAwsRegion) {
|
|
|
- TTestWithReboots t;
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithAwsRegion) {
|
|
|
+ T t;
|
|
|
t.GetTestEnvOptions().EnableChangefeedDynamoDBStreamsFormat(true);
|
|
|
|
|
|
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
|
|
@@ -106,8 +135,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(CreateStreamWithResolvedTimestamps) {
|
|
|
- TTestWithReboots t;
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(CreateStreamWithResolvedTimestamps) {
|
|
|
+ T t;
|
|
|
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
|
|
|
{
|
|
|
TInactiveZone inactive(activeZone);
|
|
@@ -140,8 +169,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(DisableStream) {
|
|
|
- TTestWithReboots t;
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(DisableStream) {
|
|
|
+ T t;
|
|
|
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
|
|
|
{
|
|
|
TInactiveZone inactive(activeZone);
|
|
@@ -191,8 +220,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(GetReadyStream) {
|
|
|
- TTestWithReboots t;
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(GetReadyStream) {
|
|
|
+ T t;
|
|
|
t.GetTestEnvOptions().EnableChangefeedInitialScan(true);
|
|
|
|
|
|
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
|
|
@@ -252,8 +281,9 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+ template <typename T>
|
|
|
void DropStream(const TMaybe<NKikimrSchemeOp::ECdcStreamState>& state = Nothing()) {
|
|
|
- TTestWithReboots t;
|
|
|
+ T t;
|
|
|
t.GetTestEnvOptions().EnableChangefeedInitialScan(true);
|
|
|
|
|
|
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
|
|
@@ -299,20 +329,20 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(DropStream) {
|
|
|
- DropStream();
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(DropStream) {
|
|
|
+ DropStream<T>();
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(DropStreamExplicitReady) {
|
|
|
- DropStream(NKikimrSchemeOp::ECdcStreamStateReady);
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(DropStreamExplicitReady) {
|
|
|
+ DropStream<T>(NKikimrSchemeOp::ECdcStreamStateReady);
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(DropStreamCreatedWithInitialScan) {
|
|
|
- DropStream(NKikimrSchemeOp::ECdcStreamStateScan);
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(DropStreamCreatedWithInitialScan) {
|
|
|
+ DropStream<T>(NKikimrSchemeOp::ECdcStreamStateScan);
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(CreateDropRecreate) {
|
|
|
- TTestWithReboots t;
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(CreateDropRecreate) {
|
|
|
+ T t;
|
|
|
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
|
|
|
runtime.SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_TRACE);
|
|
|
|
|
@@ -378,8 +408,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(Attributes) {
|
|
|
- TTestWithReboots t;
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(Attributes) {
|
|
|
+ T t;
|
|
|
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
|
|
|
{
|
|
|
TInactiveZone inactive(activeZone);
|
|
@@ -419,8 +449,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(RacySplitAndDropTable) {
|
|
|
- TTestWithReboots t;
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(RacySplitAndDropTable) {
|
|
|
+ T t;
|
|
|
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
|
|
|
{
|
|
|
TInactiveZone inactive(activeZone);
|
|
@@ -464,8 +494,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(InitialScan) {
|
|
|
- TTestWithReboots t;
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(InitialScan) {
|
|
|
+ T t;
|
|
|
t.GetTestEnvOptions().EnableChangefeedInitialScan(true);
|
|
|
|
|
|
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
|
|
@@ -572,8 +602,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(SplitTable) {
|
|
|
- TTestWithReboots t;
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(SplitTable) {
|
|
|
+ T t;
|
|
|
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
|
|
|
{
|
|
|
TInactiveZone inactive(activeZone);
|
|
@@ -615,8 +645,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(MergeTable) {
|
|
|
- TTestWithReboots t;
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(MergeTable) {
|
|
|
+ T t;
|
|
|
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
|
|
|
{
|
|
|
TInactiveZone inactive(activeZone);
|
|
@@ -659,8 +689,8 @@ Y_UNIT_TEST_SUITE(TCdcStreamWithRebootsTests) {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
- Y_UNIT_TEST(RacySplitTableAndCreateStream) {
|
|
|
- TTestWithReboots t;
|
|
|
+ Y_UNIT_TEST_WITH_REBOOTS(RacySplitTableAndCreateStream) {
|
|
|
+ T t;
|
|
|
t.Run([&](TTestActorRuntime& runtime, bool& activeZone) {
|
|
|
{
|
|
|
TInactiveZone inactive(activeZone);
|