Просмотр исходного кода

YQ-1037 + xz/gz

ref:932e07b9710d3a91403f53872f61c21d41b5b4c2
a-romanov 2 лет назад
Родитель
Сommit
f4e9c332e8

+ 1 - 0
CMakeLists.linux.txt

@@ -907,6 +907,7 @@ add_subdirectory(contrib/libs/poco/Foundation)
 add_subdirectory(contrib/libs/poco/JSON)
 add_subdirectory(contrib/libs/poco/JSON)
 add_subdirectory(contrib/libs/poco/XML)
 add_subdirectory(contrib/libs/poco/XML)
 add_subdirectory(ydb/library/yql/providers/s3/compressors)
 add_subdirectory(ydb/library/yql/providers/s3/compressors)
+add_subdirectory(contrib/libs/lzma)
 add_subdirectory(ydb/library/yql/udfs/common/clickhouse/client)
 add_subdirectory(ydb/library/yql/udfs/common/clickhouse/client)
 add_subdirectory(ydb/library/yql/public/udf/support)
 add_subdirectory(ydb/library/yql/public/udf/support)
 add_subdirectory(contrib/restricted/boost/libs/program_options)
 add_subdirectory(contrib/restricted/boost/libs/program_options)

+ 106 - 0
contrib/libs/lzma/CMakeLists.linux.txt

@@ -0,0 +1,106 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+# liblzma library target built from the sources located under contrib/libs/lzma.
+add_library(contrib-libs-lzma)
+# PUBLIC: this define is applied to the library itself and to every target linking against it.
+target_compile_options(contrib-libs-lzma PUBLIC
+  -DLZMA_API_STATIC
+)
+# PRIVATE: these defines are used only while compiling the library's own sources.
+target_compile_options(contrib-libs-lzma PRIVATE
+  -DHAVE_CONFIG_H
+  -DTUKLIB_SYMBOL_PREFIX=lzma_
+)
+# Include path exported to consumers (directory of the liblzma API headers).
+target_include_directories(contrib-libs-lzma PUBLIC
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/api
+)
+# Include paths visible only to the library's own translation units.
+target_include_directories(contrib-libs-lzma PRIVATE
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/common
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/check
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/delta
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/lz
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/lzma
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/rangecoder
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/simple
+)
+# Explicit list of the C sources compiled into the library, grouped by source directory.
+target_sources(contrib-libs-lzma PRIVATE
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/common/tuklib_cpucores.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/common/tuklib_physmem.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/check/check.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/check/crc32_fast.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/check/crc32_table.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/check/crc64_fast.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/check/crc64_table.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/check/sha256.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/alone_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/alone_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/auto_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/block_buffer_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/block_buffer_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/block_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/block_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/block_header_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/block_header_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/block_util.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/common.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/easy_buffer_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/easy_decoder_memusage.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/easy_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/easy_encoder_memusage.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/easy_preset.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/filter_buffer_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/filter_buffer_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/filter_common.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/filter_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/filter_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/filter_flags_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/filter_flags_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/hardware_cputhreads.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/hardware_physmem.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/index.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/index_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/index_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/index_hash.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/outqueue.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/stream_buffer_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/stream_buffer_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/stream_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/stream_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/stream_encoder_mt.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/stream_flags_common.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/stream_flags_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/stream_flags_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/vli_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/vli_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/common/vli_size.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/delta/delta_common.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/delta/delta_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/delta/delta_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/lz/lz_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/lz/lz_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/lz/lz_encoder_mf.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/lzma/fastpos_table.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/lzma/lzma2_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/lzma/lzma2_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/lzma/lzma_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/lzma/lzma_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/lzma/lzma_encoder_optimum_fast.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/lzma/lzma_encoder_optimum_normal.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/lzma/lzma_encoder_presets.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/rangecoder/price_table.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/simple/arm.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/simple/armthumb.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/simple/ia64.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/simple/powerpc.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/simple/simple_coder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/simple/simple_decoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/simple/simple_encoder.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/simple/sparc.c
+  ${CMAKE_SOURCE_DIR}/contrib/libs/lzma/liblzma/simple/x86.c
+)

+ 11 - 0
contrib/libs/lzma/CMakeLists.txt

@@ -0,0 +1,11 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+# Platform dispatch: on UNIX-like systems load the target definitions from CMakeLists.linux.txt.
+# NOTE(review): CMake also sets UNIX on macOS, so the "linux" file is used there too - confirm intended.
+if (UNIX)
+  include(CMakeLists.linux.txt)
+endif()

+ 3 - 0
ydb/library/yql/providers/s3/compressors/CMakeLists.linux.txt

@@ -24,11 +24,14 @@ target_link_libraries(providers-s3-compressors PUBLIC
   libs-brotli-dec
   libs-brotli-dec
   contrib-libs-libbz2
   contrib-libs-libbz2
   contrib-libs-lz4
   contrib-libs-lz4
+  contrib-libs-lzma
 )
 )
 target_sources(providers-s3-compressors PRIVATE
 target_sources(providers-s3-compressors PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/brotli.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/brotli.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/bzip2.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/bzip2.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/gz.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/factory.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/factory.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/lz4io.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/lz4io.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/zstd.cpp
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/zstd.cpp
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/providers/s3/compressors/xz.cpp
 )
 )

+ 6 - 5
ydb/library/yql/providers/s3/compressors/bzip2.cpp

@@ -34,15 +34,16 @@ bool TReadBuffer::nextImpl() {
 
 
     while (true) {
     while (true) {
         if (!BzStream_.avail_in) {
         if (!BzStream_.avail_in) {
-            BzStream_.next_in = InBuffer.data();
-            BzStream_.avail_in = Source_.read(InBuffer.data(), InBuffer.size());
-            if (!BzStream_.avail_in) {
+            if (const auto size = Source_.read(InBuffer.data(), InBuffer.size())) {
+                BzStream_.next_in = InBuffer.data();
+                BzStream_.avail_in = size;
+            } else {
                 set(nullptr, 0ULL);
                 set(nullptr, 0ULL);
                 return false;
                 return false;
             }
             }
         }
         }
 
 
-        switch (BZ2_bzDecompress(&BzStream_)) {
+        switch (const auto code = BZ2_bzDecompress(&BzStream_)) {
             case BZ_STREAM_END:
             case BZ_STREAM_END:
                 FreeDecoder();
                 FreeDecoder();
                 InitDecoder();
                 InitDecoder();
@@ -55,7 +56,7 @@ bool TReadBuffer::nextImpl() {
 
 
                 break;
                 break;
             default:
             default:
-                ythrow yexception() << "bzip error";
+                ythrow yexception() << "Bzip error: " << code;
         }
         }
     }
     }
 }
 }

+ 6 - 0
ydb/library/yql/providers/s3/compressors/factory.cpp

@@ -3,6 +3,8 @@
 #include "brotli.h"
 #include "brotli.h"
 #include "zstd.h"
 #include "zstd.h"
 #include "bzip2.h"
 #include "bzip2.h"
+#include "xz.h"
+#include "gz.h"
 
 
 namespace NYql {
 namespace NYql {
 
 
@@ -15,6 +17,10 @@ std::unique_ptr<NDB::ReadBuffer> MakeDecompressor(NDB::ReadBuffer& input, const
         return std::make_unique<NZstd::TReadBuffer>(input);
         return std::make_unique<NZstd::TReadBuffer>(input);
     if ("bzip2" == compression)
     if ("bzip2" == compression)
         return std::make_unique<NBzip2::TReadBuffer>(input);
         return std::make_unique<NBzip2::TReadBuffer>(input);
+    if ("xz" == compression)
+        return std::make_unique<NXz::TReadBuffer>(input);
+    if ("gzip" == compression)
+        return std::make_unique<NGz::TReadBuffer>(input);
 
 
     return nullptr;
     return nullptr;
 }
 }

+ 66 - 0
ydb/library/yql/providers/s3/compressors/gz.cpp

@@ -0,0 +1,66 @@
+#include "gz.h"
+
+#include <util/generic/size_literals.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+
+namespace NYql {
+
+namespace NGz {
+
+namespace {
+
+// Returns the message zlib attached to the stream, or a generic text when none is set.
+const char* GetErrMsg(const z_stream& z) noexcept {
+    if (z.msg) {
+        return z.msg;
+    }
+    return "Unknown error.";
+}
+
+}
+
+// Creates a gzip-decompressing buffer that pulls compressed bytes from `source`.
+// Sizes the input (8 KB) and output (64 KB) scratch buffers and initializes the zlib
+// inflate state; YQL_ENSURE fails if zlib cannot be initialized.
+TReadBuffer::TReadBuffer(NDB::ReadBuffer& source)
+    : NDB::ReadBuffer(nullptr, 0ULL), Source_(source)
+{
+    InBuffer.resize(8_KB);
+    OutBuffer.resize(64_KB);
+    // z_stream must be zero-filled before it is handed to inflateInit2().
+    Zero(Z_);
+    // windowBits 31 = 15 (maximum window) + 16 (expect a gzip wrapper), per the zlib manual.
+    YQL_ENSURE(inflateInit2(&Z_, 31) == Z_OK, "Can not init inflate engine.");
+}
+
+// Releases the zlib inflate state set up in the constructor.
+TReadBuffer::~TReadBuffer() {
+    inflateEnd(&Z_);
+}
+
+// Produces the next chunk of decompressed data and exposes it through working_buffer.
+// Returns true when at least one byte was produced and false once the source is exhausted.
+// Throws yexception when zlib reports an error, when a preset dictionary is requested,
+// or when the source ends in the middle of a gzip member (truncated input).
+bool TReadBuffer::nextImpl() {
+    Z_.next_out = reinterpret_cast<unsigned char*>(OutBuffer.data());
+    Z_.avail_out = OutBuffer.size();
+
+    while (true) {
+        if (!Z_.avail_in) {
+            if (const auto size = Source_.read(InBuffer.data(), InBuffer.size())) {
+                Z_.next_in = reinterpret_cast<unsigned char*>(InBuffer.data());
+                Z_.avail_in = size;
+            } else {
+                // inflateInit2()/inflateReset() zero total_in, so a non-zero value here means
+                // input was consumed after the last completed member: the stream was cut short.
+                // Report it instead of silently returning a shortened result.
+                if (Z_.total_in) {
+                    ythrow yexception() << "Compressed stream is truncated.";
+                }
+                set(nullptr, 0ULL);
+                return false;
+            }
+        }
+
+        switch (inflate(&Z_, Z_SYNC_FLUSH)) {
+            case Z_NEED_DICT:
+                ythrow yexception() << "Need dict.";
+            case Z_STREAM_END:
+                // A gzip member ended; reset the state so a following concatenated member can be decoded.
+                YQL_ENSURE(inflateReset(&Z_) == Z_OK, "Inflate reset error: " << GetErrMsg(Z_));
+                [[fallthrough]];
+            case Z_OK:
+                if (const auto processed = OutBuffer.size() - Z_.avail_out) {
+                    working_buffer = Buffer(OutBuffer.data(), OutBuffer.data() + processed);
+                    return true;
+                }
+                // Nothing produced yet: keep feeding input.
+                break;
+            default:
+                ythrow yexception() << GetErrMsg(Z_);
+        }
+    }
+}
+
+}
+
+}

+ 26 - 0
ydb/library/yql/providers/s3/compressors/gz.h

@@ -0,0 +1,26 @@
+#pragma once
+
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
+#include <zlib.h>
+
+namespace NYql {
+
+namespace NGz {
+
+// Read buffer that serves gzip-decompressed data pulled from another NDB::ReadBuffer.
+// NOTE(review): the class owns zlib state released in the destructor; confirm the base
+// class prevents copying, otherwise a copy would release the same state twice.
+class TReadBuffer : public NDB::ReadBuffer {
+public:
+    TReadBuffer(NDB::ReadBuffer& source);
+    ~TReadBuffer();
+
+private:
+    // Refills working_buffer with the next decompressed chunk; false means end of data.
+    bool nextImpl() final;
+
+    // Upstream buffer providing the compressed bytes.
+    NDB::ReadBuffer& Source_;
+    // Scratch storage for compressed input read from Source_.
+    std::vector<char> InBuffer;
+    // Scratch storage for decompressed output exposed via working_buffer.
+    std::vector<char> OutBuffer;
+
+    // zlib inflate state.
+    z_stream Z_;
+};
+
+}
+
+}
+

+ 89 - 0
ydb/library/yql/providers/s3/compressors/xz.cpp

@@ -0,0 +1,89 @@
+#include "xz.h"
+
+#include <util/generic/size_literals.h>
+#include <ydb/library/yql/utils/yql_panic.h>
+
+namespace NYql {
+
+namespace NXz {
+
+// Creates an xz/lzma-decompressing buffer that pulls compressed bytes from `source`.
+// Sizes the input (8 KB) and output (64 KB) scratch buffers and initializes the liblzma
+// auto decoder with no memory limit (UINT64_MAX) and the LZMA_CONCATENATED flag.
+// Throws yexception when the decoder cannot be initialized.
+TReadBuffer::TReadBuffer(NDB::ReadBuffer& source)
+    : NDB::ReadBuffer(nullptr, 0ULL), Source_(source), Strm_(LZMA_STREAM_INIT)
+{
+    InBuffer.resize(8_KB);
+    OutBuffer.resize(64_KB);
+
+    switch (const lzma_ret ret = lzma_auto_decoder(&Strm_, UINT64_MAX, LZMA_CONCATENATED)) {
+        case LZMA_OK:
+            return;
+        case LZMA_MEM_ERROR:
+            throw yexception() << "Memory allocation failed.";
+        case LZMA_OPTIONS_ERROR:
+            throw yexception() << "Unsupported decompressor flags.";
+        default:
+            // Message text fixed: the literal used to contain a stray "<< ".
+            throw yexception() << "Unknown error " << int(ret) << ", possibly a bug.";
+    }
+}
+
+// Releases the liblzma decoder state set up in the constructor.
+TReadBuffer::~TReadBuffer() {
+    lzma_end(&Strm_);
+}
+
+// Produces the next chunk of decompressed data and exposes it through working_buffer.
+// Returns true when at least one byte was produced and false once the decoder has
+// reported the end of the stream. Throws yexception on any liblzma error.
+bool TReadBuffer::nextImpl() {
+    if (IsOutFinished_) {
+        return false;
+    }
+
+    // Once LZMA_FINISH has been passed to lzma_code(), liblzma requires the same action on
+    // every following call (otherwise it returns LZMA_PROG_ERROR), so the action must be
+    // restored from the persistent end-of-input flag rather than reset to LZMA_RUN.
+    lzma_action action = IsInFinished_ ? LZMA_FINISH : LZMA_RUN;
+
+    Strm_.next_out = reinterpret_cast<unsigned char*>(OutBuffer.data());
+    Strm_.avail_out = OutBuffer.size();
+
+    while (true) {
+        if (!Strm_.avail_in && !IsInFinished_) {
+            if (const auto size = Source_.read(InBuffer.data(), InBuffer.size())) {
+                Strm_.next_in = reinterpret_cast<unsigned char*>(InBuffer.data());
+                Strm_.avail_in = size;
+            } else {
+                // Source exhausted: tell the decoder that no more input will follow.
+                IsInFinished_ = true;
+                action = LZMA_FINISH;
+            }
+        }
+
+        // Inspect the result before handing out data, so an error that coincides with a
+        // full output buffer is reported with its real cause instead of on the next call.
+        switch (const lzma_ret ret = lzma_code(&Strm_, action)) {
+            case LZMA_OK:
+                break;
+            case LZMA_STREAM_END:
+                IsOutFinished_ = true;
+                break;
+            case LZMA_MEM_ERROR:
+                throw yexception() << "Memory allocation failed.";
+            case LZMA_FORMAT_ERROR:
+                throw yexception() << "The input is not in the .xz format.";
+            case LZMA_OPTIONS_ERROR:
+                throw yexception() << "Unsupported compression options.";
+            case LZMA_DATA_ERROR:
+                throw yexception() << "Compressed file is corrupt.";
+            case LZMA_BUF_ERROR:
+                throw yexception() << "Compressed file is truncated or otherwise corrupt.";
+            default:
+                throw yexception() << "Unknown error " << int(ret) << ", possibly a bug.";
+        }
+
+        // Hand out data when the output buffer is full or the stream has ended.
+        if (!Strm_.avail_out || IsOutFinished_) {
+            if (const auto outLen = OutBuffer.size() - Strm_.avail_out) {
+                working_buffer = Buffer(OutBuffer.data(), OutBuffer.data() + outLen);
+                return true;
+            }
+            set(nullptr, 0ULL);
+            return false;
+        }
+    }
+}
+
+}
+
+}

+ 29 - 0
ydb/library/yql/providers/s3/compressors/xz.h

@@ -0,0 +1,29 @@
+#pragma once
+
+#include <ydb/library/yql/udfs/common/clickhouse/client/src/IO/ReadBuffer.h>
+#include <contrib/libs/lzma/liblzma/api/lzma.h>
+
+namespace NYql {
+
+namespace NXz {
+
+// Read buffer that serves xz/lzma-decompressed data pulled from another NDB::ReadBuffer.
+// NOTE(review): the class owns liblzma state released in the destructor; confirm the base
+// class prevents copying, otherwise a copy would release the same state twice.
+class TReadBuffer : public NDB::ReadBuffer {
+public:
+    TReadBuffer(NDB::ReadBuffer& source);
+    ~TReadBuffer();
+
+private:
+    // Refills working_buffer with the next decompressed chunk; false means end of data.
+    bool nextImpl() final;
+
+    // Upstream buffer providing the compressed bytes.
+    NDB::ReadBuffer& Source_;
+    // Scratch storage for compressed input read from Source_.
+    std::vector<char> InBuffer;
+    // Scratch storage for decompressed output exposed via working_buffer.
+    std::vector<char> OutBuffer;
+
+    // liblzma decoder state.
+    lzma_stream Strm_;
+
+    // Set once Source_ has reported end of input.
+    bool IsInFinished_ = false;
+    // Set once the decoder has reported LZMA_STREAM_END.
+    bool IsOutFinished_ = false;
+};
+
+}
+
+}
+