Browse Source

KIKIMR-18343: Split IDataContainer interface

nsofya 1 year ago
parent
commit
ef61acdadf

+ 1 - 1
ydb/core/tx/columnshard/columnshard__write.cpp

@@ -48,7 +48,7 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
     NIceDb::TNiceDb db(txc.DB);
 
     auto writeId = writeMeta.GetWriteId();
-    const TString& data = blobData.GetBlobData();
+    const TString data = blobData.GetBlobData();
 
     NKikimrTxColumnShard::TLogicalMetadata meta;
     Y_VERIFY(meta.ParseFromString(blobData.GetLogicalMeta()));

+ 1 - 1
ydb/core/tx/columnshard/columnshard_ut_common.h

@@ -469,7 +469,7 @@ namespace NKikimr::NColumnShard {
                 Y_VERIFY(Index < Owner.Builders.size());
                 auto& builder = Owner.Builders[Index];
                 auto type = builder->type();
-                
+
                 NArrow::SwitchType(type->id(), [&](const auto& t) {
                     using TWrap = std::decay_t<decltype(t)>;
                     using T = typename TWrap::T;

+ 8 - 8
ydb/core/tx/columnshard/engines/scheme/abstract_scheme.cpp

@@ -42,15 +42,15 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::NormalizeBatch(const ISnaps
     return arrow::RecordBatch::Make(resultArrowSchema, batch->num_rows(), newColumns);
 }
 
-std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema, TString& strError) const {
+std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema) const {
     std::shared_ptr<arrow::Schema> dstSchema = GetIndexInfo().ArrowSchema();
     auto batch = NArrow::DeserializeBatch(data, (dataSchema ? dataSchema : dstSchema));
     if (!batch) {
-        strError = "DeserializeBatch() failed";
+        AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "DeserializeBatch() failed");
         return nullptr;
     }
     if (batch->num_rows() == 0) {
-        strError = "empty batch";
+        AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "empty batch");
         return nullptr;
     }
 
@@ -58,13 +58,13 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TStr
     if (dataSchema) {
         batch = NArrow::ExtractColumns(batch, dstSchema, true);
         if (!batch) {
-            strError = "cannot correct schema";
+            AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", "cannot correct schema");
             return nullptr;
         }
     }
 
     if (!batch->schema()->Equals(dstSchema)) {
-        strError = "unexpected schema for insert batch: '" + batch->schema()->ToString() + "'";
+        AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", TStringBuilder() << "unexpected schema for insert batch: '" << batch->schema()->ToString() << "'");
         return nullptr;
     }
 
@@ -75,18 +75,18 @@ std::shared_ptr<arrow::RecordBatch> ISnapshotSchema::PrepareForInsert(const TStr
     for (auto& field : sortingKey->fields()) {
         auto column = batch->GetColumnByName(field->name());
         if (!column) {
-            strError = "missing PK column '" + field->name() + "'";
+            AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", TStringBuilder() << "missing PK column '" << field->name() << "'");
             return nullptr;
         }
         if (NArrow::HasNulls(column)) {
-            strError = "PK column '" + field->name() + "' contains NULLs";
+            AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", TStringBuilder() << "PK column '" << field->name() << "' contains NULLs");
             return nullptr;
         }
     }
 
     auto status = batch->ValidateFull();
     if (!status.ok()) {
-        strError = status.ToString();
+        AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("error", status.ToString());
         return nullptr;
     }
     batch = NArrow::SortBatch(batch, sortingKey);

+ 2 - 1
ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h

@@ -36,13 +36,14 @@ public:
     virtual int GetFieldIndex(const ui32 columnId) const = 0;
     std::shared_ptr<arrow::Field> GetFieldByIndex(const int index) const;
     std::shared_ptr<arrow::Field> GetFieldByColumnId(const ui32 columnId) const;
+
     virtual const std::shared_ptr<arrow::Schema>& GetSchema() const = 0;
     virtual const TIndexInfo& GetIndexInfo() const = 0;
     virtual const TSnapshot& GetSnapshot() const = 0;
     virtual ui32 GetColumnsCount() const = 0;
 
     std::shared_ptr<arrow::RecordBatch> NormalizeBatch(const ISnapshotSchema& dataSchema, const std::shared_ptr<arrow::RecordBatch> batch) const;
-    std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema, TString& strError) const;
+    std::shared_ptr<arrow::RecordBatch> PrepareForInsert(const TString& data, const std::shared_ptr<arrow::Schema>& dataSchema) const;
 };
 
 } // namespace NKikimr::NOlap

+ 2 - 14
ydb/core/tx/columnshard/operations/write_data.cpp

@@ -5,11 +5,6 @@
 
 namespace NKikimr::NColumnShard {
 
-void TArrowData::Serialize(NKikimrDataEvents::TOperationData& proto) const {
-    Y_FAIL("Not implemented");
-    Y_UNUSED(proto);
-}
-
 bool TArrowData::Parse(const NKikimrDataEvents::TOperationData& proto, const IPayloadData& payload) {
     IncomingData = payload.GetDataFromPayload(proto.GetArrowData().GetPayloadIndex());
 
@@ -22,13 +17,7 @@ bool TArrowData::Parse(const NKikimrDataEvents::TOperationData& proto, const IPa
 }
 
 std::shared_ptr<arrow::RecordBatch> TArrowData::GetArrowBatch() const {
-    TString err;
-    return IndexSchema->PrepareForInsert(IncomingData, BatchSchema->GetSchema(), err);
-}
-
-void TProtoArrowData::Serialize(NKikimrDataEvents::TOperationData& proto) const {
-    Y_FAIL("Not implemented");
-    Y_UNUSED(proto);
+    return IndexSchema->PrepareForInsert(IncomingData, BatchSchema->GetSchema());
 }
 
 bool TProtoArrowData::ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto) {
@@ -48,8 +37,7 @@ bool TProtoArrowData::ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto
 }
 
 std::shared_ptr<arrow::RecordBatch> TProtoArrowData::GetArrowBatch() const {
-    TString err;
-    return IndexSchema->PrepareForInsert(IncomingData, ArrowSchema, err);
+    return IndexSchema->PrepareForInsert(IncomingData, ArrowSchema);
 }
 
 }

+ 1 - 2
ydb/core/tx/columnshard/operations/write_data.h

@@ -4,6 +4,7 @@
 #include <ydb/core/tx/columnshard/common/snapshot.h>
 #include <ydb/core/tx/columnshard/engines/scheme/abstract_scheme.h>
 #include <ydb/core/tx/columnshard/engines/scheme/filtered_scheme.h>
+#include <ydb/core/protos/ev_write.pb.h>
 
 
 namespace NKikimr::NColumnShard {
@@ -49,7 +50,6 @@ public:
 
     bool Parse(const NKikimrDataEvents::TOperationData& proto, const IPayloadData& payload);
     std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override;
-    void Serialize(NKikimrDataEvents::TOperationData& proto) const override;
 
 private:
     NOlap::ISnapshotSchema::TPtr IndexSchema;
@@ -69,7 +69,6 @@ public:
 
     bool ParseFromProto(const NKikimrTxColumnShard::TEvWrite& proto);
     std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override;
-    void Serialize(NKikimrDataEvents::TOperationData& proto) const override;
 
 private:
     NOlap::ISnapshotSchema::TPtr IndexSchema;

+ 1 - 9
ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp

@@ -1840,7 +1840,7 @@ void TestReadAggregate(const std::vector<std::pair<TString, TTypeInfo>>& ydbSche
 }
 
 Y_UNIT_TEST_SUITE(EvWrite) {
-    class TArrowData : public NKikimr::NEvWrite::IDataContainer {
+    class TArrowData : public NKikimr::NEvents::IDataConstructor {
         std::vector<std::pair<TString, TTypeInfo>> YdbSchema;
         ui64 Index;
 
@@ -1857,14 +1857,6 @@ Y_UNIT_TEST_SUITE(EvWrite) {
             }
             proto.MutableArrowData()->SetPayloadIndex(Index);
         }
-
-        std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const override {
-            return nullptr;
-        }
-
-        const TString& GetData() const override {
-            return Default<TString>();
-        }
     };
 
     Y_UNIT_TEST(WriteInTransaction) {

+ 9 - 1
ydb/core/tx/ev_write/events.h

@@ -11,6 +11,14 @@
 
 namespace NKikimr::NEvents {
 
+
+class IDataConstructor {
+public:
+    using TPtr = std::shared_ptr<IDataConstructor>;
+    virtual ~IDataConstructor() {}
+    virtual void Serialize(NKikimrDataEvents::TOperationData& proto) const = 0;
+};
+
 struct TDataEvents {
 
     class TCoordinatorInfo {
@@ -42,7 +50,7 @@ struct TDataEvents {
             Record.SetTxId(txId);
         }
 
-        void AddReplaceOp(const ui64 tableId, const NEvWrite::IDataContainer::TPtr& data) {
+        void AddReplaceOp(const ui64 tableId, const IDataConstructor::TPtr& data) {
             Record.MutableTableId()->SetTableId(tableId);
             Y_VERIFY(data);
             data->Serialize(*Record.MutableReplace());

+ 0 - 3
ydb/core/tx/ev_write/write_data.h

@@ -3,8 +3,6 @@
 #include <ydb/core/tx/long_tx_service/public/types.h>
 #include <ydb/core/formats/arrow/arrow_helpers.h>
 
-#include <ydb/core/protos/tx_columnshard.pb.h>
-#include <ydb/core/protos/ev_write.pb.h>
 
 
 namespace NKikimr::NEvWrite {
@@ -13,7 +11,6 @@ class IDataContainer {
 public:
     using TPtr = std::shared_ptr<IDataContainer>;
     virtual ~IDataContainer() {}
-    virtual void Serialize(NKikimrDataEvents::TOperationData& proto) const = 0;
     virtual std::shared_ptr<arrow::RecordBatch> GetArrowBatch() const = 0;
     virtual const TString& GetData() const = 0;
 };