mkql_computation_node_pack_impl.h 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. #pragma once
  2. #include <yql/essentials/minikql/defs.h>
  3. #include <yql/essentials/minikql/mkql_node.h>
  4. #include <yql/essentials/minikql/pack_num.h>
  5. #include <yql/essentials/public/decimal/yql_decimal_serialize.h>
  6. #include <yql/essentials/public/udf/udf_value.h>
  7. #include <yql/essentials/utils/chunked_buffer.h>
  8. #include <library/cpp/packedtypes/zigzag.h>
  9. #include <util/generic/buffer.h>
  10. #include <util/generic/strbuf.h>
  11. namespace NKikimr {
  12. namespace NMiniKQL {
  13. namespace NDetails {
  14. template<typename TBuf>
  15. inline void PackUInt64(ui64 val, TBuf& buf) {
  16. buf.Advance(MAX_PACKED64_SIZE);
  17. char* dst = buf.Pos() - MAX_PACKED64_SIZE;
  18. buf.EraseBack(MAX_PACKED64_SIZE - Pack64(val, dst));
  19. }
  20. template<typename TBuf>
  21. inline void PackInt64(i64 val, TBuf& buf) {
  22. PackUInt64(ZigZagEncode(val), buf);
  23. }
  24. template<typename TBuf>
  25. inline void PackUInt32(ui32 val, TBuf& buf) {
  26. buf.Advance(MAX_PACKED32_SIZE);
  27. char* dst = buf.Pos() - MAX_PACKED32_SIZE;
  28. buf.EraseBack(MAX_PACKED32_SIZE - Pack32(val, dst));
  29. }
  30. template<typename TBuf>
  31. inline void PackInt32(i32 val, TBuf& buf) {
  32. PackUInt32(ZigZagEncode(val), buf);
  33. }
  34. template<typename TBuf>
  35. inline void PackUInt16(ui16 val, TBuf& buf) {
  36. buf.Advance(MAX_PACKED32_SIZE);
  37. char* dst = buf.Pos() - MAX_PACKED32_SIZE;
  38. buf.EraseBack(MAX_PACKED32_SIZE - Pack32(val, dst));
  39. }
  40. template<typename TBuf>
  41. inline void PackInt16(i16 val, TBuf& buf) {
  42. PackUInt16(ZigZagEncode(val), buf);
  43. }
  // Appends the object representation of `val` (exactly sizeof(T) bytes, copied
  // with memcpy) to the end of `buf`. No encoding or byte-order conversion is applied.
  template <typename T, typename TBuf>
  void PutRawData(T val, TBuf& buf) {
      constexpr auto width = sizeof(T);
      // Extend the buffer, then fill the freshly added tail.
      buf.Advance(width);
      char* const target = buf.Pos() - width;
      std::memcpy(target, &val, width);
  }
  49. constexpr size_t MAX_PACKED_DECIMAL_SIZE = sizeof(NYql::NDecimal::TInt128);
  50. template<typename TBuf>
  51. void PackDecimal(NYql::NDecimal::TInt128 val, TBuf& buf) {
  52. buf.Advance(MAX_PACKED_DECIMAL_SIZE);
  53. char* dst = buf.Pos() - MAX_PACKED_DECIMAL_SIZE;
  54. buf.EraseBack(MAX_PACKED_DECIMAL_SIZE - NYql::NDecimal::Serialize(val, dst));
  55. }
  56. class TChunkedInputBuffer : private TNonCopyable {
  57. public:
  58. explicit TChunkedInputBuffer(NYql::TChunkedBuffer&& rope)
  59. : Rope_(std::move(rope))
  60. {
  61. Next();
  62. }
  63. explicit TChunkedInputBuffer(TStringBuf input)
  64. : Rope_(NYql::TChunkedBuffer{})
  65. , Data_(input.data())
  66. , Len_(input.size())
  67. {
  68. }
  69. inline const char* data() const {
  70. return Data_;
  71. }
  72. inline size_t length() const {
  73. return Len_;
  74. }
  75. inline size_t size() const {
  76. return Len_;
  77. }
  78. inline void Skip(size_t size) {
  79. Y_DEBUG_ABORT_UNLESS(size <= Len_);
  80. Data_ += size;
  81. Len_ -= size;
  82. }
  83. bool IsEmpty() {
  84. if (size()) {
  85. return false;
  86. }
  87. Next();
  88. return size() == 0;
  89. }
  90. inline void CopyTo(char* dst, size_t toCopy) {
  91. if (Y_LIKELY(toCopy <= size())) {
  92. std::memcpy(dst, data(), toCopy);
  93. Skip(toCopy);
  94. } else {
  95. CopyToChunked(dst, toCopy);
  96. }
  97. }
  98. inline NYql::TChunkedBuffer ReleaseRope() {
  99. Y_DEBUG_ABORT_UNLESS(OriginalLen_ >= Len_);
  100. Rope_.Erase(OriginalLen_ - Len_);
  101. NYql::TChunkedBuffer result = std::move(Rope_);
  102. Data_ = nullptr;
  103. Len_ = OriginalLen_ = 0;
  104. Rope_.Clear();
  105. return result;
  106. }
  107. void Next() {
  108. Y_DEBUG_ABORT_UNLESS(Len_ == 0);
  109. Rope_.Erase(OriginalLen_);
  110. if (!Rope_.Empty()) {
  111. Len_ = OriginalLen_ = Rope_.Front().Buf.size();
  112. Data_ = Rope_.Front().Buf.data();
  113. Y_DEBUG_ABORT_UNLESS(Len_ > 0);
  114. } else {
  115. Len_ = OriginalLen_ = 0;
  116. Data_ = nullptr;
  117. }
  118. }
  119. private:
  120. void CopyToChunked(char* dst, size_t toCopy) {
  121. while (toCopy) {
  122. size_t chunkSize = std::min(size(), toCopy);
  123. std::memcpy(dst, data(), chunkSize);
  124. Skip(chunkSize);
  125. dst += chunkSize;
  126. toCopy -= chunkSize;
  127. if (toCopy) {
  128. Next();
  129. MKQL_ENSURE(size(), "Unexpected end of buffer");
  130. }
  131. }
  132. }
  133. NYql::TChunkedBuffer Rope_;
  134. const char* Data_ = nullptr;
  135. size_t Len_ = 0;
  136. size_t OriginalLen_ = 0;
  137. };
  138. template <typename T>
  139. T GetRawData(TChunkedInputBuffer& buf) {
  140. T val;
  141. buf.CopyTo(reinterpret_cast<char*>(&val), sizeof(val));
  142. return val;
  143. }
  144. template<typename T>
  145. T UnpackInteger(TChunkedInputBuffer& buf) {
  146. T res;
  147. size_t read;
  148. if constexpr (std::is_same_v<T, NYql::NDecimal::TInt128>) {
  149. std::tie(res, read) = NYql::NDecimal::Deserialize(buf.data(), buf.size());
  150. Y_DEBUG_ABORT_UNLESS((read != 0) xor (NYql::NDecimal::IsError(res)));
  151. } else if constexpr (std::is_same_v<T, ui64>) {
  152. read = Unpack64(buf.data(), buf.size(), res);
  153. } else {
  154. static_assert(std::is_same_v<T, ui32>, "Only ui32/ui64/TInt128 are supported");
  155. read = Unpack32(buf.data(), buf.size(), res);
  156. }
  157. if (Y_LIKELY(read > 0)) {
  158. buf.Skip(read);
  159. return res;
  160. }
  161. static_assert(MAX_PACKED_DECIMAL_SIZE > MAX_PACKED64_SIZE);
  162. char tmpBuf[MAX_PACKED_DECIMAL_SIZE];
  163. Y_DEBUG_ABORT_UNLESS(buf.size() < MAX_PACKED_DECIMAL_SIZE);
  164. std::memcpy(tmpBuf, buf.data(), buf.size());
  165. size_t pos = buf.size();
  166. buf.Skip(buf.size());
  167. for (;;) {
  168. if (buf.size() == 0) {
  169. buf.Next();
  170. MKQL_ENSURE(buf.size() > 0, (std::is_same_v<T, NYql::NDecimal::TInt128> ? "Bad decimal packed data" : "Bad uint packed data"));
  171. }
  172. Y_DEBUG_ABORT_UNLESS(pos < MAX_PACKED_DECIMAL_SIZE);
  173. tmpBuf[pos++] = *buf.data();
  174. buf.Skip(1);
  175. if constexpr (std::is_same_v<T, NYql::NDecimal::TInt128>) {
  176. std::tie(res, read) = NYql::NDecimal::Deserialize(tmpBuf, pos);
  177. Y_DEBUG_ABORT_UNLESS((read != 0) xor (NYql::NDecimal::IsError(res)));
  178. } else if constexpr (std::is_same_v<T, ui64>) {
  179. read = Unpack64(tmpBuf, pos, res);
  180. } else {
  181. read = Unpack32(tmpBuf, pos, res);
  182. }
  183. if (read) {
  184. break;
  185. }
  186. }
  187. return res;
  188. }
  189. inline NYql::NDecimal::TInt128 UnpackDecimal(TChunkedInputBuffer& buf) {
  190. return UnpackInteger<NYql::NDecimal::TInt128>(buf);
  191. }
  192. inline ui64 UnpackUInt64(TChunkedInputBuffer& buf) {
  193. return UnpackInteger<ui64>(buf);
  194. }
  195. inline i64 UnpackInt64(TChunkedInputBuffer& buf) {
  196. return ZigZagDecode(UnpackUInt64(buf));
  197. }
  198. inline ui32 UnpackUInt32(TChunkedInputBuffer& buf) {
  199. return UnpackInteger<ui32>(buf);
  200. }
  201. inline i32 UnpackInt32(TChunkedInputBuffer& buf) {
  202. return ZigZagDecode(UnpackUInt32(buf));
  203. }
  204. inline ui16 UnpackUInt16(TChunkedInputBuffer& buf) {
  205. ui32 res = UnpackUInt32(buf);
  206. MKQL_ENSURE(res <= Max<ui16>(), "Corrupted data");
  207. return res;
  208. }
  209. inline i16 UnpackInt16(TChunkedInputBuffer& buf) {
  210. return ZigZagDecode(UnpackUInt16(buf));
  211. }
  212. } // NDetails
  213. }
  214. }