Browse Source

YQ-2840 Move pq_async_actor tests to github (#2414)

Dmitry Kardymon 1 year ago
parent
commit
663250905e

+ 352 - 0
ydb/tests/fq/pq_async_io/dq_pq_read_actor_ut.cpp

@@ -0,0 +1,352 @@
+#include "ut_helpers.h"
+
+#include <ydb/library/yql/utils/yql_panic.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <thread>
+
+namespace NYql::NDq {
+
+namespace {
+
+// We can't be sure that no extra watermarks were generated (we can't control LB receipt write time).
+// So, we will check only if there is at least one watermark before each specified position.
+// Checks the data items of `actual` against `expected` (position by position) and
+// verifies watermark placement.
+//
+// We can't be sure that no extra watermarks were generated (we can't control LB receipt write time).
+// So, we will check only if there is at least one watermark before each specified position.
+//
+// actual                   - items returned by the source: a parsed value (T) or a watermark (TInstant).
+// expected                 - data values expected at positions 0, 1, 2, ...
+// watermarkBeforePositions - data positions that must be preceded by a watermark
+//                            (presumably listed in ascending order; verify against callers).
+//
+// Note: the helper fails on extra or mismatching data items, but it does not check that
+// every element of `expected` (or every requested watermark) was actually seen.
+template<typename T>
+void AssertDataWithWatermarks(
+    const std::vector<std::variant<T, TInstant>>& actual,
+    const std::vector<T>& expected,
+    const std::vector<ui32>& watermarkBeforePositions)
+{
+    auto expectedPos = 0U;
+    auto watermarksBeforeIter = watermarkBeforePositions.begin();
+
+    // Iterate by const reference: copying the variant (and the value it holds) per item is unnecessary.
+    for (const auto& item : actual) {
+        if (std::holds_alternative<TInstant>(item)) {
+            // Each watermark satisfies the next pending "watermark before position" requirement.
+            if (watermarksBeforeIter != watermarkBeforePositions.end()) {
+                ++watermarksBeforeIter;
+            }
+            continue;
+        }
+
+        UNIT_ASSERT_C(expectedPos < expected.size(), "Too many data items");
+        // A still-pending requirement must refer to a later position than the current one.
+        UNIT_ASSERT_C(
+            watermarksBeforeIter == watermarkBeforePositions.end() ||
+            *watermarksBeforeIter > expectedPos,
+            "Watermark before item on position " << expectedPos << " was expected");
+        UNIT_ASSERT_EQUAL(std::get<T>(item), expected.at(expectedPos));
+        ++expectedPos;
+    }
+}
+
+// Watermark period passed to BuildPqTopicSourceSettings() by the watermark tests.
+constexpr auto defaultWatermarkPeriod = TDuration::MilliSeconds(100);
+// Late arrival delay passed to the source settings; also the default sleep of WaitForNextWatermark().
+constexpr auto defaultLateArrivalDelay = TDuration::MilliSeconds(1);
+
+// Pauses the test between two writes so that the later write gets a later write time.
+// The argument is a TDuration (despite the "Ms" suffix in its name).
+void WaitForNextWatermark(TDuration lateArrivalDelayMs = defaultLateArrivalDelay) {
+    // We can't control write time in LB, so just sleep for watermarkPeriod to ensure the next written data
+    // will obtain write_time which will move watermark forward.
+    // NOTE(review): the code sleeps for the late arrival delay (1 ms by default), not for
+    // defaultWatermarkPeriod as the sentence above says - confirm which one is intended.
+    Sleep(lateArrivalDelayMs);
+}
+
+}
+
+Y_UNIT_TEST_SUITE(TDqPqReadActorTest) {
+    // Writes four messages after the source is created and expects the source to return them in order.
+    Y_UNIT_TEST_F(TestReadFromTopic, TPqIoTestFixture) {
+        const TString topic = "ReadFromTopic";
+        PQCreateStream(topic);
+        InitSource(topic);
+
+        const std::vector<TString> written{"1", "2", "3", "4"};
+        PQWrite(written, topic);
+
+        const auto items = SourceReadDataUntil<TString>(UVParser, written.size());
+        AssertDataWithWatermarks(items, written, {});
+    }
+
+    // Messages written before the source is created must not be returned;
+    // only the messages written afterwards are expected.
+    Y_UNIT_TEST_F(TestReadFromTopicFromNow, TPqIoTestFixture) {
+        const TString topic = "ReadFromTopicFromNow";
+        PQCreateStream(topic);
+
+        // Written before InitSource().
+        PQWrite({"-4", "-3", "-2", "-1", "0"}, topic);
+
+        InitSource(topic);
+
+        // Written after InitSource().
+        const std::vector<TString> fresh{"1", "2", "3", "4"};
+        PQWrite(fresh, topic);
+
+        const auto items = SourceReadDataUntil<TString>(UVParser, fresh.size());
+        AssertDataWithWatermarks(items, fresh, {});
+    }
+
+    // Reads with a free space of 1 until one item arrives, then checks that reads with
+    // zero and negative free space return nothing.
+    Y_UNIT_TEST_F(ReadWithFreeSpace, TPqIoTestFixture) {
+        const TString topic = "ReadWithFreeSpace";
+        PQCreateStream(topic);
+        InitSource(topic);
+
+        PQWrite({"data1", "data2", "data3"}, topic);
+
+        const auto firstRead = SourceReadDataUntil<TString>(UVParser, 1, 1);
+        AssertDataWithWatermarks(firstRead, {"data1"}, {});
+
+        const auto zeroSpaceRead = SourceRead<TString>(UVParser, 0);
+        UNIT_ASSERT_EQUAL(zeroSpaceRead.size(), 0);
+
+        const auto negativeSpaceRead = SourceRead<TString>(UVParser, -1);
+        UNIT_ASSERT_EQUAL(negativeSpaceRead.size(), 0);
+    }
+
+    // Reading from a topic that was never created must eventually fail with a
+    // "read session was closed" error. The source is polled once a second until
+    // SourceRead() throws.
+    Y_UNIT_TEST_F(ReadNonExistentTopic, TPqIoTestFixture) {
+        const TString topicName = "NonExistentTopic";
+        InitSource(topicName);
+
+        while (true) {
+            try {
+                SourceRead<TString>(UVParser);
+            } catch (const yexception& e) { // the exception is only inspected, so catch by const reference
+                UNIT_ASSERT_STRING_CONTAINS(e.what(), "Read session to topic \"NonExistentTopic\" was closed");
+                break;
+            }
+
+            // Use the same Sleep(TDuration) helper as the rest of the file instead of POSIX sleep().
+            Sleep(TDuration::Seconds(1));
+        }
+    }
+
+    // Saves source state at three points in time (state, state2, state3), each with its own
+    // fixture instance, and then checks what a fresh source reads after loading each of them.
+    Y_UNIT_TEST(TestSaveLoadPqRead) {
+        NDqProto::TSourceState state;
+        const TString topicName = "SaveLoadPqRead";
+        PQCreateStream(topicName);
+
+        // Step 1: read the first message and save `state`.
+        {
+            TPqIoTestFixture setup1;
+            setup1.InitSource(topicName);
+
+            std::vector<TString> data {"data"};
+            PQWrite(data, topicName);
+
+            auto result = setup1.SourceReadDataUntil<TString>(UVParser, 1);
+            AssertDataWithWatermarks(result, data, {});
+
+            auto checkpoint = CreateCheckpoint();
+            setup1.SaveSourceState(checkpoint, state);
+            Cerr << "State saved" << Endl;
+        }
+
+        // Step 2: load `state`, read the message written here, save `state2`,
+        // then write one more message that stays unread.
+        NDqProto::TSourceState state2;
+        {
+            TPqIoTestFixture setup2;
+            setup2.InitSource(topicName);
+
+            std::vector<TString> data {"data"};
+            PQWrite({"data"}, topicName);
+
+            setup2.LoadSource(state);
+            Cerr << "State loaded" << Endl;
+            auto result = setup2.SourceReadDataUntil<TString>(UVParser, 1);
+            AssertDataWithWatermarks(result, data, {});
+
+            auto checkpoint = CreateCheckpoint();
+            setup2.SaveSourceState(checkpoint, state2);
+
+            PQWrite({"futherData"}, topicName);
+        }
+
+        // Step 3: load `state2`, read the pending message, write another one and save `state3`.
+        NDqProto::TSourceState state3;
+        {
+            TPqIoTestFixture setup3;
+            setup3.InitSource(topicName);
+            setup3.LoadSource(state2);
+
+            auto result = setup3.SourceReadDataUntil<TString>(UVParser, 1);
+            std::vector<TString> data {"futherData"};
+            AssertDataWithWatermarks(result, data, {});
+
+            // The PQ session is still alive.
+
+            PQWrite({"yetAnotherData"}, topicName);
+
+            auto checkpoint = CreateCheckpoint();
+            setup3.SaveSourceState(checkpoint, state3);
+        }
+
+        // Load the first state and check it.
+        {
+            TPqIoTestFixture setup4;
+            setup4.InitSource(topicName);
+            setup4.LoadSource(state);
+
+            auto result = setup4.SourceReadUntil<TString>(UVParser, 3);
+            std::vector<TString> data {"data", "futherData", "yetAnotherData"};
+            AssertDataWithWatermarks(result, data, {});
+        }
+
+        // Load state2 and check it (offsets were saved).
+        {
+            TPqIoTestFixture setup5;
+            setup5.InitSource(topicName);
+            setup5.LoadSource(state2);
+
+            auto result = setup5.SourceReadDataUntil<TString>(UVParser, 2);
+            std::vector<TString> data {"futherData", "yetAnotherData"};
+            AssertDataWithWatermarks(result, data, {});
+        }
+
+        // Load state3 and check it (other offsets).
+        {
+            TPqIoTestFixture setup6;
+            setup6.InitSource(topicName);
+            setup6.LoadSource(state3);
+
+            auto result = setup6.SourceReadDataUntil<TString>(UVParser, 1);
+            std::vector<TString> data {"yetAnotherData"};
+            AssertDataWithWatermarks(result, data, {});
+        }
+    }
+
+    // Saves a source state, appends a byte to its serialized blob and expects
+    // LoadSource() to reject the result as corrupted.
+    Y_UNIT_TEST(LoadCorruptedState) {
+        const TString topic = "Invalid"; // We wouldn't read from this topic.
+        NDqProto::TSourceState savedState;
+        auto checkpoint = CreateCheckpoint();
+
+        {
+            TPqIoTestFixture writer;
+            writer.InitSource(topic);
+            writer.SaveSourceState(checkpoint, savedState);
+        }
+
+        // Corrupt state: the blob of the first data entry gets one extra character.
+        auto* stateData = savedState.MutableData(0)->MutableStateData();
+        TString brokenBlob = stateData->GetBlob();
+        brokenBlob.append('a');
+        stateData->SetBlob(brokenBlob);
+
+        {
+            TPqIoTestFixture reader;
+            reader.InitSource(topic);
+            UNIT_ASSERT_EXCEPTION_CONTAINS(reader.LoadSource(savedState), yexception, "Serialized state is corrupted");
+        }
+    }
+
+    // Builds a source state that contains the data entries of two checkpoints
+    // (state2 plus the entry of the earlier state1) and checks what is read after loading it.
+    Y_UNIT_TEST(TestLoadFromSeveralStates) {
+        const TString topicName = "LoadFromSeveralStates";
+        PQCreateStream(topicName);
+
+        NDqProto::TSourceState state2;
+        {
+            TPqIoTestFixture setup;
+            setup.InitSource(topicName);
+
+            std::vector<TString> data {"data"};
+            PQWrite(data, topicName);
+
+            auto result1 = setup.SourceReadDataUntil<TString>(UVParser, 1);
+            AssertDataWithWatermarks(result1, data, {});
+
+            // First checkpoint: taken after "data" was read.
+            NDqProto::TSourceState state1;
+            auto checkpoint1 = CreateCheckpoint();
+            setup.SaveSourceState(checkpoint1, state1);
+            Cerr << "State saved" << Endl;
+
+            std::vector<TString> data2 {"data2"};
+            PQWrite(data2, topicName);
+
+            auto result2 = setup.SourceReadDataUntil<TString>(UVParser, 1);
+            AssertDataWithWatermarks(result2, data2, {});
+
+            // Second checkpoint: taken after "data2" was read.
+            auto checkpoint2 = CreateCheckpoint();
+            setup.SaveSourceState(checkpoint2, state2);
+            Cerr << "State 2 saved" << Endl;
+
+            // Add state1 to state2
+            *state2.AddData() = state1.GetData(0);
+        }
+
+        TPqIoTestFixture setup2;
+        setup2.InitSource(topicName);
+        setup2.LoadSource(state2); // Loads min offset
+
+        std::vector<TString> data3 {"data3"};
+        PQWrite(data3, topicName);
+
+        // "data2" is expected again, followed by the newly written "data3".
+        auto result = setup2.SourceReadUntil<TString>(UVParser, 2);
+        std::vector<TString> dataResult {"data2", "data3"};
+        AssertDataWithWatermarks(result, dataResult, {});
+    }
+
+    // With watermarks enabled, a watermark is expected before the first data item.
+    Y_UNIT_TEST_F(TestReadFromTopicFirstWatermark, TPqIoTestFixture) {
+        const TString topic = "ReadFromTopicFirstWatermark";
+        PQCreateStream(topic);
+
+        InitSource(BuildPqTopicSourceSettings(topic, defaultWatermarkPeriod));
+
+        const std::vector<TString> written{"1", "2", "3", "4"};
+        PQWrite(written, topic);
+
+        const auto items = SourceReadDataUntil<TString>(UVParser, written.size());
+        AssertDataWithWatermarks(items, written, {0});
+    }
+
+    // Writes three batches separated by WaitForNextWatermark() and expects a watermark
+    // before the first item of each batch (positions 0, 2 and 5).
+    Y_UNIT_TEST_F(TestReadFromTopicWatermarks1, TPqIoTestFixture) {
+        const TString topic = "ReadFromTopicWatermarks1";
+        PQCreateStream(topic);
+
+        InitSource(BuildPqTopicSourceSettings(topic, defaultWatermarkPeriod, defaultLateArrivalDelay));
+
+        // Batch 1: positions 0-1.
+        PQWrite({"1", "2"}, topic);
+
+        // Batch 2: positions 2-4.
+        WaitForNextWatermark();
+        PQWrite({"3", "4", "5"}, topic);
+
+        // Batch 3: position 5.
+        WaitForNextWatermark();
+        PQWrite({"6"}, topic);
+
+        const auto items = SourceReadDataUntil<TString>(UVParser, 6);
+        AssertDataWithWatermarks(items, {"1", "2", "3", "4", "5", "6"}, {0, 2, 5});
+    }
+
+    // Takes a checkpoint while the second batch has been written but only one more item
+    // (expected to be a watermark) has been read, then checks that a source restored
+    // from that checkpoint returns the second batch.
+    Y_UNIT_TEST(WatermarkCheckpointWithItemsInReadyBuffer) {
+        const TString topicName = "WatermarkCheckpointWithItemsInReadyBuffer";
+        PQCreateStream(topicName);
+        NDqProto::TSourceState state;
+
+        {
+            TPqIoTestFixture setup;
+            auto settings = BuildPqTopicSourceSettings(topicName, defaultWatermarkPeriod);
+            setup.InitSource(std::move(settings));
+
+            std::vector<TString> data1 {"1", "2"};
+            PQWrite(data1, topicName);
+
+            auto result = setup.SourceReadDataUntil<TString>(UVParser, 2);
+            AssertDataWithWatermarks(result, data1, {0});
+
+            WaitForNextWatermark();
+            std::vector<TString> data2 {"3", "4"};
+            PQWrite(data2, topicName);
+
+            // read only watermark (1-st batch), items '3', '4' will stay in ready buffer inside Source actor
+            result = setup.SourceReadUntil<TString>(UVParser, 1);
+            // Empty `expected`: any data item in `result` fails with "Too many data items".
+            AssertDataWithWatermarks(result, {}, {0});
+
+            auto checkpoint = CreateCheckpoint();
+            setup.SaveSourceState(checkpoint, state);
+            Cerr << "State saved" << Endl;
+        }
+
+        {
+            TPqIoTestFixture setup;
+            auto settings = BuildPqTopicSourceSettings(topicName, defaultWatermarkPeriod);
+            setup.InitSource(std::move(settings));
+            setup.LoadSource(state);
+
+            auto result = setup.SourceReadDataUntil<TString>(UVParser, 2);
+            // Since items '3', '4' weren't returned from source actor, they should be reread
+            AssertDataWithWatermarks(result, {"3", "4"}, {});
+        }
+    }
+}
+} // NYql::NDq

+ 143 - 0
ydb/tests/fq/pq_async_io/dq_pq_write_actor_ut.cpp

@@ -0,0 +1,143 @@
+#include "ut_helpers.h"
+
+#include <ydb/library/yql/utils/yql_panic.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NYql::NDq {
+
+// Upper bound (10 seconds) used by the tests below when waiting on async output futures.
+constexpr TDuration WaitTimeout = TDuration::MilliSeconds(10000);
+
+Y_UNIT_TEST_SUITE(TPqWriterTest) {
+    // Writes a single batch through the async output and checks the topic content.
+    Y_UNIT_TEST_F(TestWriteToTopic, TPqIoTestFixture) {
+        const TString topic = "WriteToTopic";
+        PQCreateStream(topic);
+        InitAsyncOutput(topic);
+
+        const std::vector<TString> batch{"1", "2", "3", "4"};
+        AsyncOutputWrite(batch);
+
+        const auto stored = PQReadUntil(topic, batch.size());
+        UNIT_ASSERT_EQUAL(stored, batch);
+    }
+
+    // Writes three single-item batches one after another and checks that the topic
+    // holds all of them in write order.
+    Y_UNIT_TEST_F(TestWriteToTopicMultiBatch, TPqIoTestFixture) {
+        const TString topic = "WriteToTopicMultiBatch";
+        PQCreateStream(topic);
+        InitAsyncOutput(topic);
+
+        const std::vector<TString> expected{"1", "2", "3"};
+
+        // One AsyncOutputWrite() call per item, i.e. one batch per item.
+        for (const TString& item : expected) {
+            AsyncOutputWrite(std::vector<TString>{item});
+        }
+
+        const auto stored = PQReadUntil(topic, expected.size());
+        UNIT_ASSERT_EQUAL(stored, expected);
+    }
+
+    // In this case we are checking free space overflow: the output is created with a free
+    // space of 1, so the test also waits for the ResumeExecution promise to be fulfilled.
+    Y_UNIT_TEST_F(TestDeferredWriteToTopic, TPqIoTestFixture) {
+        const TString topic = "DeferredWriteToTopic";
+        PQCreateStream(topic);
+        InitAsyncOutput(topic, 1);
+
+        // Take the future before writing so the resume notification is not missed.
+        auto resumed = CaSetup->AsyncOutputPromises.ResumeExecution.GetFuture();
+
+        const std::vector<TString> firstBatch{"1", "2", "3"};
+        AsyncOutputWrite(firstBatch);
+
+        const auto afterFirst = PQReadUntil(topic, 3);
+        UNIT_ASSERT_EQUAL(afterFirst, firstBatch);
+        UNIT_ASSERT(resumed.Wait(WaitTimeout)); // Resume execution should be called
+
+        const std::vector<TString> secondBatch{"4", "5", "6"};
+        AsyncOutputWrite(secondBatch);
+
+        const auto afterSecond = PQReadUntil(topic, 6);
+        const std::vector<TString> everything{"1", "2", "3", "4", "5", "6"};
+        UNIT_ASSERT_EQUAL(afterSecond, everything);
+    }
+
+    // Writing to a topic that was never created must raise an issue saying that
+    // the write session was closed.
+    Y_UNIT_TEST_F(WriteNonExistentTopic, TPqIoTestFixture) {
+        const TString topic = "NonExistentTopic";
+        InitAsyncOutput(topic);
+
+        // Take the future before writing so the issue is not missed.
+        auto issue = CaSetup->AsyncOutputPromises.Issue.GetFuture();
+        AsyncOutputWrite(std::vector<TString>{"1"});
+
+        UNIT_ASSERT(issue.Wait(WaitTimeout));
+        const auto issueText = issue.GetValue().ToString();
+        UNIT_ASSERT_STRING_CONTAINS(issueText, "Write session to topic \"NonExistentTopic\" was closed");
+    }
+
+    // Saves sink state together with a write, then restores that state twice and
+    // repeats the same follow-up write; the topic must still contain each item once.
+    Y_UNIT_TEST(TestCheckpoints) {
+        const TString topicName = "Checkpoints";
+        PQCreateStream(topicName);
+
+        // Step 1: write "1", then write "2", "3" together with a checkpoint and keep the saved state.
+        NDqProto::TSinkState state1;
+        {
+            TPqIoTestFixture setup;
+            setup.InitAsyncOutput(topicName);
+
+            const std::vector<TString> data1 = { "1" };
+            setup.AsyncOutputWrite(data1);
+
+            const std::vector<TString> data2 = { "2", "3" };
+            auto checkpoint = CreateCheckpoint();
+            // Take the future before writing so the state notification is not missed.
+            auto future = setup.CaSetup->AsyncOutputPromises.StateSaved.GetFuture();
+            setup.AsyncOutputWrite(data2, checkpoint);
+
+            UNIT_ASSERT(future.Wait(WaitTimeout));
+            state1 = future.GetValue();
+        }
+
+        // Step 2: restore the state and write "4", "5".
+        {
+            TPqIoTestFixture setup;
+            setup.InitAsyncOutput(topicName);
+            setup.LoadSink(state1);
+
+            const std::vector<TString> data3 = { "4", "5" };
+            setup.AsyncOutputWrite(data3);
+
+            auto result = PQReadUntil(topicName, 5);
+            const std::vector<TString> expected = { "1", "2", "3", "4", "5" };
+            UNIT_ASSERT_EQUAL(result, expected);
+        }
+
+        // Step 3: restore the same state again and repeat the write.
+        {
+            TPqIoTestFixture setup;
+            setup.InitAsyncOutput(topicName);
+            setup.LoadSink(state1);
+
+            const std::vector<TString> data4 = { "4", "5" };
+            setup.AsyncOutputWrite(data4); // This write should be deduplicated
+
+            // Wait for all five expected messages: the result is compared with a five-element
+            // vector, so stopping at four would make the outcome depend on how the read
+            // session batches its events.
+            auto result = PQReadUntil(topicName, 5);
+            const std::vector<TString> expected = { "1", "2", "3", "4", "5" };
+            UNIT_ASSERT_EQUAL(result, expected);
+        }
+    }
+
+    // A checkpoint sent together with an empty batch must still produce a saved state.
+    Y_UNIT_TEST_F(TestCheckpointWithEmptyBatch, TPqIoTestFixture) {
+        const TString topic = "CheckpointsWithEmptyBatch";
+        PQCreateStream(topic);
+
+        NDqProto::TSinkState savedState;
+        {
+            InitAsyncOutput(topic);
+
+            const std::vector<TString> nothing;
+            auto checkpoint = CreateCheckpoint();
+            // Take the future before writing so the state notification is not missed.
+            auto stateSaved = CaSetup->AsyncOutputPromises.StateSaved.GetFuture();
+            AsyncOutputWrite(nothing, checkpoint);
+
+            UNIT_ASSERT(stateSaved.Wait(WaitTimeout));
+            savedState = stateSaved.GetValue();
+        }
+    }
+}
+
+} // NYql::NDq

+ 248 - 0
ydb/tests/fq/pq_async_io/ut_helpers.cpp

@@ -0,0 +1,248 @@
+#include "ut_helpers.h"
+
+#include <ydb/library/yql/minikql/mkql_string_util.h>
+
+#include <ydb/core/testlib/basics/appdata.h>
+
+#include <util/system/env.h>
+
+#include <condition_variable>
+#include <thread>
+
+namespace NYql::NDq {
+
+using namespace NActors;
+
+// Builds read-actor settings for `topic` using the default test endpoint, database and consumer.
+//
+// watermarksPeriod      - when defined, watermarks are enabled with this granularity.
+// lateArrivalDelay      - always stored in the watermark settings.
+// idlePartitionsEnabled - always stored in the watermark settings.
+NYql::NPq::NProto::TDqPqTopicSource BuildPqTopicSourceSettings(
+    TString topic,
+    TMaybe<TDuration> watermarksPeriod,
+    TDuration lateArrivalDelay,
+    bool idlePartitionsEnabled)
+{
+    NYql::NPq::NProto::TDqPqTopicSource source;
+
+    // Where and as whom to read.
+    source.SetTopicPath(topic);
+    source.SetConsumerName(DefaultPqConsumer);
+    source.SetEndpoint(GetDefaultPqEndpoint());
+    source.MutableToken()->SetName("token");
+    source.SetDatabase(GetDefaultPqDatabase());
+
+    // The watermark sub-message is always created; enabling it is optional.
+    auto* watermarks = source.MutableWatermarks();
+    if (watermarksPeriod.Defined()) {
+        watermarks->SetEnabled(true);
+        watermarks->SetGranularityUs(watermarksPeriod->MicroSeconds());
+    }
+    watermarks->SetIdlePartitionsEnabled(idlePartitionsEnabled);
+    watermarks->SetLateArrivalDelayUs(lateArrivalDelay.MicroSeconds());
+
+    return source;
+}
+
+// Builds write-actor settings for `topic` using the default test endpoint and database,
+// with the DataStreams cluster type.
+NYql::NPq::NProto::TDqPqTopicSink BuildPqTopicSinkSettings(TString topic) {
+    NYql::NPq::NProto::TDqPqTopicSink sink;
+
+    sink.SetTopicPath(topic);
+    sink.SetEndpoint(GetDefaultPqEndpoint());
+    sink.SetDatabase(GetDefaultPqDatabase());
+    sink.SetClusterType(NPq::NProto::DataStreams);
+    sink.MutableToken()->SetName("token");
+
+    return sink;
+}
+
+// No work is done in the constructor body.
+TPqIoTestFixture::TPqIoTestFixture() = default;
+
+// Releases the fake compute actor setup first and only then stops the driver.
+TPqIoTestFixture::~TPqIoTestFixture() {
+    CaSetup.reset();
+    Driver.Stop(true);
+}
+
+// Creates the PQ read actor from `settings` and registers it as the async input of the fake actor.
+// The task parameters describe one topic partition read by one DQ partition.
+void TPqIoTestFixture::InitSource(
+    NYql::NPq::NProto::TDqPqTopicSource&& settings,
+    i64 freeSpace)
+{
+    CaSetup->Execute([&](TFakeActor& actor) {
+        NPq::NProto::TDqReadTaskParams readTaskParams;
+        auto& partitioning = *readTaskParams.MutablePartitioningParams();
+        partitioning.SetTopicPartitionsCount(1);
+        partitioning.SetEachTopicPartitionGroupId(0);
+        partitioning.SetDqPartitionsCount(1);
+
+        // The read actor receives the params in serialized form under the "pq" key.
+        TString serialized;
+        Y_PROTOBUF_SUPPRESS_NODISCARD readTaskParams.SerializeToString(&serialized);
+
+        const THashMap<TString, TString> secureParams;
+        const THashMap<TString, TString> taskParams { {"pq", serialized} };
+
+        auto [source, sourceActor] = CreateDqPqReadActor(
+            std::move(settings),
+            0,
+            NYql::NDq::TCollectStatsLevel::None,
+            "query_1",
+            0,
+            secureParams,
+            taskParams,
+            Driver,
+            nullptr,
+            actor.SelfId(),
+            actor.GetHolderFactory(),
+            freeSpace);
+
+        actor.InitAsyncInput(source, sourceActor);
+    });
+}
+
+// Creates the PQ write actor from `settings` and registers it as the async output of the fake actor.
+void TPqIoTestFixture::InitAsyncOutput(
+    NPq::NProto::TDqPqTopicSink&& settings,
+    i64 freeSpace)
+{
+    CaSetup->Execute([&](TFakeActor& actor) {
+        // No secure parameters are needed by the tests.
+        const THashMap<TString, TString> secureParams;
+
+        auto [sink, sinkActor] = CreateDqPqWriteActor(
+            std::move(settings),
+            0,
+            NYql::NDq::TCollectStatsLevel::None,
+            "query_1",
+            secureParams,
+            Driver,
+            nullptr,
+            &actor.GetAsyncOutputCallbacks(),
+            freeSpace);
+
+        actor.InitAsyncOutput(sink, sinkActor);
+    });
+}
+
+// Returns the endpoint from the YDB_ENDPOINT environment variable;
+// fails the test when the variable is empty.
+TString GetDefaultPqEndpoint() {
+    TString endpoint = GetEnv("YDB_ENDPOINT");
+    UNIT_ASSERT_C(endpoint, "Yds recipe is expected");
+    return endpoint;
+}
+
+// Returns the database from the YDB_DATABASE environment variable;
+// fails the test when the variable is empty.
+TString GetDefaultPqDatabase() {
+    TString database = GetEnv("YDB_DATABASE");
+    UNIT_ASSERT_C(database, "Yds recipe is expected");
+    return database;
+}
+
+// Consumer name used by the source settings, PQReadUntil() and the read rule added in AddReadRule().
+extern const TString DefaultPqConsumer = "test_client";
+
+// Writes every element of `sequence` to `topic` through a blocking PersQueue write session
+// (RAW codec, message group "src_id") on a driver created for this call.
+// Fails the test as soon as one write is rejected.
+void PQWrite(
+    const std::vector<TString>& sequence,
+    const TString& topic,
+    const TString& endpoint)
+{
+    NYdb::TDriverConfig config;
+    config.SetEndpoint(endpoint);
+    config.SetDatabase(GetDefaultPqDatabase());
+    config.SetLog(CreateLogBackend("cerr"));
+    NYdb::TDriver driver(config);
+    NYdb::NPersQueue::TPersQueueClient client(driver);
+
+    NYdb::NPersQueue::TWriteSessionSettings writeSettings;
+    writeSettings.Path(topic);
+    writeSettings.MessageGroupId("src_id");
+    writeSettings.Codec(NYdb::NPersQueue::ECodec::RAW);
+
+    auto session = client.CreateSimpleBlockingWriteSession(writeSettings);
+    for (const TString& message : sequence) {
+        UNIT_ASSERT_C(session->Write(message), "Failed to write message with body \"" << message << "\" to topic " << topic);
+        Cerr << "Message '" << message << "' was written into topic '" << topic << "'" << Endl;
+    }
+
+    session->Close(); // Wait until all data would be written into PQ.
+    session = nullptr;
+    driver.Stop(true);
+}
+
+// Reads message bodies from `topic` with a PersQueue read session (consumer DefaultPqConsumer)
+// until at least `size` messages have been collected, then closes the session and the driver.
+//
+// topic    - topic to read from.
+// size     - minimal number of messages to wait for; the result may contain more, because
+//            all messages of a data event are appended before the threshold is checked.
+// endpoint - YDB endpoint to connect to.
+// timeout  - how long to wait for the threshold; the test fails when it is exceeded.
+std::vector<TString> PQReadUntil(
+    const TString& topic,
+    ui64 size,
+    const TString& endpoint,
+    TDuration timeout)
+{
+    NYdb::TDriverConfig cfg;
+    cfg.SetEndpoint(endpoint);
+    cfg.SetDatabase(GetDefaultPqDatabase());
+    cfg.SetLog(CreateLogBackend("cerr"));
+    NYdb::TDriver driver(cfg);
+    NYdb::NPersQueue::TPersQueueClient client(driver);
+    NYdb::NPersQueue::TReadSessionSettings sessionSettings;
+    sessionSettings
+        .AppendTopics(topic)
+        .ConsumerName(DefaultPqConsumer)
+        .DisableClusterDiscovery(true);
+
+    auto promise = NThreading::NewPromise();
+    std::vector<TString> result;
+
+    sessionSettings.EventHandlers_.SimpleDataHandlers([&](NYdb::NPersQueue::TReadSessionEvent::TDataReceivedEvent& ev) {
+        for (const auto& message : ev.GetMessages()) {
+            result.emplace_back(message.GetData());
+        }
+        if (result.size() >= size) {
+            // The handler can run again for later data events once the threshold is reached.
+            // TrySetValue() makes a repeated completion a no-op, while SetValue() on an
+            // already fulfilled promise is an error.
+            promise.TrySetValue();
+        }
+    }, false, false);
+
+    std::shared_ptr<NYdb::NPersQueue::IReadSession> session = client.CreateReadSession(sessionSettings);
+    UNIT_ASSERT(promise.GetFuture().Wait(timeout));
+    // Close the session before returning: the handler captures local variables by reference.
+    session->Close(TDuration::Zero());
+    session = nullptr;
+    driver.Stop(true);
+    return result;
+}
+
+// Creates a data stream named `streamName` (1 shard, 1 hour retention) in the default test
+// database and adds the read rule for the default consumer. Fails the test on any error.
+void PQCreateStream(const TString& streamName)
+{
+    NYdb::TDriverConfig config;
+    config.SetEndpoint(GetDefaultPqEndpoint());
+    config.SetDatabase(GetDefaultPqDatabase());
+    config.SetLog(CreateLogBackend("cerr"));
+    NYdb::TDriver driver(config);
+
+    NYdb::NDataStreams::V1::TDataStreamsClient client(
+        driver,
+        NYdb::TCommonClientSettings().Database(GetDefaultPqDatabase()));
+
+    const auto streamSettings = NYdb::NDataStreams::V1::TCreateStreamSettings()
+        .ShardCount(1)
+        .RetentionPeriodHours(1);
+
+    auto result = client.CreateStream(streamName, streamSettings).ExtractValueSync();
+    UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+    UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+
+    AddReadRule(driver, streamName);
+    driver.Stop(true);
+}
+
+// Adds a read rule for DefaultPqConsumer (service type "yandex-query", RAW codec) to
+// `streamName` using the given driver. Fails the test on any error.
+void AddReadRule(NYdb::TDriver& driver, const TString& streamName) {
+    NYdb::NPersQueue::TPersQueueClient client(driver);
+
+    const auto readRule = NYdb::NPersQueue::TReadRuleSettings()
+        .ConsumerName(DefaultPqConsumer)
+        .ServiceType("yandex-query")
+        .SupportedCodecs({
+            NYdb::NPersQueue::ECodec::RAW
+        });
+    const auto ruleSettings = NYdb::NPersQueue::TAddReadRuleSettings().ReadRule(readRule);
+
+    auto result = client.AddReadRule(streamName, ruleSettings).ExtractValueSync();
+    UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), NYdb::EStatus::SUCCESS, result.GetIssues().ToString());
+    UNIT_ASSERT_VALUES_EQUAL(result.IsTransportError(), false);
+}
+
+// Parser used by the read tests: turns one unboxed value into a single-element
+// vector holding its string content.
+std::vector<TString> UVParser(const NUdf::TUnboxedValue& item) {
+    std::vector<TString> parsed;
+    parsed.emplace_back(item.AsStringRef());
+    return parsed;
+}
+
+// Sends `data` to the async output as one batch, optionally together with `checkpoint`.
+// Each string becomes a one-element array row whose only field is the string value.
+void TPqIoTestFixture::AsyncOutputWrite(std::vector<TString> data, TMaybe<NDqProto::TCheckpoint> checkpoint) {
+    // The batch is built inside the callback because the holder factory is only available there;
+    // `data` is captured by copy.
+    auto makeBatch = [data](NKikimr::NMiniKQL::THolderFactory& factory) {
+        NKikimr::NMiniKQL::TUnboxedValueBatch rows;
+        for (const auto& text : data) {
+            NUdf::TUnboxedValue* fields = nullptr;
+            rows.emplace_back(factory.CreateDirectArrayHolder(1, fields));
+            fields[0] = NKikimr::NMiniKQL::MakeString(NUdf::TStringRef(text.Data(), text.Size()));
+        }
+
+        return rows;
+    };
+
+    CaSetup->AsyncOutputWrite(std::move(makeBatch), checkpoint);
+}
+}

+ 129 - 0
ydb/tests/fq/pq_async_io/ut_helpers.h

@@ -0,0 +1,129 @@
+#pragma once
+
+#include <ydb/library/yql/providers/common/ut_helpers/dq_fake_ca.h>
+#include <ydb/library/yql/providers/pq/async_io/dq_pq_read_actor.h>
+#include <ydb/library/yql/providers/pq/async_io/dq_pq_write_actor.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
+#include <ydb/library/yql/dq/actors/compute/dq_compute_actor_async_io.h>
+#include <ydb/library/yql/dq/actors/protos/dq_events.pb.h>
+#include <ydb/library/yql/minikql/mkql_alloc.h>
+
+#include <ydb/public/sdk/cpp/client/ydb_persqueue_public/persqueue.h>
+#include <ydb/public/sdk/cpp/client/ydb_datastreams/datastreams.h>
+#include <ydb/core/testlib/basics/runtime.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+#include <chrono>
+#include <queue>
+
+namespace NYql::NDq {
+
+// Builds read-actor settings for `topic`.
+// Watermarks are enabled only when `watermarksPeriod` is set; the late arrival delay and the
+// idle-partitions flag are stored in the watermark settings in any case.
+NYql::NPq::NProto::TDqPqTopicSource BuildPqTopicSourceSettings(
+    TString topic,
+    TMaybe<TDuration> watermarksPeriod = Nothing(),
+    TDuration lateArrivalDelay = TDuration::Seconds(2),
+    bool idlePartitionsEnabled = false);
+
+// Builds write-actor settings for `topic`.
+NYql::NPq::NProto::TDqPqTopicSink BuildPqTopicSinkSettings(TString topic);
+
+// Endpoint and database of the test YDB instance, taken from the
+// YDB_ENDPOINT / YDB_DATABASE environment variables.
+TString GetDefaultPqEndpoint();
+TString GetDefaultPqDatabase();
+
+// Test fixture that owns a fake compute actor setup and a YDB driver and exposes
+// helpers to create PQ read/write actors and to exchange data and state with them.
+struct TPqIoTestFixture : NUnitTest::TBaseFixture {
+    std::unique_ptr<TFakeCASetup> CaSetup = std::make_unique<TFakeCASetup>();
+    NYdb::TDriver Driver = NYdb::TDriver(NYdb::TDriverConfig().SetLog(CreateLogBackend("cerr")));
+
+    TPqIoTestFixture();
+    ~TPqIoTestFixture();
+
+    // --- Source (read actor) ---
+
+    // Creates the read actor from ready-made settings.
+    void InitSource(
+        NYql::NPq::NProto::TDqPqTopicSource&& settings,
+        i64 freeSpace = 1_MB);
+
+    // Creates the read actor with default settings for `topic`.
+    void InitSource(
+        const TString& topic,
+        i64 freeSpace = 1_MB)
+    {
+        auto settings = BuildPqTopicSourceSettings(topic);
+        InitSource(std::move(settings), freeSpace);
+    }
+
+    // Performs a single read from the source; the "next data" future is not used by the caller.
+    template<typename T>
+    std::vector<std::variant<T, TInstant>> SourceRead(const TReadValueParser<T> parser, i64 freeSpace = 12345) {
+        NThreading::TFuture<void> unusedNextData;
+        return CaSetup->AsyncInputRead(parser, unusedNextData, freeSpace);
+    }
+
+    // Reads from the source until `size` is reached; forwards `false` as the last
+    // argument of AsyncInputReadUntil().
+    template<typename T>
+    std::vector<std::variant<T, TInstant>> SourceReadUntil(
+        const TReadValueParser<T> parser,
+        ui64 size,
+        i64 eachReadFreeSpace = 1000,
+        TDuration timeout = TDuration::Seconds(30))
+    {
+        constexpr bool onlyData = false;
+        return CaSetup->AsyncInputReadUntil(parser, size, eachReadFreeSpace, timeout, onlyData);
+    }
+
+    // Same as SourceReadUntil() with a fixed 30 second timeout, but forwards `true`
+    // as the last argument of AsyncInputReadUntil().
+    template<typename T>
+    std::vector<std::variant<T, TInstant>> SourceReadDataUntil(
+        const TReadValueParser<T> parser,
+        ui64 size,
+        i64 eachReadFreeSpace = 1000)
+    {
+        constexpr bool onlyData = true;
+        return CaSetup->AsyncInputReadUntil(parser, size, eachReadFreeSpace, TDuration::Seconds(30), onlyData);
+    }
+
+    // Saves the source state for `checkpoint` into `state`.
+    void SaveSourceState(NDqProto::TCheckpoint checkpoint, NDqProto::TSourceState& state) {
+        CaSetup->SaveSourceState(checkpoint, state);
+    }
+
+    // Restores the source from `state`.
+    void LoadSource(const NDqProto::TSourceState& state) {
+        CaSetup->LoadSource(state);
+    }
+
+    // --- Async output (write actor) ---
+
+    // Creates the write actor from ready-made settings.
+    void InitAsyncOutput(
+        NYql::NPq::NProto::TDqPqTopicSink&& settings,
+        i64 freeSpace = 1_MB);
+
+    // Creates the write actor with default settings for `topic`.
+    void InitAsyncOutput(
+        const TString& topic,
+        i64 freeSpace = 1_MB)
+    {
+        auto settings = BuildPqTopicSinkSettings(topic);
+        InitAsyncOutput(std::move(settings), freeSpace);
+    }
+
+    // Restores the sink from `state`.
+    void LoadSink(const NDqProto::TSinkState& state) {
+        CaSetup->LoadSink(state);
+    }
+
+    // Writes `data` as one batch, optionally together with a checkpoint.
+    void AsyncOutputWrite(std::vector<TString> data, TMaybe<NDqProto::TCheckpoint> checkpoint = Nothing());
+};
+
+// Consumer name used by the test helpers.
+extern const TString DefaultPqConsumer;
+// NOTE(review): no definition of DefaultPqCluster is visible in ut_helpers.cpp in this change -
+// confirm that it is defined somewhere or drop the declaration.
+extern const TString DefaultPqCluster;
+
+// Write using YDB driver
+void PQWrite(
+    const std::vector<TString>& sequence,
+    const TString& topic,
+    const TString& endpoint = GetDefaultPqEndpoint());
+
+// Read using YDB driver
+// Waits until at least `size` messages are collected or `timeout` expires.
+std::vector<TString> PQReadUntil(
+    const TString& topic,
+    ui64 size,
+    const TString& endpoint = GetDefaultPqEndpoint(),
+    TDuration timeout = TDuration::MilliSeconds(10000));
+
+// Creates the stream and adds the read rule for DefaultPqConsumer.
+void PQCreateStream(
+    const TString& streamName);
+
+// Adds the read rule for DefaultPqConsumer to an existing stream.
+void AddReadRule(
+    NYdb::TDriver& driver,
+    const TString& streamName);
+
+// Converts an unboxed value into a single-element vector with its string content.
+std::vector<TString> UVParser(const NUdf::TUnboxedValue& item);
+
+}

+ 27 - 0
ydb/tests/fq/pq_async_io/ya.make

@@ -0,0 +1,27 @@
+UNITTEST_FOR(ydb/library/yql/providers/pq/async_io)
+
+SIZE(MEDIUM)
+
+INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/fq_runner/ydb_runner_with_datastreams.inc)
+
+SRCS(
+    dq_pq_read_actor_ut.cpp
+    dq_pq_write_actor_ut.cpp
+    ut_helpers.cpp
+)
+
+PEERDIR(
+    ydb/core/testlib/basics/default
+    ydb/library/yql/minikql/computation/llvm14
+    ydb/library/yql/public/udf/service/exception_policy
+    ydb/library/yql/providers/common/comp_nodes
+    ydb/library/yql/providers/common/ut_helpers
+    ydb/library/yql/sql
+    ydb/public/sdk/cpp/client/ydb_datastreams
+    ydb/public/sdk/cpp/client/ydb_persqueue_public
+    ydb/library/yql/minikql/comp_nodes/llvm14
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()

+ 1 - 0
ydb/tests/fq/ya.make

@@ -5,6 +5,7 @@ RECURSE_FOR_TESTS(
     mem_alloc
     multi_plane
     plans
+    pq_async_io
     restarts
     s3
     yds