Browse Source

Add local kmeans actor scan (#8756)

Valery Mironov 6 months ago
parent
commit
07875100df

+ 6 - 4
ydb/core/protos/out/out.cpp

@@ -1,5 +1,3 @@
-#include <ydb/public/api/protos/ydb_table.pb.h>
-
 #include <ydb/core/protos/blobstorage.pb.h>
 #include <ydb/core/protos/blobstorage_vdisk_internal.pb.h>
 #include <ydb/core/protos/blobstorage_vdisk_config.pb.h>
@@ -254,6 +252,10 @@ Y_DECLARE_OUT_SPEC(, NKikimrStat::TEvStatisticsResponse::EStatus, stream, value)
     stream << NKikimrStat::TEvStatisticsResponse::EStatus_Name(value);
 }
 
-Y_DECLARE_OUT_SPEC(, Ydb::Table::IndexBuildState_State, stream, value) {
-    stream << IndexBuildState_State_Name(value);
+Y_DECLARE_OUT_SPEC(, NKikimrIndexBuilder::EBuildStatus, stream, value) { // Stream output for EBuildStatus: writes the string returned by EBuildStatus_Name(value).
+    stream << NKikimrIndexBuilder::EBuildStatus_Name(value);
+}
+
+Y_DECLARE_OUT_SPEC(, NKikimrTxDataShard::TEvLocalKMeansRequest_EState, stream, value) { // Stream output for the local k-means EState enum: writes the string returned by TEvLocalKMeansRequest_EState_Name(value).
+    stream << NKikimrTxDataShard::TEvLocalKMeansRequest_EState_Name(value);
 }

+ 67 - 0
ydb/core/protos/tx_datashard.proto

@@ -16,6 +16,7 @@ import "ydb/core/protos/subdomains.proto";
 import "ydb/core/protos/query_stats.proto";
 import "ydb/public/api/protos/ydb_issue_message.proto";
 import "ydb/public/api/protos/ydb_status_codes.proto";
+import "ydb/public/api/protos/ydb_table.proto";
 import "ydb/library/yql/dq/actors/protos/dq_events.proto";
 import "ydb/library/yql/dq/actors/protos/dq_stats.proto";
 import "ydb/library/yql/dq/proto/dq_tasks.proto";
@@ -1485,6 +1486,72 @@ message TEvSampleKResponse {
     repeated bytes Rows = 11;
 }
 
+message TEvLocalKMeansRequest { // Record carried by TEvDataShard::TEvLocalKMeansRequest (request side of the local k-means scan).
+    optional uint64 Id = 1;
+
+    optional uint64 TabletId = 2; // the unit tests fill this with the id of the shard the request is sent to
+    optional NKikimrProto.TPathID PathId = 3; // the unit tests fill this with the path id of the main table
+
+    optional uint64 SnapshotTxId = 4; // the unit tests fill SnapshotTxId/SnapshotStep from a volatile snapshot of the main table
+    optional uint64 SnapshotStep = 5;
+
+    optional uint64 SeqNoGeneration = 6; // presumably forms the scan seqNo {generation, round} together with SeqNoRound — TODO confirm
+    optional uint64 SeqNoRound = 7;
+
+    optional Ydb.Table.VectorIndexSettings Settings = 8; // vector dimension, vector type and distance/similarity (the fields the unit tests set)
+
+    optional uint64 Seed = 9;
+    optional uint32 K = 10; // the BadRequest unit test expects BAD_REQUEST for K = 0 and K = 1
+
+    enum EState {
+        UNSPECIFIED = 0;
+        SAMPLE = 1;
+        KMEANS = 2;
+        UPLOAD_MAIN_TO_TMP = 3;
+        UPLOAD_MAIN_TO_POSTING = 4;
+        UPLOAD_TMP_TO_TMP = 5;
+        UPLOAD_TMP_TO_POSTING = 6;
+        DONE = 7;
+    };
+    optional EState Upload = 11; // the unit tests pass one of the UPLOAD_* values here
+    // State != DONE (presumably a precondition on the request — TODO confirm)
+    optional EState State = 12;
+    // State != KMEANS || DoneRounds < NeedsRounds
+    optional uint32 DoneRounds = 13;
+    optional uint32 NeedsRounds = 14;
+
+    // id of parent cluster
+    optional uint32 Parent = 15;
+    // ids [Child, Child + K) are reserved for our clusters (K ids in total — TODO confirm the upper bound is exclusive)
+    optional uint32 Child = 16;
+
+    optional string LevelName = 17; // the unit tests pass a full table path, e.g. "/Root/table-level"
+    optional string PostingName = 18; // the unit tests pass a full table path, e.g. "/Root/table-posting"
+
+    optional string EmbeddingColumn = 19; // name of the column holding the embedding (the unit tests use "embedding")
+    repeated string DataColumns = 20; // names of extra columns (the unit tests use "data")
+}
+
+message TEvLocalKMeansProgressResponse { // Record carried by TEvDataShard::TEvLocalKMeansProgressResponse (reply side of the local k-means scan).
+    optional uint64 Id = 1;
+
+    optional uint64 TabletId = 2;
+    optional NKikimrProto.TPathID PathId = 3;
+
+    optional uint64 RequestSeqNoGeneration = 4; // presumably echoes SeqNoGeneration/SeqNoRound of the request — TODO confirm
+    optional uint64 RequestSeqNoRound = 5;
+
+    optional NKikimrIndexBuilder.EBuildStatus Status = 6; // the unit tests check this for BAD_REQUEST or DONE
+    repeated Ydb.Issue.IssueMessage Issues = 7; // the unit tests convert these with NYql::IssuesFromMessage for the failure message
+
+    // TODO(mbkkt) implement slow-path (reliable-path)
+    // optional uint64 RowsDelta = 8;
+    // optional uint64 BytesDelta = 9;
+
+    // optional TEvLocalKMeansRequest.EState State = 10;
+    // optional uint32 DoneRounds = 11;
+}
+
 message TEvCdcStreamScanRequest {
     message TLimits {
         optional uint32 BatchMaxBytes = 1 [default = 512000];

+ 12 - 3
ydb/core/tablet_flat/flat_scan_lead.h

@@ -9,9 +9,14 @@ namespace NTable {
 
     struct TLead {
         void To(TTagsRef tags, TArrayRef<const TCell> key, ESeek seek)
+        {
+            To(key, seek);
+            SetTags(tags);
+        }
+
+        void To(TArrayRef<const TCell> key, ESeek seek)
         {
             Valid = true;
-            Tags.assign(tags.begin(), tags.end());
             Relation = seek;
             Key = TSerializedCellVec(key);
             StopKey = { };
@@ -24,6 +29,10 @@ namespace NTable {
             StopKeyInclusive = inclusive;
         }
 
+        void SetTags(TTagsRef tags) {
+            Tags.assign(tags.begin(), tags.end());
+        }
+
         explicit operator bool() const noexcept
         {
             return Valid;
@@ -34,12 +43,12 @@ namespace NTable {
             Valid = false;
         }
 
-        bool Valid = false;
         ESeek Relation = ESeek::Exact;
+        bool Valid = false;
+        bool StopKeyInclusive = true;
         TVector<ui32> Tags;
         TSerializedCellVec Key;
         TSerializedCellVec StopKey;
-        bool StopKeyInclusive = true;
     };
 
 }

+ 72 - 0
ydb/core/tx/datashard/buffer_data.h

@@ -0,0 +1,72 @@
+#include "ydb/core/scheme/scheme_tablecell.h"
+#include "ydb/core/tx/datashard/upload_stats.h"
+#include "ydb/core/tx/tx_proxy/upload_rows.h"
+
+namespace NKikimr::NDataShard {
+
+class TBufferData: public IStatHolder, public TNonCopyable { // Buffer of rows pending upload: tracks the rows, their accumulated byte size and the last key passed to AddRow.
+public:
+    TBufferData()
+        : Rows{std::make_shared<NTxProxy::TUploadRows>()} { // Rows starts as an empty, non-null container
+    }
+
+    ui64 GetRows() const override final { // number of buffered rows
+        return Rows->size();
+    }
+
+    auto GetRowsData() const { // returns a copy of the shared_ptr, i.e. shares ownership of the same row container
+        return Rows;
+    }
+
+    ui64 GetBytes() const override final { // byte size accumulated by AddRow
+        return ByteSize;
+    }
+
+    void FlushTo(TBufferData& other) { // moves this buffer's content into `other`; aborts if `other` is this object or is not empty
+        Y_ABORT_UNLESS(this != &other);
+        Y_ABORT_UNLESS(other.IsEmpty());
+        other.Rows.swap(Rows); // this buffer keeps the (empty) container that `other` held
+        other.ByteSize = std::exchange(ByteSize, 0);
+        other.LastKey = std::exchange(LastKey, {});
+    }
+
+    void Clear() { // empties the row container in place and resets the byte counter and last key
+        Rows->clear();
+        ByteSize = 0;
+        LastKey = {};
+    }
+
+    void AddRow(TSerializedCellVec&& key, TSerializedCellVec&& targetPk, TString&& targetValue) { // appends (targetPk, targetValue) and remembers `key` as the last key
+        Rows->emplace_back(std::move(targetPk), std::move(targetValue));
+        ByteSize += Rows->back().first.GetBuffer().size() + Rows->back().second.size(); // counts the stored pk buffer and value only; `key` is not included
+        LastKey = std::move(key);
+    }
+
+    bool IsEmpty() const {
+        return Rows->empty();
+    }
+
+    size_t Size() const { // same value as GetRows(), typed as size_t
+        return Rows->size();
+    }
+
+    bool IsReachLimits(const TUploadLimits& Limits) { // true once the row count reaches BatchRowsLimit or the byte size exceeds BatchBytesLimit
+        // TODO(mbkkt) why [0..BatchRowsLimit) but [0..BatchBytesLimit]
+        return Rows->size() >= Limits.BatchRowsLimit || ByteSize > Limits.BatchBytesLimit;
+    }
+
+    auto&& ExtractLastKey() { // returns an rvalue reference to LastKey; the member is only emptied if the caller actually moves from the result
+        return std::move(LastKey);
+    }
+
+    const auto& GetLastKey() const {
+        return LastKey;
+    }
+
+private:
+    std::shared_ptr<NTxProxy::TUploadRows> Rows; // never reset to null in this class: created in the constructor, only swapped in FlushTo
+    ui64 ByteSize = 0;
+    TSerializedCellVec LastKey;
+};
+
+}

+ 19 - 124
ydb/core/tx/datashard/build_index.cpp

@@ -2,6 +2,7 @@
 #include "range_ops.h"
 #include "scan_common.h"
 #include "upload_stats.h"
+#include "buffer_data.h"
 
 #include <ydb/core/base/appdata.h>
 #include <ydb/core/base/counters.h>
@@ -27,36 +28,11 @@ namespace NKikimr::NDataShard {
 #define LOG_W(stream) LOG_WARN_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, stream)
 #define LOG_E(stream) LOG_ERROR_S(*TlsActivationContext, NKikimrServices::TX_DATASHARD, stream)
 
-using TColumnsTypes = THashMap<TString, NScheme::TTypeInfo>;
-using TTypes = NTxProxy::TUploadTypes;
-using TRows = NTxProxy::TUploadRows;
-
-static TColumnsTypes GetAllTypes(const TUserTable& tableInfo) {
-    TColumnsTypes result;
-
-    for (const auto& it : tableInfo.Columns) {
-        result[it.second.Name] = it.second.Type;
-    }
-
-    return result;
-}
-
-static void ProtoYdbTypeFromTypeInfo(Ydb::Type* type, const NScheme::TTypeInfo typeInfo) {
-    if (typeInfo.GetTypeId() == NScheme::NTypeIds::Pg) {
-        auto* typeDesc = typeInfo.GetTypeDesc();
-        auto* pg = type->mutable_pg_type();
-        pg->set_type_name(NPg::PgTypeNameFromTypeDesc(typeDesc));
-        pg->set_oid(NPg::PgTypeIdFromTypeDesc(typeDesc));
-    } else {
-        type->set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId());
-    }
-}
-
-static std::shared_ptr<TTypes> BuildTypes(const TUserTable& tableInfo, const NKikimrIndexBuilder::TColumnBuildSettings& buildSettings) {
+static std::shared_ptr<NTxProxy::TUploadTypes> BuildTypes(const TUserTable& tableInfo, const NKikimrIndexBuilder::TColumnBuildSettings& buildSettings) {
     auto types = GetAllTypes(tableInfo);
 
     Y_ABORT_UNLESS(buildSettings.columnSize() > 0);
-    auto result = std::make_shared<TTypes>();
+    auto result = std::make_shared<NTxProxy::TUploadTypes>();
     result->reserve(tableInfo.KeyColumnIds.size() + buildSettings.columnSize());
 
     for (const auto& keyColId : tableInfo.KeyColumnIds) {
@@ -72,10 +48,10 @@ static std::shared_ptr<TTypes> BuildTypes(const TUserTable& tableInfo, const NKi
     return result;
 }
 
-static std::shared_ptr<TTypes> BuildTypes(const TUserTable& tableInfo, TProtoColumnsCRef indexColumns, TProtoColumnsCRef dataColumns) {
+static std::shared_ptr<NTxProxy::TUploadTypes> BuildTypes(const TUserTable& tableInfo, TProtoColumnsCRef indexColumns, TProtoColumnsCRef dataColumns) {
     auto types = GetAllTypes(tableInfo);
 
-    auto result = std::make_shared<TTypes>();
+    auto result = std::make_shared<NTxProxy::TUploadTypes>();
     result->reserve(indexColumns.size() + dataColumns.size());
 
     for (const auto& colName : indexColumns) {
@@ -119,74 +95,6 @@ bool BuildExtraColumns(TVector<TCell>& cells, const NKikimrIndexBuilder::TColumn
     return true;
 }
 
-class TBufferData: public IStatHolder, public TNonCopyable {
-public:
-    TBufferData()
-        : Rows(new TRows)
-    {
-    }
-
-    ui64 GetRows() const override final {
-        return Rows->size();
-    }
-
-    std::shared_ptr<TRows> GetRowsData() const {
-        return Rows;
-    }
-
-    ui64 GetBytes() const override final {
-        return ByteSize;
-    }
-
-    void FlushTo(TBufferData& other) {
-        if (this == &other) {
-            return;
-        }
-
-        Y_ABORT_UNLESS(other.Rows);
-        Y_ABORT_UNLESS(other.IsEmpty());
-
-        other.Rows.swap(Rows);
-        other.ByteSize = ByteSize;
-        other.LastKey = std::move(LastKey);
-
-        Clear();
-    }
-
-    void Clear() {
-        Rows->clear();
-        ByteSize = 0;
-        LastKey = {};
-    }
-
-    void AddRow(TSerializedCellVec&& key, TSerializedCellVec&& targetPk, TString&& targetValue) {
-        Rows->emplace_back(std::move(targetPk), std::move(targetValue));
-        ByteSize += Rows->back().first.GetBuffer().size() + Rows->back().second.size();
-        LastKey = std::move(key);
-    }
-
-    bool IsEmpty() const {
-        return Rows->empty();
-    }
-
-    bool IsReachLimits(const TUploadLimits& Limits) {
-        return Rows->size() >= Limits.BatchRowsLimit || ByteSize > Limits.BatchBytesLimit;
-    }
-
-    void ExtractLastKey(TSerializedCellVec& out) {
-        out = std::move(LastKey);
-    }
-
-    const TSerializedCellVec& GetLastKey() const {
-        return LastKey;
-    }
-
-private:
-    std::shared_ptr<TRows> Rows;
-    ui64 ByteSize = 0;
-    TSerializedCellVec LastKey;
-};
-
 template <NKikimrServices::TActivity::EType Activity>
 class TBuildScanUpload: public TActor<TBuildScanUpload<Activity>>, public NTable::IScan {
     using TThis = TBuildScanUpload<Activity>;
@@ -202,8 +110,8 @@ protected:
     const ui64 DataShardId;
     const TActorId ProgressActorId;
 
-    TTags ScanTags;                             // first: columns we scan, order as in IndexTable
-    std::shared_ptr<TTypes> UploadColumnsTypes; // columns types we upload to indexTable
+    TTags ScanTags;                                             // first: columns we scan, order as in IndexTable
+    std::shared_ptr<NTxProxy::TUploadTypes> UploadColumnsTypes; // columns types we upload to indexTable
     NTxProxy::EUploadRowsMode UploadMode;
 
     const TTags KeyColumnIds;
@@ -242,8 +150,7 @@ protected:
         , KeyColumnIds(tableInfo.KeyColumnIds)
         , KeyTypes(tableInfo.KeyColumnTypes)
         , TableRange(tableInfo.Range)
-        , RequestedRange(range)
-    {
+        , RequestedRange(range) {
     }
 
     template <typename TAddRow>
@@ -322,11 +229,8 @@ public:
     }
 
     TAutoPtr<IDestructable> Finish(EAbort abort) noexcept override {
-        auto ctx = TActivationContext::AsActorContext().MakeFor(TBase::SelfId());
-
         if (Uploader) {
-            TAutoPtr<TEvents::TEvPoisonPill> poison = new TEvents::TEvPoisonPill;
-            ctx.Send(Uploader, poison.Release());
+            this->Send(Uploader, new TEvents::TEvPoisonPill);
             Uploader = {};
         }
 
@@ -349,7 +253,7 @@ public:
 
         UploadStatusToMessage(progress->Record);
 
-        ctx.Send(ProgressActorId, progress.Release());
+        this->Send(ProgressActorId, progress.Release());
 
         LOG_D("Finish " << Debug());
 
@@ -382,11 +286,7 @@ public:
               << " WriteBuf empty: " << WriteBuf.IsEmpty()
               << " " << Debug());
 
-        if (ReadBuf.IsEmpty()) {
-            return EScan::Feed;
-        }
-
-        if (WriteBuf.IsEmpty()) {
+        if (!ReadBuf.IsEmpty() && WriteBuf.IsEmpty()) {
             ReadBuf.FlushTo(WriteBuf);
             Upload();
         }
@@ -433,7 +333,7 @@ private:
 
         if (UploadStatus.IsSuccess()) {
             Stats.Aggr(&WriteBuf);
-            WriteBuf.ExtractLastKey(LastUploadedKey);
+            LastUploadedKey = WriteBuf.ExtractLastKey();
 
             //send progress
             TAutoPtr<TEvDataShard::TEvBuildIndexProgressResponse> progress = new TEvDataShard::TEvBuildIndexProgressResponse;
@@ -451,7 +351,7 @@ private:
             progress->Record.SetStatus(NKikimrIndexBuilder::EBuildStatus::IN_PROGRESS);
             UploadStatusToMessage(progress->Record);
 
-            ctx.Send(ProgressActorId, progress.Release());
+            this->Send(ProgressActorId, progress.Release());
 
             if (!ReadBuf.IsEmpty() && ReadBuf.IsReachLimits(Limits)) {
                 ReadBuf.FlushTo(WriteBuf);
@@ -488,13 +388,13 @@ private:
         LOG_D("Upload, last key " << DebugPrintPoint(KeyTypes, WriteBuf.GetLastKey().GetCells(), *AppData()->TypeRegistry) << " " << Debug());
 
         auto actor = NTxProxy::CreateUploadRowsInternal(
-            TBase::SelfId(), TargetTable,
+            this->SelfId(), TargetTable,
             UploadColumnsTypes,
             WriteBuf.GetRowsData(),
             UploadMode,
             true /*writeToPrivateTable*/);
 
-        Uploader = TActivationContext::AsActorContext().MakeFor(TBase::SelfId()).Register(actor);
+        Uploader = this->Register(actor);
     }
 };
 
@@ -513,8 +413,7 @@ public:
                     const TUserTable& tableInfo,
                     TUploadLimits limits)
         : TBuildScanUpload(buildIndexId, target, seqNo, dataShardId, progressActorId, range, tableInfo, limits)
-        , TargetDataColumnPos(targetIndexColumns.size())
-    {
+        , TargetDataColumnPos(targetIndexColumns.size()) {
         ScanTags = BuildTags(tableInfo, targetIndexColumns, targetDataColumns);
         UploadColumnsTypes = BuildTypes(tableInfo, targetIndexColumns, targetDataColumns);
         UploadMode = NTxProxy::EUploadRowsMode::WriteToTableShadow;
@@ -545,8 +444,7 @@ public:
                       const NKikimrIndexBuilder::TColumnBuildSettings& columnBuildSettings,
                       const TUserTable& tableInfo,
                       TUploadLimits limits)
-        : TBuildScanUpload(buildIndexId, target, seqNo, dataShardId, progressActorId, range, tableInfo, limits)
-    {
+        : TBuildScanUpload(buildIndexId, target, seqNo, dataShardId, progressActorId, range, tableInfo, limits) {
         Y_ABORT_UNLESS(columnBuildSettings.columnSize() > 0);
         UploadColumnsTypes = BuildTypes(tableInfo, columnBuildSettings);
         UploadMode = NTxProxy::EUploadRowsMode::UpsertIfExists;
@@ -582,8 +480,7 @@ TAutoPtr<NTable::IScan> CreateBuildIndexScan(
     TProtoColumnsCRef targetDataColumns,
     const NKikimrIndexBuilder::TColumnBuildSettings& columnsToBuild,
     const TUserTable& tableInfo,
-    TUploadLimits limits)
-{
+    TUploadLimits limits) {
     if (columnsToBuild.columnSize() > 0) {
         return new TBuildColumnsScan(
             buildIndexId, target, seqNo, dataShardId, progressActorId, range, columnsToBuild, tableInfo, limits);
@@ -596,8 +493,7 @@ class TDataShard::TTxHandleSafeBuildIndexScan: public NTabletFlatExecutor::TTran
 public:
     TTxHandleSafeBuildIndexScan(TDataShard* self, TEvDataShard::TEvBuildIndexCreateRequest::TPtr&& ev)
         : TTransactionBase(self)
-        , Ev(std::move(ev))
-    {
+        , Ev(std::move(ev)) {
     }
 
     bool Execute(TTransactionContext&, const TActorContext& ctx) {
@@ -628,7 +524,6 @@ void TDataShard::HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev,
         return;
     }
 
-
     TScanRecord::TSeqNo seqNo = {record.GetSeqNoGeneration(), record.GetSeqNoRound()};
     auto badRequest = [&](const TString& error) {
         auto response = MakeHolder<TEvDataShard::TEvBuildIndexProgressResponse>();

+ 15 - 0
ydb/core/tx/datashard/datashard.h

@@ -332,6 +332,9 @@ struct TEvDataShard {
         EvSampleKRequest,
         EvSampleKResponse,
 
+        EvLocalKMeansRequest,
+        EvLocalKMeansProgressResponse,
+
         EvEnd
     };
 
@@ -1454,6 +1457,18 @@ struct TEvDataShard {
                           TEvDataShard::EvSampleKResponse> {
     };
 
+    struct TEvLocalKMeansRequest // Event wrapper: binds the NKikimrTxDataShard::TEvLocalKMeansRequest record to the EvLocalKMeansRequest event id via TEventPB.
+        : public TEventPB<TEvLocalKMeansRequest,
+                          NKikimrTxDataShard::TEvLocalKMeansRequest,
+                          TEvDataShard::EvLocalKMeansRequest> {
+    };
+
+    struct TEvLocalKMeansProgressResponse // Event wrapper: binds the NKikimrTxDataShard::TEvLocalKMeansProgressResponse record to the EvLocalKMeansProgressResponse event id via TEventPB.
+        : public TEventPB<TEvLocalKMeansProgressResponse,
+                          NKikimrTxDataShard::TEvLocalKMeansProgressResponse,
+                          TEvDataShard::EvLocalKMeansProgressResponse> {
+    };
+
     struct TEvKqpScan
         : public TEventPB<TEvKqpScan,
                           NKikimrTxDataShard::TEvKqpScan,

+ 4 - 0
ydb/core/tx/datashard/datashard_impl.h

@@ -253,6 +253,7 @@ class TDataShard
     class TTxHandleSafeKqpScan;
     class TTxHandleSafeBuildIndexScan;
     class TTxHandleSafeSampleKScan;
+    class TTxHandleSafeLocalKMeansScan;
     class TTxHandleSafeStatisticsScan;
 
     class TTxMediatorStateRestored;
@@ -1324,6 +1325,8 @@ class TDataShard
     void HandleSafe(TEvDataShard::TEvBuildIndexCreateRequest::TPtr& ev, const TActorContext& ctx);
     void Handle(TEvDataShard::TEvSampleKRequest::TPtr& ev, const TActorContext& ctx);
     void HandleSafe(TEvDataShard::TEvSampleKRequest::TPtr& ev, const TActorContext& ctx);
+    void Handle(TEvDataShard::TEvLocalKMeansRequest::TPtr& ev, const TActorContext& ctx);
+    void HandleSafe(TEvDataShard::TEvLocalKMeansRequest::TPtr& ev, const TActorContext& ctx);
     void Handle(TEvDataShard::TEvCdcStreamScanRequest::TPtr& ev, const TActorContext& ctx);
     void Handle(TEvPrivate::TEvCdcStreamScanRegistered::TPtr& ev, const TActorContext& ctx);
     void Handle(TEvPrivate::TEvCdcStreamScanProgress::TPtr& ev, const TActorContext& ctx);
@@ -3114,6 +3117,7 @@ protected:
             HFunc(TEvDataShard::TEvDiscardVolatileSnapshotRequest, Handle);
             HFuncTraced(TEvDataShard::TEvBuildIndexCreateRequest, Handle);
             HFunc(TEvDataShard::TEvSampleKRequest, Handle);
+            HFunc(TEvDataShard::TEvLocalKMeansRequest, Handle);
             HFunc(TEvDataShard::TEvCdcStreamScanRequest, Handle);
             HFunc(TEvPrivate::TEvCdcStreamScanRegistered, Handle);
             HFunc(TEvPrivate::TEvCdcStreamScanProgress, Handle);

+ 0 - 5
ydb/core/tx/datashard/datashard_ut_build_index.cpp

@@ -13,11 +13,6 @@
 
 #include <library/cpp/testing/unittest/registar.h>
 
-template <>
-inline void Out<NKikimrIndexBuilder::EBuildStatus>(IOutputStream& o, NKikimrIndexBuilder::EBuildStatus status) {
-    o << NKikimrIndexBuilder::EBuildStatus_Name(status);
-}
-
 namespace NKikimr {
 
 using namespace NKikimr::NDataShard::NKqpHelpers;

+ 700 - 0
ydb/core/tx/datashard/datashard_ut_local_kmeans.cpp

@@ -0,0 +1,700 @@
+#include <ydb/core/testlib/test_client.h>
+#include <ydb/core/tx/datashard/ut_common/datashard_ut_common.h>
+#include <ydb/core/tx/schemeshard/schemeshard.h>
+#include <ydb/core/tx/tx_proxy/proxy.h>
+#include <ydb/core/tx/tx_proxy/upload_rows.h>
+#include <ydb/core/protos/index_builder.pb.h>
+
+#include <ydb/library/yql/public/issue/yql_issue_message.h>
+
+#include <library/cpp/testing/unittest/registar.h>
+
+namespace NKikimr {
+using namespace Tests;
+using Ydb::Table::VectorIndexSettings;
+using namespace NTableIndex::NTableVectorKmeansTreeIndex;
+
+static std::atomic<ui64> sId = 1;
+static constexpr const char* kMainTable = "/Root/table-main";
+static constexpr const char* kLevelTable = "/Root/table-level";
+static constexpr const char* kPostingTable = "/Root/table-posting";
+
+Y_UNIT_TEST_SUITE (TTxDataShardLocalKMeansScan) {
+    static void DoBadRequest(Tests::TServer::TPtr server, TActorId sender, std::unique_ptr<TEvDataShard::TEvLocalKMeansRequest> & ev,
+                             size_t dims = 2, VectorIndexSettings::VectorType type = VectorIndexSettings::VECTOR_TYPE_FLOAT, VectorIndexSettings::Distance metric = VectorIndexSettings::DISTANCE_COSINE) { // Completes `ev` with defaults, sends it to the single main-table shard and asserts the reply status is BAD_REQUEST.
+        auto id = sId.fetch_add(1, std::memory_order_relaxed); // fresh SeqNoGeneration per call
+        auto& runtime = *server->GetRuntime();
+        auto snapshot = CreateVolatileSnapshot(server, {kMainTable});
+        auto datashards = GetTableShards(server, sender, kMainTable);
+        TTableId tableId = ResolveTableId(server, sender, kMainTable);
+
+        TStringBuilder data; // NOTE(review): never used in this function
+        TString err; // NOTE(review): never used in this function
+        UNIT_ASSERT(datashards.size() == 1); // the loop below releases `ev`, so it must run exactly once
+
+        for (auto tid : datashards) {
+            auto& rec = ev->Record;
+            rec.SetId(1);
+
+            rec.SetSeqNoGeneration(id);
+            rec.SetSeqNoRound(1);
+
+            if (!rec.HasTabletId()) { // keep a TabletId preset by the caller (used to inject an invalid one)
+                rec.SetTabletId(tid);
+            }
+            if (!rec.HasPathId()) { // keep a PathId preset by the caller (used to inject an invalid one)
+                PathIdFromPathId(tableId.PathId, rec.MutablePathId());
+            }
+
+            rec.SetSnapshotTxId(snapshot.TxId);
+            rec.SetSnapshotStep(snapshot.Step);
+
+            VectorIndexSettings settings;
+            settings.set_vector_dimension(dims);
+            settings.set_vector_type(type);
+            settings.set_distance(metric);
+            *rec.MutableSettings() = settings;
+
+            if (!rec.HasK()) { // keep a K preset by the caller
+                rec.SetK(2);
+            }
+            rec.SetSeed(1337);
+
+            rec.SetState(NKikimrTxDataShard::TEvLocalKMeansRequest::SAMPLE);
+            rec.SetUpload(NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_MAIN_TO_POSTING);
+
+            rec.SetDoneRounds(0);
+            rec.SetNeedsRounds(3);
+
+            rec.SetParent(0);
+            rec.SetChild(1);
+
+            if (rec.HasEmbeddingColumn()) { // NOTE(review): a caller-preset EmbeddingColumn is cleared, so the request is sent without one rather than with the preset name
+                rec.ClearEmbeddingColumn();
+            } else {
+                rec.SetEmbeddingColumn("embedding");
+            }
+
+            rec.SetLevelName(kLevelTable);
+            rec.SetPostingName(kPostingTable);
+
+            runtime.SendToPipe(tid, sender, ev.release(), 0, GetPipeConfigWithRetries()); // `ev` is null after this call
+
+            TAutoPtr<IEventHandle> handle;
+            auto reply = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvLocalKMeansProgressResponse>(handle);
+            UNIT_ASSERT_VALUES_EQUAL(reply->Record.GetStatus(), NKikimrIndexBuilder::EBuildStatus::BAD_REQUEST);
+        }
+    }
+
+    static std::tuple<TString, TString> DoLocalKMeans(Tests::TServer::TPtr server, TActorId sender, ui32 parent, ui64 seed, ui64 k,
+                                                      NKikimrTxDataShard::TEvLocalKMeansRequest::EState upload,
+                                                      VectorIndexSettings::VectorType type, auto metric) { // Runs local k-means on every main-table shard, asserts DONE, and returns the level and posting tables as read by ReadShardedTable.
+        auto id = sId.fetch_add(1, std::memory_order_relaxed); // fresh SeqNoGeneration per call
+        auto& runtime = *server->GetRuntime();
+        auto snapshot = CreateVolatileSnapshot(server, {kMainTable});
+        auto datashards = GetTableShards(server, sender, kMainTable);
+        TTableId tableId = ResolveTableId(server, sender, kMainTable);
+
+        TString err; // NOTE(review): never used in this function
+
+        for (auto tid : datashards) {
+            auto ev1 = std::make_unique<TEvDataShard::TEvLocalKMeansRequest>();
+            auto ev2 = std::make_unique<TEvDataShard::TEvLocalKMeansRequest>();
+            auto fill = [&](std::unique_ptr<TEvDataShard::TEvLocalKMeansRequest>& ev) { // fills both requests with identical content
+                auto& rec = ev->Record;
+                rec.SetId(1);
+
+                rec.SetSeqNoGeneration(id);
+                rec.SetSeqNoRound(1);
+
+                rec.SetTabletId(tid);
+                PathIdFromPathId(tableId.PathId, rec.MutablePathId());
+
+                rec.SetSnapshotTxId(snapshot.TxId);
+                rec.SetSnapshotStep(snapshot.Step);
+
+                VectorIndexSettings settings;
+                settings.set_vector_dimension(2);
+                settings.set_vector_type(type);
+                if constexpr (std::is_same_v<decltype(metric), VectorIndexSettings::Distance>) { // `metric` is either a Distance or another settings enum that is passed to set_similarity
+                    settings.set_distance(metric);
+                } else {
+                    settings.set_similarity(metric);
+                }
+                *rec.MutableSettings() = settings;
+
+                rec.SetK(k);
+                rec.SetSeed(seed);
+
+                rec.SetState(NKikimrTxDataShard::TEvLocalKMeansRequest::SAMPLE);
+                rec.SetUpload(upload);
+
+                rec.SetDoneRounds(0);
+                rec.SetNeedsRounds(3);
+
+                rec.SetParent(parent);
+                rec.SetChild(parent + 1);
+
+                rec.SetEmbeddingColumn("embedding");
+                rec.AddDataColumns("data");
+
+                rec.SetLevelName(kLevelTable);
+                rec.SetPostingName(kPostingTable);
+            };
+            fill(ev1);
+            fill(ev2);
+
+            runtime.SendToPipe(tid, sender, ev1.release(), 0, GetPipeConfigWithRetries());
+            runtime.SendToPipe(tid, sender, ev2.release(), 0, GetPipeConfigWithRetries()); // NOTE(review): two identical requests are sent but only one reply is grabbed below — presumably this exercises duplicate-request handling; confirm
+
+            TAutoPtr<IEventHandle> handle;
+            auto reply = runtime.GrabEdgeEventRethrow<TEvDataShard::TEvLocalKMeansProgressResponse>(handle);
+
+            NYql::TIssues issues;
+            NYql::IssuesFromMessage(reply->Record.GetIssues(), issues); // issues are only used for the assertion message
+            UNIT_ASSERT_EQUAL_C(reply->Record.GetStatus(), NKikimrIndexBuilder::EBuildStatus::DONE, issues.ToOneLineString());
+        }
+
+        auto level = ReadShardedTable(server, kLevelTable);
+        auto posting = ReadShardedTable(server, kPostingTable);
+        return {std::move(level), std::move(posting)};
+    }
+
+    static void DropTable(Tests::TServer::TPtr server, TActorId sender, const char* name) { // Drops table `name` under /Root and waits for the transaction notification.
+        ui64 txId = AsyncDropTable(server, sender, "/Root", name);
+        WaitTxNotification(server, sender, txId);
+    }
+
+    static void CreateMainTable(Tests::TServer::TPtr server, TActorId sender, TShardedTableOptions options) { // Creates /Root/table-main with columns key, embedding, data; `options` is taken by value, so the caller's copy is not modified.
+        options.AllowSystemColumnNames(false);
+        options.Columns({
+            {"key", "Uint32", true, true}, // presumably {name, type, isKey, notNull} — confirm against TShardedTableOptions
+            {"embedding", "String", false, false},
+            {"data", "String", false, false},
+        });
+        CreateShardedTable(server, sender, "/Root", "table-main", options);
+    }
+
+    static void CreateLevelTable(Tests::TServer::TPtr server, TActorId sender, TShardedTableOptions options) { // Creates /Root/table-level using the LevelTable_* column name constants; `options` is taken by value.
+        options.AllowSystemColumnNames(true); // enabled here, unlike in CreateMainTable — presumably because the LevelTable_* names are system column names; confirm
+        options.Columns({
+            {LevelTable_ParentIdColumn, "Uint32", true, true},
+            {LevelTable_IdColumn, "Uint32", true, true},
+            {LevelTable_EmbeddingColumn, "String", false, true},
+        });
+        CreateShardedTable(server, sender, "/Root", "table-level", options);
+    }
+
+    static void CreatePostingTable(Tests::TServer::TPtr server, TActorId sender, TShardedTableOptions options) { // Creates /Root/table-posting with the parent-id column plus key and data; `options` is taken by value.
+        options.AllowSystemColumnNames(true);
+        options.Columns({
+            {PostingTable_ParentIdColumn, "Uint32", true, true},
+            {"key", "Uint32", true, true},
+            {"data", "String", false, false},
+        });
+        CreateShardedTable(server, sender, "/Root", "table-posting", options);
+    }
+
+    static void CreateTmpTable(Tests::TServer::TPtr server, TActorId sender, TShardedTableOptions options, const char* name) { // Creates /Root/<name> with the posting-table columns plus an embedding column; `options` is taken by value.
+        options.AllowSystemColumnNames(true);
+        options.Columns({
+            {PostingTable_ParentIdColumn, "Uint32", true, true},
+            {"key", "Uint32", true, true},
+            {"embedding", "String", false, false},
+            {"data", "String", false, false},
+        });
+        CreateShardedTable(server, sender, "/Root", name, options);
+    }
+
+    Y_UNIT_TEST (BadRequest) { // Sends a series of malformed local k-means requests and expects BAD_REQUEST for each (asserted inside DoBadRequest).
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root");
+
+        Tests::TServer::TPtr server = new TServer(serverSettings);
+        auto& runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+
+        CreateShardedTable(server, sender, "/Root", "table-main", 1); // NOTE(review): uses the generic helper rather than CreateMainTable — confirm the table has an `embedding` column, otherwise every case below may be rejected for that reason alone
+
+        { // case: K = 0
+            auto ev = std::make_unique<TEvDataShard::TEvLocalKMeansRequest>();
+            auto& rec = ev->Record;
+
+            rec.SetK(0);
+            DoBadRequest(server, sender, ev);
+        }
+        { // case: K = 1
+            auto ev = std::make_unique<TEvDataShard::TEvLocalKMeansRequest>();
+            auto& rec = ev->Record;
+
+            rec.SetK(1);
+            DoBadRequest(server, sender, ev);
+        }
+        { // case: EmbeddingColumn preset by the caller; DoBadRequest clears a preset value, so the request goes out without an embedding column
+            auto ev = std::make_unique<TEvDataShard::TEvLocalKMeansRequest>();
+            auto& rec = ev->Record;
+
+            rec.SetEmbeddingColumn("some");
+            DoBadRequest(server, sender, ev);
+        }
+        { // case: TabletId = 0
+            auto ev = std::make_unique<TEvDataShard::TEvLocalKMeansRequest>();
+            auto& rec = ev->Record;
+
+            rec.SetTabletId(0);
+            DoBadRequest(server, sender, ev);
+        }
+        { // case: PathId = {0, 0}
+            auto ev = std::make_unique<TEvDataShard::TEvLocalKMeansRequest>();
+            auto& rec = ev->Record;
+
+            PathIdFromPathId({0, 0}, rec.MutablePathId());
+            DoBadRequest(server, sender, ev);
+        }
+        { // case: vector dimension = 0
+            auto ev = std::make_unique<TEvDataShard::TEvLocalKMeansRequest>();
+
+            DoBadRequest(server, sender, ev, 0);
+        }
+        { // case: bit vector type
+            auto ev = std::make_unique<TEvDataShard::TEvLocalKMeansRequest>();
+
+            // TODO(mbkkt) bit vector not supported for now
+            DoBadRequest(server, sender, ev, 2, VectorIndexSettings::VECTOR_TYPE_BIT);
+        }
+        { // case: unspecified vector type
+            auto ev = std::make_unique<TEvDataShard::TEvLocalKMeansRequest>();
+
+            DoBadRequest(server, sender, ev, 2, VectorIndexSettings::VECTOR_TYPE_UNSPECIFIED);
+        }
+        { // case: unspecified distance
+            auto ev = std::make_unique<TEvDataShard::TEvLocalKMeansRequest>();
+
+            DoBadRequest(server, sender, ev, 2, VectorIndexSettings::VECTOR_TYPE_FLOAT, VectorIndexSettings::DISTANCE_UNSPECIFIED);
+        }
+        // TODO(mbkkt) For now build_index, sample_k, build_columns and local_kmeans don't really check this
+        // {
+        //     auto ev = std::make_unique<TEvDataShard::TEvLocalKMeansRequest>();
+        //     auto snapshotCopy = snapshot;
+        //     snapshotCopy.Step++;
+        //     DoBadRequest(server, sender, ev);
+        // }
+        // {
+        //     auto ev = std::make_unique<TEvDataShard::TEvLocalKMeansRequest>();
+        //     auto snapshotCopy = snapshot;
+        //     snapshotCopy.TxId++;
+        //     DoBadRequest(server, sender, ev);
+        // }
+    }
+
+    Y_UNIT_TEST (MainToPosting) {
+        // Scenario: run DoLocalKMeans in UPLOAD_MAIN_TO_POSTING mode over a five-row main table
+        // and compare the returned level/posting dumps for several seeds and metrics.
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root");
+
+        Tests::TServer::TPtr server(new TServer(serverSettings));
+        auto& runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+
+        TShardedTableOptions options;
+        options.EnableOutOfOrder(true); // TODO(mbkkt) find out what this option actually does
+        options.Shards(1);
+
+        CreateMainTable(server, sender, options);
+        // Seed the main table: keys 1..3 carry embeddings 0x30..0x32, keys 4..5 carry 0x65 and 0x75.
+        ExecSQL(server, sender, R"(
+        UPSERT INTO `/Root/table-main` 
+            (key, embedding, data) 
+        VALUES )"
+            "(1, \"\x30\x30\3\", \"one\"),"
+            "(2, \"\x31\x31\3\", \"two\"),"
+            "(3, \"\x32\x32\3\", \"three\"),"
+            "(4, \"\x65\x65\3\", \"four\"),"
+            "(5, \"\x75\x75\3\", \"five\");");
+
+        // Output tables are created once up front and rebuilt after every run,
+        // so each DoLocalKMeans call starts from freshly created level/posting tables.
+        auto createOutputTables = [&] {
+            CreateLevelTable(server, sender, options);
+            CreatePostingTable(server, sender, options);
+        };
+        createOutputTables();
+        auto resetOutputTables = [&] {
+            DropTable(server, sender, "table-level");
+            DropTable(server, sender, "table-posting");
+            createOutputTables();
+        };
+
+        // Arguments shared by every DoLocalKMeans call in this test.
+        const auto uploadState = NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_MAIN_TO_POSTING;
+        const auto vectorType = VectorIndexSettings::VECTOR_TYPE_UINT8;
+        ui64 k = 2;
+        ui64 seed = 0;
+
+        // Seed 0: the {four, five} cluster is expected under id 1 and {one, two, three} under id 2.
+        for (auto metric : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) {
+            auto [level, posting] = DoLocalKMeans(server, sender, 0, seed, k, uploadState, vectorType, metric);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 0, __ydb_id = 1, __ydb_embedding = mm\3\n"
+                "__ydb_parent = 0, __ydb_id = 2, __ydb_embedding = 11\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 1, key = 4, data = four\n"
+                "__ydb_parent = 1, key = 5, data = five\n"
+                "__ydb_parent = 2, key = 1, data = one\n"
+                "__ydb_parent = 2, key = 2, data = two\n"
+                "__ydb_parent = 2, key = 3, data = three\n");
+            resetOutputTables();
+        }
+
+        // Seed 111: same two clusters, but the expected ids are swapped.
+        seed = 111;
+        for (auto metric : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) {
+            auto [level, posting] = DoLocalKMeans(server, sender, 0, seed, k, uploadState, vectorType, metric);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 0, __ydb_id = 1, __ydb_embedding = 11\3\n"
+                "__ydb_parent = 0, __ydb_id = 2, __ydb_embedding = mm\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 1, key = 1, data = one\n"
+                "__ydb_parent = 1, key = 2, data = two\n"
+                "__ydb_parent = 1, key = 3, data = three\n"
+                "__ydb_parent = 2, key = 4, data = four\n"
+                "__ydb_parent = 2, key = 5, data = five\n");
+            resetOutputTables();
+        }
+
+        // Seed 32 with similarity metrics: a single cluster holding all five rows is expected.
+        seed = 32;
+        for (auto metric : {VectorIndexSettings::SIMILARITY_INNER_PRODUCT, VectorIndexSettings::SIMILARITY_COSINE}) {
+            auto [level, posting] = DoLocalKMeans(server, sender, 0, seed, k, uploadState, vectorType, metric);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 0, __ydb_id = 1, __ydb_embedding = II\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 1, key = 1, data = one\n"
+                "__ydb_parent = 1, key = 2, data = two\n"
+                "__ydb_parent = 1, key = 3, data = three\n"
+                "__ydb_parent = 1, key = 4, data = four\n"
+                "__ydb_parent = 1, key = 5, data = five\n");
+            resetOutputTables();
+        }
+
+        // Seed 13 with cosine distance: again a single cluster is expected.
+        seed = 13;
+        {
+            auto [level, posting] = DoLocalKMeans(server, sender, 0, seed, k, uploadState, vectorType, VectorIndexSettings::DISTANCE_COSINE);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 0, __ydb_id = 1, __ydb_embedding = II\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 1, key = 1, data = one\n"
+                "__ydb_parent = 1, key = 2, data = two\n"
+                "__ydb_parent = 1, key = 3, data = three\n"
+                "__ydb_parent = 1, key = 4, data = four\n"
+                "__ydb_parent = 1, key = 5, data = five\n");
+            resetOutputTables();
+        }
+    }
+
+    Y_UNIT_TEST (MainToTmp) {
+        // Scenario: run DoLocalKMeans in UPLOAD_MAIN_TO_TMP mode over a five-row main table.
+        // The output table is created with CreateTmpTable, and the expected posting dump
+        // additionally lists the embedding column of every row.
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root");
+
+        Tests::TServer::TPtr server(new TServer(serverSettings));
+        auto& runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+
+        TShardedTableOptions options;
+        options.EnableOutOfOrder(true); // TODO(mbkkt) find out what this option actually does
+        options.Shards(1);
+
+        CreateMainTable(server, sender, options);
+        // Seed the main table: keys 1..3 carry embeddings 0x30..0x32, keys 4..5 carry 0x65 and 0x75.
+        ExecSQL(server, sender, R"(
+        UPSERT INTO `/Root/table-main` 
+            (key, embedding, data) 
+        VALUES )"
+            "(1, \"\x30\x30\3\", \"one\"),"
+            "(2, \"\x31\x31\3\", \"two\"),"
+            "(3, \"\x32\x32\3\", \"three\"),"
+            "(4, \"\x65\x65\3\", \"four\"),"
+            "(5, \"\x75\x75\3\", \"five\");");
+
+        // Output tables are created once up front and rebuilt after every run,
+        // so each DoLocalKMeans call starts from freshly created level/posting tables.
+        auto createOutputTables = [&] {
+            CreateLevelTable(server, sender, options);
+            CreateTmpTable(server, sender, options, "table-posting");
+        };
+        createOutputTables();
+        auto resetOutputTables = [&] {
+            DropTable(server, sender, "table-level");
+            DropTable(server, sender, "table-posting");
+            createOutputTables();
+        };
+
+        // Arguments shared by every DoLocalKMeans call in this test.
+        const auto uploadState = NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_MAIN_TO_TMP;
+        const auto vectorType = VectorIndexSettings::VECTOR_TYPE_UINT8;
+        ui64 k = 2;
+        ui64 seed = 0;
+
+        // Seed 0: the {four, five} cluster is expected under id 1 and {one, two, three} under id 2.
+        for (auto metric : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) {
+            auto [level, posting] = DoLocalKMeans(server, sender, 0, seed, k, uploadState, vectorType, metric);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 0, __ydb_id = 1, __ydb_embedding = mm\3\n"
+                "__ydb_parent = 0, __ydb_id = 2, __ydb_embedding = 11\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 1, key = 4, embedding = \x65\x65\3, data = four\n"
+                "__ydb_parent = 1, key = 5, embedding = \x75\x75\3, data = five\n"
+                "__ydb_parent = 2, key = 1, embedding = \x30\x30\3, data = one\n"
+                "__ydb_parent = 2, key = 2, embedding = \x31\x31\3, data = two\n"
+                "__ydb_parent = 2, key = 3, embedding = \x32\x32\3, data = three\n");
+            resetOutputTables();
+        }
+
+        // Seed 111: same two clusters, but the expected ids are swapped.
+        seed = 111;
+        for (auto metric : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) {
+            auto [level, posting] = DoLocalKMeans(server, sender, 0, seed, k, uploadState, vectorType, metric);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 0, __ydb_id = 1, __ydb_embedding = 11\3\n"
+                "__ydb_parent = 0, __ydb_id = 2, __ydb_embedding = mm\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 1, key = 1, embedding = \x30\x30\3, data = one\n"
+                "__ydb_parent = 1, key = 2, embedding = \x31\x31\3, data = two\n"
+                "__ydb_parent = 1, key = 3, embedding = \x32\x32\3, data = three\n"
+                "__ydb_parent = 2, key = 4, embedding = \x65\x65\3, data = four\n"
+                "__ydb_parent = 2, key = 5, embedding = \x75\x75\3, data = five\n");
+            resetOutputTables();
+        }
+
+        // Seed 32 with similarity metrics: a single cluster holding all five rows is expected.
+        seed = 32;
+        for (auto metric : {VectorIndexSettings::SIMILARITY_INNER_PRODUCT, VectorIndexSettings::SIMILARITY_COSINE}) {
+            auto [level, posting] = DoLocalKMeans(server, sender, 0, seed, k, uploadState, vectorType, metric);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 0, __ydb_id = 1, __ydb_embedding = II\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 1, key = 1, embedding = \x30\x30\3, data = one\n"
+                "__ydb_parent = 1, key = 2, embedding = \x31\x31\3, data = two\n"
+                "__ydb_parent = 1, key = 3, embedding = \x32\x32\3, data = three\n"
+                "__ydb_parent = 1, key = 4, embedding = \x65\x65\3, data = four\n"
+                "__ydb_parent = 1, key = 5, embedding = \x75\x75\3, data = five\n");
+            resetOutputTables();
+        }
+
+        // Seed 13 with cosine distance: again a single cluster is expected.
+        seed = 13;
+        {
+            auto [level, posting] = DoLocalKMeans(server, sender, 0, seed, k, uploadState, vectorType, VectorIndexSettings::DISTANCE_COSINE);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 0, __ydb_id = 1, __ydb_embedding = II\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 1, key = 1, embedding = \x30\x30\3, data = one\n"
+                "__ydb_parent = 1, key = 2, embedding = \x31\x31\3, data = two\n"
+                "__ydb_parent = 1, key = 3, embedding = \x32\x32\3, data = three\n"
+                "__ydb_parent = 1, key = 4, embedding = \x65\x65\3, data = four\n"
+                "__ydb_parent = 1, key = 5, embedding = \x75\x75\3, data = five\n");
+            resetOutputTables();
+        }
+    }
+
+    Y_UNIT_TEST (TmpToPosting) {
+        // Scenario: run DoLocalKMeans in UPLOAD_TMP_TO_POSTING mode for parent 40.
+        // The input table is created with CreateTmpTable and also holds rows for parents 39 and 41;
+        // the expected output only lists the five rows inserted under parent 40.
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root");
+
+        Tests::TServer::TPtr server(new TServer(serverSettings));
+        auto& runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+
+        TShardedTableOptions options;
+        options.EnableOutOfOrder(true); // TODO(mbkkt) find out what this option actually does
+        options.Shards(1);
+
+        CreateTmpTable(server, sender, options, "table-main");
+        // Seed the input table: five rows under parent 40 plus one extra row each under 39 and 41.
+        ExecSQL(server, sender, R"(
+        UPSERT INTO `/Root/table-main` 
+            (__ydb_parent, key, embedding, data) 
+        VALUES )"
+            "(39, 1, \"\x30\x30\3\", \"one\"),"
+            "(40, 1, \"\x30\x30\3\", \"one\"),"
+            "(40, 2, \"\x31\x31\3\", \"two\"),"
+            "(40, 3, \"\x32\x32\3\", \"three\"),"
+            "(40, 4, \"\x65\x65\3\", \"four\"),"
+            "(40, 5, \"\x75\x75\3\", \"five\"),"
+            "(41, 5, \"\x75\x75\3\", \"five\");");
+
+        // Output tables are created once up front and rebuilt after every run,
+        // so each DoLocalKMeans call starts from freshly created level/posting tables.
+        auto createOutputTables = [&] {
+            CreateLevelTable(server, sender, options);
+            CreatePostingTable(server, sender, options);
+        };
+        createOutputTables();
+        auto resetOutputTables = [&] {
+            DropTable(server, sender, "table-level");
+            DropTable(server, sender, "table-posting");
+            createOutputTables();
+        };
+
+        // Arguments shared by every DoLocalKMeans call in this test.
+        const auto uploadState = NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_TMP_TO_POSTING;
+        const auto vectorType = VectorIndexSettings::VECTOR_TYPE_UINT8;
+        ui64 k = 2;
+        ui64 seed = 0;
+
+        // Seed 0: the {four, five} cluster is expected under id 41 and {one, two, three} under id 42.
+        for (auto metric : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) {
+            auto [level, posting] = DoLocalKMeans(server, sender, 40, seed, k, uploadState, vectorType, metric);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 40, __ydb_id = 41, __ydb_embedding = mm\3\n"
+                "__ydb_parent = 40, __ydb_id = 42, __ydb_embedding = 11\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 41, key = 4, data = four\n"
+                "__ydb_parent = 41, key = 5, data = five\n"
+                "__ydb_parent = 42, key = 1, data = one\n"
+                "__ydb_parent = 42, key = 2, data = two\n"
+                "__ydb_parent = 42, key = 3, data = three\n");
+            resetOutputTables();
+        }
+
+        // Seed 111: same two clusters, but the expected ids are swapped.
+        seed = 111;
+        for (auto metric : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) {
+            auto [level, posting] = DoLocalKMeans(server, sender, 40, seed, k, uploadState, vectorType, metric);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 40, __ydb_id = 41, __ydb_embedding = 11\3\n"
+                "__ydb_parent = 40, __ydb_id = 42, __ydb_embedding = mm\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 41, key = 1, data = one\n"
+                "__ydb_parent = 41, key = 2, data = two\n"
+                "__ydb_parent = 41, key = 3, data = three\n"
+                "__ydb_parent = 42, key = 4, data = four\n"
+                "__ydb_parent = 42, key = 5, data = five\n");
+            resetOutputTables();
+        }
+
+        // Seed 32 with similarity metrics: a single cluster holding all five rows is expected.
+        seed = 32;
+        for (auto metric : {VectorIndexSettings::SIMILARITY_INNER_PRODUCT, VectorIndexSettings::SIMILARITY_COSINE}) {
+            auto [level, posting] = DoLocalKMeans(server, sender, 40, seed, k, uploadState, vectorType, metric);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 40, __ydb_id = 41, __ydb_embedding = II\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 41, key = 1, data = one\n"
+                "__ydb_parent = 41, key = 2, data = two\n"
+                "__ydb_parent = 41, key = 3, data = three\n"
+                "__ydb_parent = 41, key = 4, data = four\n"
+                "__ydb_parent = 41, key = 5, data = five\n");
+            resetOutputTables();
+        }
+
+        // Seed 13 with cosine distance: again a single cluster is expected.
+        seed = 13;
+        {
+            auto [level, posting] = DoLocalKMeans(server, sender, 40, seed, k, uploadState, vectorType, VectorIndexSettings::DISTANCE_COSINE);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 40, __ydb_id = 41, __ydb_embedding = II\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 41, key = 1, data = one\n"
+                "__ydb_parent = 41, key = 2, data = two\n"
+                "__ydb_parent = 41, key = 3, data = three\n"
+                "__ydb_parent = 41, key = 4, data = four\n"
+                "__ydb_parent = 41, key = 5, data = five\n");
+            resetOutputTables();
+        }
+    }
+
+    Y_UNIT_TEST (TmpToTmp) {
+        // Scenario: run DoLocalKMeans in UPLOAD_TMP_TO_TMP mode for parent 40.
+        // Both the input and the output table are created with CreateTmpTable; the input also holds
+        // rows for parents 39 and 41, while the expected output only lists the rows of parent 40.
+        TPortManager pm;
+        TServerSettings serverSettings(pm.GetPort(2134));
+        serverSettings.SetDomainName("Root");
+
+        Tests::TServer::TPtr server(new TServer(serverSettings));
+        auto& runtime = *server->GetRuntime();
+        auto sender = runtime.AllocateEdgeActor();
+
+        runtime.SetLogPriority(NKikimrServices::TX_DATASHARD, NLog::PRI_DEBUG);
+
+        InitRoot(server, sender);
+
+        TShardedTableOptions options;
+        options.EnableOutOfOrder(true); // TODO(mbkkt) find out what this option actually does
+        options.Shards(1);
+
+        CreateTmpTable(server, sender, options, "table-main");
+        // Seed the input table: five rows under parent 40 plus one extra row each under 39 and 41.
+        ExecSQL(server, sender, R"(
+        UPSERT INTO `/Root/table-main` 
+            (__ydb_parent, key, embedding, data) 
+        VALUES )"
+            "(39, 1, \"\x30\x30\3\", \"one\"),"
+            "(40, 1, \"\x30\x30\3\", \"one\"),"
+            "(40, 2, \"\x31\x31\3\", \"two\"),"
+            "(40, 3, \"\x32\x32\3\", \"three\"),"
+            "(40, 4, \"\x65\x65\3\", \"four\"),"
+            "(40, 5, \"\x75\x75\3\", \"five\"),"
+            "(41, 5, \"\x75\x75\3\", \"five\");");
+
+        // Output tables are created once up front and rebuilt after every run,
+        // so each DoLocalKMeans call starts from freshly created level/posting tables.
+        auto createOutputTables = [&] {
+            CreateLevelTable(server, sender, options);
+            CreateTmpTable(server, sender, options, "table-posting");
+        };
+        createOutputTables();
+        auto resetOutputTables = [&] {
+            DropTable(server, sender, "table-level");
+            DropTable(server, sender, "table-posting");
+            createOutputTables();
+        };
+
+        // Arguments shared by every DoLocalKMeans call in this test.
+        const auto uploadState = NKikimrTxDataShard::TEvLocalKMeansRequest::UPLOAD_TMP_TO_TMP;
+        const auto vectorType = VectorIndexSettings::VECTOR_TYPE_UINT8;
+        ui64 k = 2;
+        ui64 seed = 0;
+
+        // Seed 0: the {four, five} cluster is expected under id 41 and {one, two, three} under id 42.
+        for (auto metric : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) {
+            auto [level, posting] = DoLocalKMeans(server, sender, 40, seed, k, uploadState, vectorType, metric);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 40, __ydb_id = 41, __ydb_embedding = mm\3\n"
+                "__ydb_parent = 40, __ydb_id = 42, __ydb_embedding = 11\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 41, key = 4, embedding = \x65\x65\3, data = four\n"
+                "__ydb_parent = 41, key = 5, embedding = \x75\x75\3, data = five\n"
+                "__ydb_parent = 42, key = 1, embedding = \x30\x30\3, data = one\n"
+                "__ydb_parent = 42, key = 2, embedding = \x31\x31\3, data = two\n"
+                "__ydb_parent = 42, key = 3, embedding = \x32\x32\3, data = three\n");
+            resetOutputTables();
+        }
+
+        // Seed 111: same two clusters, but the expected ids are swapped.
+        seed = 111;
+        for (auto metric : {VectorIndexSettings::DISTANCE_MANHATTAN, VectorIndexSettings::DISTANCE_EUCLIDEAN}) {
+            auto [level, posting] = DoLocalKMeans(server, sender, 40, seed, k, uploadState, vectorType, metric);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 40, __ydb_id = 41, __ydb_embedding = 11\3\n"
+                "__ydb_parent = 40, __ydb_id = 42, __ydb_embedding = mm\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 41, key = 1, embedding = \x30\x30\3, data = one\n"
+                "__ydb_parent = 41, key = 2, embedding = \x31\x31\3, data = two\n"
+                "__ydb_parent = 41, key = 3, embedding = \x32\x32\3, data = three\n"
+                "__ydb_parent = 42, key = 4, embedding = \x65\x65\3, data = four\n"
+                "__ydb_parent = 42, key = 5, embedding = \x75\x75\3, data = five\n");
+            resetOutputTables();
+        }
+
+        // Seed 32 with similarity metrics: a single cluster holding all five rows is expected.
+        seed = 32;
+        for (auto metric : {VectorIndexSettings::SIMILARITY_INNER_PRODUCT, VectorIndexSettings::SIMILARITY_COSINE}) {
+            auto [level, posting] = DoLocalKMeans(server, sender, 40, seed, k, uploadState, vectorType, metric);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 40, __ydb_id = 41, __ydb_embedding = II\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 41, key = 1, embedding = \x30\x30\3, data = one\n"
+                "__ydb_parent = 41, key = 2, embedding = \x31\x31\3, data = two\n"
+                "__ydb_parent = 41, key = 3, embedding = \x32\x32\3, data = three\n"
+                "__ydb_parent = 41, key = 4, embedding = \x65\x65\3, data = four\n"
+                "__ydb_parent = 41, key = 5, embedding = \x75\x75\3, data = five\n");
+            resetOutputTables();
+        }
+
+        // Seed 13 with cosine distance: again a single cluster is expected.
+        seed = 13;
+        {
+            auto [level, posting] = DoLocalKMeans(server, sender, 40, seed, k, uploadState, vectorType, VectorIndexSettings::DISTANCE_COSINE);
+            UNIT_ASSERT_VALUES_EQUAL(
+                level,
+                "__ydb_parent = 40, __ydb_id = 41, __ydb_embedding = II\3\n");
+            UNIT_ASSERT_VALUES_EQUAL(
+                posting,
+                "__ydb_parent = 41, key = 1, embedding = \x30\x30\3, data = one\n"
+                "__ydb_parent = 41, key = 2, embedding = \x31\x31\3, data = two\n"
+                "__ydb_parent = 41, key = 3, embedding = \x32\x32\3, data = three\n"
+                "__ydb_parent = 41, key = 4, embedding = \x65\x65\3, data = four\n"
+                "__ydb_parent = 41, key = 5, embedding = \x75\x75\3, data = five\n");
+            resetOutputTables();
+        }
+    }
+}
+
+}

+ 3 - 8
ydb/core/tx/datashard/datashard_ut_sample_k.cpp

@@ -12,14 +12,9 @@
 
 #include <library/cpp/testing/unittest/registar.h>
 
-template <>
-inline void Out<NKikimrIndexBuilder::EBuildStatus>(IOutputStream& o, NKikimrIndexBuilder::EBuildStatus status) {
-    o << NKikimrIndexBuilder::EBuildStatus_Name(status);
-}
-
 namespace NKikimr {
 
-static ui64 sId = 1;
+static std::atomic<ui64> sId = 1;
 
 using namespace NKikimr::NDataShard::NKqpHelpers;
 using namespace NSchemeShard;
@@ -28,7 +23,7 @@ using namespace Tests;
 Y_UNIT_TEST_SUITE (TTxDataShardSampleKScan) {
     static void DoSampleKBad(Tests::TServer::TPtr server, TActorId sender,
                              const TString& tableFrom, const TRowVersion& snapshot, std::unique_ptr<TEvDataShard::TEvSampleKRequest>& ev) {
-        auto id = sId++;
+        auto id = sId.fetch_add(1, std::memory_order_relaxed);
         auto& runtime = *server->GetRuntime();
         auto datashards = GetTableShards(server, sender, tableFrom);
         TTableId tableId = ResolveTableId(server, sender, tableFrom);
@@ -78,7 +73,7 @@ Y_UNIT_TEST_SUITE (TTxDataShardSampleKScan) {
 
     static TString DoSampleK(Tests::TServer::TPtr server, TActorId sender,
                              const TString& tableFrom, const TRowVersion& snapshot, ui64 seed, ui64 k) {
-        auto id = sId++;
+        auto id = sId.fetch_add(1, std::memory_order_relaxed);
         auto& runtime = *server->GetRuntime();
         auto datashards = GetTableShards(server, sender, tableFrom);
         TTableId tableId = ResolveTableId(server, sender, tableFrom);

Some files were not shown because too many files changed in this diff