Browse Source

add x/image/draw to arcadia

fedorov-o 1 year ago
parent
commit
5ccb6ae864
1 changed files with 54 additions and 11 deletions
  1. 54 11
      yt/yt/library/formats/arrow_writer.cpp

+ 54 - 11
yt/yt/library/formats/arrow_writer.cpp

@@ -158,6 +158,41 @@ std::tuple<org::apache::arrow::flatbuf::Type, flatbuffers::Offset<void>> Seriali
     }
 }
 
+int ExtractTableIndexFromColumn(const TBatchColumn* column)
+{
+    YT_VERIFY(column->Values);
+
+    // Expecting rle but not dictionary column.
+    YT_VERIFY(column->Rle);
+    YT_VERIFY(!column->Rle->ValueColumn->Dictionary);
+
+    const auto* valueColumn = column->Rle->ValueColumn;
+    auto values = valueColumn->GetTypedValues<ui64>();
+
+    // Expecting only one element.
+    YT_VERIFY(values.size() == 1);
+
+    auto rleIndexes = column->GetTypedValues<ui64>();
+
+    auto startIndex = column->StartIndex;
+
+    int tableIndex = 0;
+    DecodeIntegerVector(
+        startIndex,
+        startIndex + 1,
+        valueColumn->Values->BaseValue,
+        valueColumn->Values->ZigZagEncoded,
+        TRange<ui32>(),
+        rleIndexes,
+        [&] (auto index) {
+            return values[index];
+        },
+        [&] (auto value) {
+            tableIndex = value;
+        });
+    return tableIndex;
+}
+
 int GetIntegralLikeTypeByteSize(ESimpleLogicalValueType type)
 {
     switch (type) {
@@ -971,10 +1006,10 @@ private:
         Reset();
 
         ssize_t sameTableRangeBeginRowIndex = 0;
-        i32 tableIndex = 0;
+        int tableIndex = 0;
 
         for (ssize_t rowIndex = 0; rowIndex < std::ssize(rows); rowIndex++) {
-            i32 currentTableIndex = -1;
+            int currentTableIndex = -1;
             if (TableCount_ > 1) {
                 const auto& elems = rows[rowIndex].Elements();
                 for (ssize_t columnIndex = std::ssize(elems) - 1; columnIndex >= 0; --columnIndex) {
@@ -1002,25 +1037,33 @@ private:
 
     void DoWriteBatch(NTableClient::IUnversionedRowBatchPtr rowBatch) override
     {
-        if (TableCount_ > 1) {
-            DoWrite(rowBatch->MaterializeRows());
-            return;
-        }
-
         auto columnarBatch = rowBatch->TryAsColumnar();
         if (!columnarBatch) {
             DoWrite(rowBatch->MaterializeRows());
             return;
         }
+        int tableIndex = 0;
+        auto batchColumns = columnarBatch->MaterializeColumns();
+
+        if (TableCount_ > 1) {
+            tableIndex = -1;
+            for (const auto* column : batchColumns) {
+                if (column->Id == GetTableIndexColumnId()) {
+                    tableIndex = ExtractTableIndexFromColumn(column);
+                    break;
+                }
+            }
+            YT_VERIFY(tableIndex < TableCount_ && tableIndex >= 0);
+        }
 
         Reset();
         RowCount_ = rowBatch->GetRowCount();
-        PrepareColumns(columnarBatch->MaterializeColumns(), 0);
-        Encode(0);
+        PrepareColumns(batchColumns, tableIndex);
+        Encode(tableIndex);
         ++EncodedColumnarBatchCount_;
     }
 
-    void Encode(i32 tableIndex)
+    void Encode(int tableIndex)
     {
         auto output = GetOutputStream();
         if (tableIndex != PrevTableIndex_ || IsSchemaMessageNeeded()) {
@@ -1179,7 +1222,7 @@ private:
                 std::move(bodyWriter)});
     }
 
-    void PrepareSchema(i32 tableIndex)
+    void PrepareSchema(int tableIndex)
     {
         flatbuffers::FlatBufferBuilder flatbufBuilder;