123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255 |
- #pragma once
- #include <yql/essentials/minikql/defs.h>
- #include <yql/essentials/minikql/mkql_node.h>
- #include <yql/essentials/minikql/pack_num.h>
- #include <yql/essentials/public/decimal/yql_decimal_serialize.h>
- #include <yql/essentials/public/udf/udf_value.h>
- #include <yql/essentials/utils/chunked_buffer.h>
- #include <library/cpp/packedtypes/zigzag.h>
- #include <util/generic/buffer.h>
- #include <util/generic/strbuf.h>
- namespace NKikimr {
- namespace NMiniKQL {
- namespace NDetails {
- template<typename TBuf>
- inline void PackUInt64(ui64 val, TBuf& buf) {
- buf.Advance(MAX_PACKED64_SIZE);
- char* dst = buf.Pos() - MAX_PACKED64_SIZE;
- buf.EraseBack(MAX_PACKED64_SIZE - Pack64(val, dst));
- }
- template<typename TBuf>
- inline void PackInt64(i64 val, TBuf& buf) {
- PackUInt64(ZigZagEncode(val), buf);
- }
- template<typename TBuf>
- inline void PackUInt32(ui32 val, TBuf& buf) {
- buf.Advance(MAX_PACKED32_SIZE);
- char* dst = buf.Pos() - MAX_PACKED32_SIZE;
- buf.EraseBack(MAX_PACKED32_SIZE - Pack32(val, dst));
- }
- template<typename TBuf>
- inline void PackInt32(i32 val, TBuf& buf) {
- PackUInt32(ZigZagEncode(val), buf);
- }
- template<typename TBuf>
- inline void PackUInt16(ui16 val, TBuf& buf) {
- buf.Advance(MAX_PACKED32_SIZE);
- char* dst = buf.Pos() - MAX_PACKED32_SIZE;
- buf.EraseBack(MAX_PACKED32_SIZE - Pack32(val, dst));
- }
- template<typename TBuf>
- inline void PackInt16(i16 val, TBuf& buf) {
- PackUInt16(ZigZagEncode(val), buf);
- }
// Appends the in-memory bytes of `val` to `buf` without any encoding.
// The buffer is grown by sizeof(T) and the value is copied into the freshly
// reserved tail, i.e. the sizeof(T) bytes that end at buf.Pos().
template <typename T, typename TBuf>
void PutRawData(T val, TBuf& buf) {
    constexpr size_t rawSize = sizeof(T);
    buf.Advance(rawSize);
    const auto target = buf.Pos() - rawSize;
    std::memcpy(target, &val, rawSize);
}
- constexpr size_t MAX_PACKED_DECIMAL_SIZE = sizeof(NYql::NDecimal::TInt128);
- template<typename TBuf>
- void PackDecimal(NYql::NDecimal::TInt128 val, TBuf& buf) {
- buf.Advance(MAX_PACKED_DECIMAL_SIZE);
- char* dst = buf.Pos() - MAX_PACKED_DECIMAL_SIZE;
- buf.EraseBack(MAX_PACKED_DECIMAL_SIZE - NYql::NDecimal::Serialize(val, dst));
- }
- class TChunkedInputBuffer : private TNonCopyable {
- public:
- explicit TChunkedInputBuffer(NYql::TChunkedBuffer&& rope)
- : Rope_(std::move(rope))
- {
- Next();
- }
- explicit TChunkedInputBuffer(TStringBuf input)
- : Rope_(NYql::TChunkedBuffer{})
- , Data_(input.data())
- , Len_(input.size())
- {
- }
- inline const char* data() const {
- return Data_;
- }
- inline size_t length() const {
- return Len_;
- }
- inline size_t size() const {
- return Len_;
- }
- inline void Skip(size_t size) {
- Y_DEBUG_ABORT_UNLESS(size <= Len_);
- Data_ += size;
- Len_ -= size;
- }
- bool IsEmpty() {
- if (size()) {
- return false;
- }
- Next();
- return size() == 0;
- }
- inline void CopyTo(char* dst, size_t toCopy) {
- if (Y_LIKELY(toCopy <= size())) {
- std::memcpy(dst, data(), toCopy);
- Skip(toCopy);
- } else {
- CopyToChunked(dst, toCopy);
- }
- }
- inline NYql::TChunkedBuffer ReleaseRope() {
- Y_DEBUG_ABORT_UNLESS(OriginalLen_ >= Len_);
- Rope_.Erase(OriginalLen_ - Len_);
- NYql::TChunkedBuffer result = std::move(Rope_);
- Data_ = nullptr;
- Len_ = OriginalLen_ = 0;
- Rope_.Clear();
- return result;
- }
- void Next() {
- Y_DEBUG_ABORT_UNLESS(Len_ == 0);
- Rope_.Erase(OriginalLen_);
- if (!Rope_.Empty()) {
- Len_ = OriginalLen_ = Rope_.Front().Buf.size();
- Data_ = Rope_.Front().Buf.data();
- Y_DEBUG_ABORT_UNLESS(Len_ > 0);
- } else {
- Len_ = OriginalLen_ = 0;
- Data_ = nullptr;
- }
- }
- private:
- void CopyToChunked(char* dst, size_t toCopy) {
- while (toCopy) {
- size_t chunkSize = std::min(size(), toCopy);
- std::memcpy(dst, data(), chunkSize);
- Skip(chunkSize);
- dst += chunkSize;
- toCopy -= chunkSize;
- if (toCopy) {
- Next();
- MKQL_ENSURE(size(), "Unexpected end of buffer");
- }
- }
- }
- NYql::TChunkedBuffer Rope_;
- const char* Data_ = nullptr;
- size_t Len_ = 0;
- size_t OriginalLen_ = 0;
- };
- template <typename T>
- T GetRawData(TChunkedInputBuffer& buf) {
- T val;
- buf.CopyTo(reinterpret_cast<char*>(&val), sizeof(val));
- return val;
- }
- template<typename T>
- T UnpackInteger(TChunkedInputBuffer& buf) {
- T res;
- size_t read;
- if constexpr (std::is_same_v<T, NYql::NDecimal::TInt128>) {
- std::tie(res, read) = NYql::NDecimal::Deserialize(buf.data(), buf.size());
- Y_DEBUG_ABORT_UNLESS((read != 0) xor (NYql::NDecimal::IsError(res)));
- } else if constexpr (std::is_same_v<T, ui64>) {
- read = Unpack64(buf.data(), buf.size(), res);
- } else {
- static_assert(std::is_same_v<T, ui32>, "Only ui32/ui64/TInt128 are supported");
- read = Unpack32(buf.data(), buf.size(), res);
- }
- if (Y_LIKELY(read > 0)) {
- buf.Skip(read);
- return res;
- }
- static_assert(MAX_PACKED_DECIMAL_SIZE > MAX_PACKED64_SIZE);
- char tmpBuf[MAX_PACKED_DECIMAL_SIZE];
- Y_DEBUG_ABORT_UNLESS(buf.size() < MAX_PACKED_DECIMAL_SIZE);
- std::memcpy(tmpBuf, buf.data(), buf.size());
- size_t pos = buf.size();
- buf.Skip(buf.size());
- for (;;) {
- if (buf.size() == 0) {
- buf.Next();
- MKQL_ENSURE(buf.size() > 0, (std::is_same_v<T, NYql::NDecimal::TInt128> ? "Bad decimal packed data" : "Bad uint packed data"));
- }
- Y_DEBUG_ABORT_UNLESS(pos < MAX_PACKED_DECIMAL_SIZE);
- tmpBuf[pos++] = *buf.data();
- buf.Skip(1);
- if constexpr (std::is_same_v<T, NYql::NDecimal::TInt128>) {
- std::tie(res, read) = NYql::NDecimal::Deserialize(tmpBuf, pos);
- Y_DEBUG_ABORT_UNLESS((read != 0) xor (NYql::NDecimal::IsError(res)));
- } else if constexpr (std::is_same_v<T, ui64>) {
- read = Unpack64(tmpBuf, pos, res);
- } else {
- read = Unpack32(tmpBuf, pos, res);
- }
- if (read) {
- break;
- }
- }
- return res;
- }
- inline NYql::NDecimal::TInt128 UnpackDecimal(TChunkedInputBuffer& buf) {
- return UnpackInteger<NYql::NDecimal::TInt128>(buf);
- }
- inline ui64 UnpackUInt64(TChunkedInputBuffer& buf) {
- return UnpackInteger<ui64>(buf);
- }
- inline i64 UnpackInt64(TChunkedInputBuffer& buf) {
- return ZigZagDecode(UnpackUInt64(buf));
- }
- inline ui32 UnpackUInt32(TChunkedInputBuffer& buf) {
- return UnpackInteger<ui32>(buf);
- }
- inline i32 UnpackInt32(TChunkedInputBuffer& buf) {
- return ZigZagDecode(UnpackUInt32(buf));
- }
- inline ui16 UnpackUInt16(TChunkedInputBuffer& buf) {
- ui32 res = UnpackUInt32(buf);
- MKQL_ENSURE(res <= Max<ui16>(), "Corrupted data");
- return res;
- }
- inline i16 UnpackInt16(TChunkedInputBuffer& buf) {
- return ZigZagDecode(UnpackUInt16(buf));
- }
- } // NDetails
- }
- }
|