mkql_computation_node_pack.cpp 53 KB


  1. #include "mkql_block_impl.h"
  2. #include "mkql_computation_node_pack.h"
  3. #include "mkql_computation_node_pack_impl.h"
  4. #include "mkql_computation_node_holders.h"
  5. #include "presort.h"
  6. #include <yql/essentials/parser/pg_wrapper/interface/pack.h>
  7. #include <yql/essentials/public/udf/arrow/memory_pool.h>
  8. #include <yql/essentials/public/decimal/yql_decimal_serialize.h>
  9. #include <yql/essentials/public/decimal/yql_decimal.h>
  10. #include <yql/essentials/minikql/defs.h>
  11. #include <yql/essentials/minikql/pack_num.h>
  12. #include <yql/essentials/minikql/mkql_string_util.h>
  13. #include <yql/essentials/minikql/mkql_type_builder.h>
  14. #include <library/cpp/resource/resource.h>
  15. #include <yql/essentials/utils/fp_bits.h>
  16. #include <util/system/yassert.h>
  17. #include <util/system/sanitizers.h>
  18. using NYql::TChunkedBuffer;
  19. namespace NKikimr {
  20. namespace NMiniKQL {
  21. namespace {
  22. using namespace NDetails;
  23. template<bool Fast, typename T, typename TBuf>
  24. void PackData(T value, TBuf& buffer) {
  25. static_assert(std::is_arithmetic_v<T>);
  26. if constexpr (Fast || sizeof(T) == 1 || std::is_floating_point_v<T>) {
  27. PutRawData(value, buffer);
  28. } else if constexpr (std::is_same_v<T, i16>) {
  29. PackInt16(value, buffer);
  30. } else if constexpr (std::is_same_v<T, ui16>) {
  31. PackUInt16(value, buffer);
  32. } else if constexpr (std::is_same_v<T, i32>) {
  33. PackInt32(value, buffer);
  34. } else if constexpr (std::is_same_v<T, ui32>) {
  35. PackUInt32(value, buffer);
  36. } else if constexpr (std::is_same_v<T, i64>) {
  37. PackInt64(value, buffer);
  38. } else {
  39. static_assert(std::is_same_v<T, ui64>);
  40. PackUInt64(value, buffer);
  41. }
  42. }
  43. template<typename TBuf>
  44. void PackBlob(const char* data, size_t size, TBuf& buf) {
  45. buf.Append(data, size);
  46. }
  47. template <bool Fast, typename T>
  48. T UnpackData(TChunkedInputBuffer& buf) {
  49. static_assert(std::is_arithmetic_v<T>);
  50. T res;
  51. if constexpr (Fast || sizeof(T) == 1 || std::is_floating_point_v<T>) {
  52. res = GetRawData<T>(buf);
  53. } else if constexpr (std::is_same_v<T, i16>) {
  54. res = UnpackInt16(buf);
  55. } else if constexpr (std::is_same_v<T, ui16>) {
  56. res = UnpackUInt16(buf);
  57. } else if constexpr (std::is_same_v<T, i32>) {
  58. res = UnpackInt32(buf);
  59. } else if constexpr (std::is_same_v<T, ui32>) {
  60. res = UnpackUInt32(buf);
  61. } else if constexpr (std::is_same_v<T, i64>) {
  62. res = UnpackInt64(buf);
  63. } else {
  64. static_assert(std::is_same_v<T, ui64>);
  65. res = UnpackUInt64(buf);
  66. }
  67. return res;
  68. }
  69. NUdf::TUnboxedValuePod UnpackString(TChunkedInputBuffer& buf, ui32 size) {
  70. auto res = MakeStringNotFilled(size, 0);
  71. NYql::NUdf::TMutableStringRef ref = res.AsStringRef();
  72. Y_DEBUG_ABORT_UNLESS(size == ref.Size());
  73. buf.CopyTo(ref.Data(), size);
  74. return res;
  75. }
  76. template<typename TBuf>
  77. void SerializeMeta(TBuf& buf, bool useMask, const NDetails::TOptionalUsageMask& mask, ui32 fullLen, bool singleOptional) {
  78. if (fullLen > 7) {
  79. NDetails::PutRawData(fullLen, buf);
  80. // Long length always singnals non-empty optional. So, don't check
  81. // EProps::SingleOptional here
  82. } else {
  83. ui8 length = 1 | (fullLen << 1);
  84. // Empty root optional always has short length. Embed empty flag
  85. // into the length
  86. if (singleOptional && !mask.IsEmptyMask()) {
  87. length |= 0x10;
  88. }
  89. NDetails::PutRawData(length, buf);
  90. }
  91. if (useMask) {
  92. // Prepend optional mask before data
  93. mask.Serialize(buf);
  94. }
  95. }
  96. class TFixedSizeBuffer {
  97. public:
  98. TFixedSizeBuffer(char* buf, size_t size)
  99. : Data_(buf)
  100. , Capacity_(size)
  101. {
  102. }
  103. inline char* Pos() {
  104. return Data_ + Size_;
  105. }
  106. inline size_t Size() const {
  107. return Size_;
  108. }
  109. inline void Advance(size_t len) {
  110. Size_ += len;
  111. }
  112. inline void EraseBack(size_t len) {
  113. Y_DEBUG_ABORT_UNLESS(Size_ >= len);
  114. Size_ -= len;
  115. }
  116. inline void Append(const char* data, size_t len) {
  117. Y_DEBUG_ABORT_UNLESS(Size_ + len <= Capacity_);
  118. std::memcpy(Data_ + Size_, data, len);
  119. Size_ += len;
  120. }
  121. inline void Append(char c) {
  122. Y_DEBUG_ABORT_UNLESS(Size_ + 1 <= Capacity_);
  123. *(Pos()) = c;
  124. ++Size_;
  125. }
  126. private:
  127. char* const Data_;
  128. size_t Size_ = 0;
  129. const size_t Capacity_;
  130. };
  131. template<bool Fast>
  132. std::pair<ui32, bool> SkipEmbeddedLength(TChunkedInputBuffer& buf, size_t totalBufSize) {
  133. if constexpr (Fast) {
  134. Y_ABORT("Should not be called");
  135. }
  136. ui32 length = 0;
  137. bool emptySingleOptional = false;
  138. if (totalBufSize > 8) {
  139. length = GetRawData<ui32>(buf);
  140. MKQL_ENSURE(length + 4 == totalBufSize, "Bad packed data. Invalid embedded size");
  141. } else {
  142. length = GetRawData<ui8>(buf);
  143. MKQL_ENSURE(length & 1, "Bad packed data. Invalid embedded size");
  144. emptySingleOptional = 0 != (length & 0x10);
  145. length = (length & 0x0f) >> 1;
  146. MKQL_ENSURE(length + 1 == totalBufSize, "Bad packed data. Invalid embedded size");
  147. }
  148. return {length, emptySingleOptional};
  149. }
  150. bool HasOptionalFields(const TType* type) {
  151. switch (type->GetKind()) {
  152. case TType::EKind::Void:
  153. case TType::EKind::Null:
  154. case TType::EKind::EmptyList:
  155. case TType::EKind::EmptyDict:
  156. case TType::EKind::Data:
  157. return false;
  158. case TType::EKind::Optional:
  159. return true;
  160. case TType::EKind::Pg:
  161. return true;
  162. case TType::EKind::List:
  163. return HasOptionalFields(static_cast<const TListType*>(type)->GetItemType());
  164. case TType::EKind::Struct: {
  165. auto structType = static_cast<const TStructType*>(type);
  166. for (ui32 index = 0; index < structType->GetMembersCount(); ++index) {
  167. if (HasOptionalFields(structType->GetMemberType(index))) {
  168. return true;
  169. }
  170. }
  171. return false;
  172. }
  173. case TType::EKind::Tuple: {
  174. auto tupleType = static_cast<const TTupleType*>(type);
  175. for (ui32 index = 0; index < tupleType->GetElementsCount(); ++index) {
  176. if (HasOptionalFields(tupleType->GetElementType(index))) {
  177. return true;
  178. }
  179. }
  180. return false;
  181. }
  182. case TType::EKind::Dict: {
  183. auto dictType = static_cast<const TDictType*>(type);
  184. return HasOptionalFields(dictType->GetKeyType()) || HasOptionalFields(dictType->GetPayloadType());
  185. }
  186. case TType::EKind::Variant: {
  187. auto variantType = static_cast<const TVariantType*>(type);
  188. return HasOptionalFields(variantType->GetUnderlyingType());
  189. }
  190. case TType::EKind::Tagged: {
  191. auto taggedType = static_cast<const TTaggedType*>(type);
  192. return HasOptionalFields(taggedType->GetBaseType());
  193. }
  194. case TType::EKind::Multi: {
  195. auto multiType = static_cast<const TMultiType*>(type);
  196. for (ui32 index = 0; index < multiType->GetElementsCount(); ++index) {
  197. if (HasOptionalFields(multiType->GetElementType(index))) {
  198. return true;
  199. }
  200. }
  201. return false;
  202. }
  203. case TType::EKind::Block: {
  204. auto blockType = static_cast<const TBlockType*>(type);
  205. return HasOptionalFields(blockType->GetItemType());
  206. }
  207. default:
  208. THROW yexception() << "Unsupported type: " << type->GetKindAsStr();
  209. }
  210. }
  211. TPackProperties ScanTypeProperties(const TType* type, bool assumeList) {
  212. TPackProperties props;
  213. if (HasOptionalFields(type)) {
  214. props.Set(EPackProps::UseOptionalMask);
  215. }
  216. if (assumeList) {
  217. return props;
  218. }
  219. if (type->GetKind() == TType::EKind::Optional) {
  220. type = static_cast<const TOptionalType*>(type)->GetItemType();
  221. if (!HasOptionalFields(type)) {
  222. props.Set(EPackProps::SingleOptional);
  223. props.Reset(EPackProps::UseOptionalMask);
  224. }
  225. }
  226. // Here and after the type is unwrapped!!
  227. if (type->GetKind() == TType::EKind::Data) {
  228. auto dataType = static_cast<const TDataType*>(type);
  229. switch (*dataType->GetDataSlot()) {
  230. case NUdf::EDataSlot::String:
  231. case NUdf::EDataSlot::Json:
  232. case NUdf::EDataSlot::Yson:
  233. case NUdf::EDataSlot::Utf8:
  234. case NUdf::EDataSlot::JsonDocument:
  235. // Reuse entire packed value length for strings
  236. props.Set(EPackProps::UseTopLength);
  237. break;
  238. default:
  239. break;
  240. }
  241. }
  242. return props;
  243. }
  244. template<bool Fast>
  245. NUdf::TUnboxedValue UnpackFromChunkedBuffer(const TType* type, TChunkedInputBuffer& buf, ui32 topLength,
  246. const THolderFactory& holderFactory, TPackerState& s)
  247. {
  248. switch (type->GetKind()) {
  249. case TType::EKind::Void:
  250. return NUdf::TUnboxedValuePod::Void();
  251. case TType::EKind::Null:
  252. return NUdf::TUnboxedValuePod();
  253. case TType::EKind::EmptyList:
  254. return holderFactory.GetEmptyContainerLazy();
  255. case TType::EKind::EmptyDict:
  256. return holderFactory.GetEmptyContainerLazy();
  257. case TType::EKind::Data: {
  258. auto dataType = static_cast<const TDataType*>(type);
  259. switch (*dataType->GetDataSlot()) {
  260. case NUdf::EDataSlot::Bool:
  261. return NUdf::TUnboxedValuePod(UnpackData<Fast, bool>(buf));
  262. case NUdf::EDataSlot::Int8:
  263. return NUdf::TUnboxedValuePod(UnpackData<Fast, i8>(buf));
  264. case NUdf::EDataSlot::Uint8:
  265. return NUdf::TUnboxedValuePod(UnpackData<Fast, ui8>(buf));
  266. case NUdf::EDataSlot::Int16:
  267. return NUdf::TUnboxedValuePod(UnpackData<Fast, i16>(buf));
  268. case NUdf::EDataSlot::Uint16:
  269. return NUdf::TUnboxedValuePod(UnpackData<Fast, ui16>(buf));
  270. case NUdf::EDataSlot::Int32:
  271. case NUdf::EDataSlot::Date32:
  272. return NUdf::TUnboxedValuePod(UnpackData<Fast, i32>(buf));
  273. case NUdf::EDataSlot::Uint32:
  274. return NUdf::TUnboxedValuePod(UnpackData<Fast, ui32>(buf));
  275. case NUdf::EDataSlot::Int64:
  276. return NUdf::TUnboxedValuePod(UnpackData<Fast, i64>(buf));
  277. case NUdf::EDataSlot::Uint64:
  278. return NUdf::TUnboxedValuePod(UnpackData<Fast, ui64>(buf));
  279. case NUdf::EDataSlot::Float:
  280. return NUdf::TUnboxedValuePod(UnpackData<Fast, float>(buf));
  281. case NUdf::EDataSlot::Double:
  282. return NUdf::TUnboxedValuePod(UnpackData<Fast, double>(buf));
  283. case NUdf::EDataSlot::Date:
  284. return NUdf::TUnboxedValuePod(UnpackData<Fast, ui16>(buf));
  285. case NUdf::EDataSlot::Datetime:
  286. return NUdf::TUnboxedValuePod(UnpackData<Fast, ui32>(buf));
  287. case NUdf::EDataSlot::Timestamp:
  288. return NUdf::TUnboxedValuePod(UnpackData<Fast, ui64>(buf));
  289. case NUdf::EDataSlot::Interval:
  290. case NUdf::EDataSlot::Datetime64:
  291. case NUdf::EDataSlot::Timestamp64:
  292. case NUdf::EDataSlot::Interval64:
  293. return NUdf::TUnboxedValuePod(UnpackData<Fast, i64>(buf));
  294. case NUdf::EDataSlot::TzDate: {
  295. auto ret = NUdf::TUnboxedValuePod(UnpackData<Fast, ui16>(buf));
  296. ret.SetTimezoneId(UnpackData<Fast, ui16>(buf));
  297. return ret;
  298. }
  299. case NUdf::EDataSlot::TzDatetime: {
  300. auto ret = NUdf::TUnboxedValuePod(UnpackData<Fast, ui32>(buf));
  301. ret.SetTimezoneId(UnpackData<Fast, ui16>(buf));
  302. return ret;
  303. }
  304. case NUdf::EDataSlot::TzTimestamp: {
  305. auto ret = NUdf::TUnboxedValuePod(UnpackData<Fast, ui64>(buf));
  306. ret.SetTimezoneId(UnpackData<Fast, ui16>(buf));
  307. return ret;
  308. }
  309. case NUdf::EDataSlot::TzDate32: {
  310. auto ret = NUdf::TUnboxedValuePod(UnpackData<Fast, i32>(buf));
  311. ret.SetTimezoneId(UnpackData<Fast, ui16>(buf));
  312. return ret;
  313. }
  314. case NUdf::EDataSlot::TzDatetime64: {
  315. auto ret = NUdf::TUnboxedValuePod(UnpackData<Fast, i64>(buf));
  316. ret.SetTimezoneId(UnpackData<Fast, ui16>(buf));
  317. return ret;
  318. }
  319. case NUdf::EDataSlot::TzTimestamp64: {
  320. auto ret = NUdf::TUnboxedValuePod(UnpackData<Fast, i64>(buf));
  321. ret.SetTimezoneId(UnpackData<Fast, ui16>(buf));
  322. return ret;
  323. }
  324. case NUdf::EDataSlot::Uuid: {
  325. return UnpackString(buf, 16);
  326. }
  327. case NUdf::EDataSlot::Decimal: {
  328. return NUdf::TUnboxedValuePod(UnpackDecimal(buf));
  329. }
  330. case NUdf::EDataSlot::String:
  331. case NUdf::EDataSlot::Utf8:
  332. case NUdf::EDataSlot::Yson:
  333. case NUdf::EDataSlot::Json:
  334. case NUdf::EDataSlot::JsonDocument:
  335. case NUdf::EDataSlot::DyNumber: {
  336. ui32 size = 0;
  337. if constexpr (Fast) {
  338. size = NDetails::GetRawData<ui32>(buf);
  339. } else {
  340. if (s.Properties.Test(EPackProps::UseTopLength)) {
  341. size = topLength;
  342. } else {
  343. size = NDetails::UnpackUInt32(buf);
  344. }
  345. }
  346. return UnpackString(buf, size);
  347. }
  348. }
  349. break;
  350. }
  351. case TType::EKind::Optional: {
  352. auto optionalType = static_cast<const TOptionalType*>(type);
  353. bool present;
  354. if constexpr (Fast) {
  355. present = NDetails::GetRawData<ui8>(buf);
  356. } else {
  357. present = !s.OptionalUsageMask.IsNextEmptyOptional();
  358. }
  359. if (present) {
  360. return UnpackFromChunkedBuffer<Fast>(optionalType->GetItemType(), buf, topLength, holderFactory, s).Release().MakeOptional();
  361. } else {
  362. return NUdf::TUnboxedValuePod();
  363. }
  364. }
  365. case TType::EKind::Pg: {
  366. auto pgType = static_cast<const TPgType*>(type);
  367. bool present;
  368. if constexpr (Fast) {
  369. present = NDetails::GetRawData<ui8>(buf);
  370. } else {
  371. present = !s.OptionalUsageMask.IsNextEmptyOptional();
  372. }
  373. if (present) {
  374. return PGUnpackImpl(pgType, buf);
  375. } else {
  376. return NUdf::TUnboxedValuePod();
  377. }
  378. }
  379. case TType::EKind::List: {
  380. auto listType = static_cast<const TListType*>(type);
  381. auto itemType = listType->GetItemType();
  382. ui64 len;
  383. if constexpr (Fast) {
  384. len = NDetails::GetRawData<ui64>(buf);
  385. } else {
  386. len = NDetails::UnpackUInt64(buf);
  387. }
  388. if (!len) {
  389. return holderFactory.GetEmptyContainerLazy();
  390. }
  391. TTemporaryUnboxedValueVector tmp;
  392. for (ui64 i = 0; i < len; ++i) {
  393. tmp.emplace_back(UnpackFromChunkedBuffer<Fast>(itemType, buf, topLength, holderFactory, s));
  394. }
  395. NUdf::TUnboxedValue *items = nullptr;
  396. auto list = holderFactory.CreateDirectArrayHolder(len, items);
  397. for (ui64 i = 0; i < len; ++i) {
  398. items[i] = std::move(tmp[i]);
  399. }
  400. return std::move(list);
  401. }
  402. case TType::EKind::Struct: {
  403. auto structType = static_cast<const TStructType*>(type);
  404. NUdf::TUnboxedValue* itemsPtr = nullptr;
  405. auto res = holderFactory.CreateDirectArrayHolder(structType->GetMembersCount(), itemsPtr);
  406. for (ui32 index = 0; index < structType->GetMembersCount(); ++index) {
  407. auto memberType = structType->GetMemberType(index);
  408. itemsPtr[index] = UnpackFromChunkedBuffer<Fast>(memberType, buf, topLength, holderFactory, s);
  409. }
  410. return std::move(res);
  411. }
  412. case TType::EKind::Tuple: {
  413. auto tupleType = static_cast<const TTupleType*>(type);
  414. NUdf::TUnboxedValue* itemsPtr = nullptr;
  415. auto res = holderFactory.CreateDirectArrayHolder(tupleType->GetElementsCount(), itemsPtr);
  416. for (ui32 index = 0; index < tupleType->GetElementsCount(); ++index) {
  417. auto elementType = tupleType->GetElementType(index);
  418. itemsPtr[index] = UnpackFromChunkedBuffer<Fast>(elementType, buf, topLength, holderFactory, s);
  419. }
  420. return std::move(res);
  421. }
  422. case TType::EKind::Dict: {
  423. auto dictType = static_cast<const TDictType*>(type);
  424. auto keyType = dictType->GetKeyType();
  425. auto payloadType = dictType->GetPayloadType();
  426. auto dictBuilder = holderFactory.NewDict(dictType, NUdf::TDictFlags::EDictKind::Hashed);
  427. ui64 len;
  428. if constexpr (Fast) {
  429. len = NDetails::GetRawData<ui64>(buf);
  430. } else {
  431. len = NDetails::UnpackUInt64(buf);
  432. }
  433. for (ui64 i = 0; i < len; ++i) {
  434. auto key = UnpackFromChunkedBuffer<Fast>(keyType, buf, topLength, holderFactory, s);
  435. auto payload = UnpackFromChunkedBuffer<Fast>(payloadType, buf, topLength, holderFactory, s);
  436. dictBuilder->Add(std::move(key), std::move(payload));
  437. }
  438. return dictBuilder->Build();
  439. }
  440. case TType::EKind::Variant: {
  441. auto variantType = static_cast<const TVariantType*>(type);
  442. ui32 variantIndex;
  443. if constexpr (Fast) {
  444. variantIndex = NDetails::GetRawData<ui32>(buf);
  445. } else {
  446. variantIndex = NDetails::UnpackUInt32(buf);
  447. }
  448. TType* innerType = variantType->GetUnderlyingType();
  449. if (innerType->IsStruct()) {
  450. MKQL_ENSURE(variantIndex < static_cast<TStructType*>(innerType)->GetMembersCount(), "Bad variant index: " << variantIndex);
  451. innerType = static_cast<TStructType*>(innerType)->GetMemberType(variantIndex);
  452. } else {
  453. MKQL_ENSURE(innerType->IsTuple(), "Unexpected underlying variant type: " << innerType->GetKindAsStr());
  454. MKQL_ENSURE(variantIndex < static_cast<TTupleType*>(innerType)->GetElementsCount(), "Bad variant index: " << variantIndex);
  455. innerType = static_cast<TTupleType*>(innerType)->GetElementType(variantIndex);
  456. }
  457. return holderFactory.CreateVariantHolder(UnpackFromChunkedBuffer<Fast>(innerType, buf, topLength, holderFactory, s).Release(), variantIndex);
  458. }
  459. case TType::EKind::Tagged: {
  460. auto taggedType = static_cast<const TTaggedType*>(type);
  461. return UnpackFromChunkedBuffer<Fast>(taggedType->GetBaseType(), buf, topLength, holderFactory, s);
  462. }
  463. default:
  464. THROW yexception() << "Unsupported type: " << type->GetKindAsStr();
  465. }
  466. }
  467. template<bool Fast>
  468. NUdf::TUnboxedValue DoUnpack(const TType* type, TChunkedInputBuffer& buf, size_t totalBufSize, const THolderFactory& holderFactory, TPackerState& s) {
  469. if constexpr (Fast) {
  470. NUdf::TUnboxedValue res;
  471. res = UnpackFromChunkedBuffer<Fast>(type, buf, 0, holderFactory, s);
  472. MKQL_ENSURE(buf.IsEmpty(), "Bad packed data - partial data read");
  473. return res;
  474. }
  475. auto pair = SkipEmbeddedLength<Fast>(buf, totalBufSize);
  476. ui32 length = pair.first;
  477. bool emptySingleOptional = pair.second;
  478. if (s.Properties.Test(EPackProps::UseOptionalMask)) {
  479. s.OptionalUsageMask.Reset(buf);
  480. }
  481. NUdf::TUnboxedValue res;
  482. if (s.Properties.Test(EPackProps::SingleOptional) && emptySingleOptional) {
  483. res = NUdf::TUnboxedValuePod();
  484. } else if (type->IsStruct()) {
  485. auto structType = static_cast<const TStructType*>(type);
  486. NUdf::TUnboxedValue* items = nullptr;
  487. res = s.TopStruct.NewArray(holderFactory, structType->GetMembersCount(), items);
  488. for (ui32 index = 0; index < structType->GetMembersCount(); ++index) {
  489. auto memberType = structType->GetMemberType(index);
  490. *items++ = UnpackFromChunkedBuffer<Fast>(memberType, buf, length, holderFactory, s);
  491. }
  492. } else {
  493. res = UnpackFromChunkedBuffer<Fast>(type, buf, length, holderFactory, s);
  494. }
  495. MKQL_ENSURE(buf.IsEmpty(), "Bad packed data - partial data read");
  496. return res;
  497. }
  498. template<bool Fast>
  499. void DoUnpackBatch(const TType* type, TChunkedInputBuffer& buf, size_t totalSize, const THolderFactory& holderFactory, TPackerState& s, TUnboxedValueBatch& result) {
  500. ui64 len;
  501. ui32 topLength;
  502. const TType* itemType = type;
  503. if constexpr (!Fast) {
  504. auto pair = SkipEmbeddedLength<Fast>(buf, totalSize);
  505. topLength = pair.first;
  506. bool emptySingleOptional = pair.second;
  507. if (s.Properties.Test(EPackProps::UseOptionalMask)) {
  508. s.OptionalUsageMask.Reset(buf);
  509. }
  510. MKQL_ENSURE(!s.Properties.Test(EPackProps::SingleOptional) || !emptySingleOptional, "Unexpected header settings");
  511. len = NDetails::UnpackUInt64(buf);
  512. } else {
  513. topLength = 0;
  514. len = NDetails::GetRawData<ui64>(buf);
  515. }
  516. if (type->IsMulti()) {
  517. auto multiType = static_cast<const TMultiType*>(type);
  518. const ui32 width = multiType->GetElementsCount();
  519. Y_DEBUG_ABORT_UNLESS(result.IsWide());
  520. Y_DEBUG_ABORT_UNLESS(result.Width() == width);
  521. for (ui64 i = 0; i < len; ++i) {
  522. result.PushRow([&](ui32 j) {
  523. return UnpackFromChunkedBuffer<Fast>(multiType->GetElementType(j), buf, topLength, holderFactory, s);
  524. });
  525. }
  526. } else {
  527. Y_DEBUG_ABORT_UNLESS(!result.IsWide());
  528. for (ui64 i = 0; i < len; ++i) {
  529. result.emplace_back(UnpackFromChunkedBuffer<Fast>(itemType, buf, topLength, holderFactory, s));
  530. }
  531. }
  532. MKQL_ENSURE(buf.IsEmpty(), "Bad packed data - partial data read");
  533. }
  534. template<bool Fast, bool Stable, typename TBuf>
  535. void PackImpl(const TType* type, TBuf& buffer, const NUdf::TUnboxedValuePod& value, TPackerState& s) {
  536. switch (type->GetKind()) {
  537. case TType::EKind::Void:
  538. break;
  539. case TType::EKind::Null:
  540. break;
  541. case TType::EKind::EmptyList:
  542. break;
  543. case TType::EKind::EmptyDict:
  544. break;
  545. case TType::EKind::Data: {
  546. auto dataType = static_cast<const TDataType*>(type);
  547. switch (*dataType->GetDataSlot()) {
  548. case NUdf::EDataSlot::Bool:
  549. PackData<Fast>(value.Get<bool>(), buffer);
  550. break;
  551. case NUdf::EDataSlot::Int8:
  552. PackData<Fast>(value.Get<i8>(), buffer);
  553. break;
  554. case NUdf::EDataSlot::Uint8:
  555. PackData<Fast>(value.Get<ui8>(), buffer);
  556. break;
  557. case NUdf::EDataSlot::Int16:
  558. PackData<Fast>(value.Get<i16>(), buffer);
  559. break;
  560. case NUdf::EDataSlot::Uint16:
  561. PackData<Fast>(value.Get<ui16>(), buffer);
  562. break;
  563. case NUdf::EDataSlot::Int32:
  564. case NUdf::EDataSlot::Date32:
  565. PackData<Fast>(value.Get<i32>(), buffer);
  566. break;
  567. case NUdf::EDataSlot::Uint32:
  568. PackData<Fast>(value.Get<ui32>(), buffer);
  569. break;
  570. case NUdf::EDataSlot::Int64:
  571. PackData<Fast>(value.Get<i64>(), buffer);
  572. break;
  573. case NUdf::EDataSlot::Uint64:
  574. PackData<Fast>(value.Get<ui64>(), buffer);
  575. break;
  576. case NUdf::EDataSlot::Float: {
  577. float x = value.Get<float>();
  578. if constexpr (Stable) {
  579. NYql::CanonizeFpBits<float>(&x);
  580. }
  581. PackData<Fast>(x, buffer);
  582. break;
  583. }
  584. case NUdf::EDataSlot::Double: {
  585. double x = value.Get<double>();
  586. if constexpr (Stable) {
  587. NYql::CanonizeFpBits<double>(&x);
  588. }
  589. PackData<Fast>(x, buffer);
  590. break;
  591. }
  592. case NUdf::EDataSlot::Date:
  593. PackData<Fast>(value.Get<ui16>(), buffer);
  594. break;
  595. case NUdf::EDataSlot::Datetime:
  596. PackData<Fast>(value.Get<ui32>(), buffer);
  597. break;
  598. case NUdf::EDataSlot::Timestamp:
  599. PackData<Fast>(value.Get<ui64>(), buffer);
  600. break;
  601. case NUdf::EDataSlot::Interval:
  602. case NUdf::EDataSlot::Datetime64:
  603. case NUdf::EDataSlot::Timestamp64:
  604. case NUdf::EDataSlot::Interval64:
  605. PackData<Fast>(value.Get<i64>(), buffer);
  606. break;
  607. case NUdf::EDataSlot::Uuid: {
  608. auto ref = value.AsStringRef();
  609. PackBlob(ref.Data(), ref.Size(), buffer);
  610. break;
  611. }
  612. case NUdf::EDataSlot::TzDate: {
  613. PackData<Fast>(value.Get<ui16>(), buffer);
  614. PackData<Fast>(value.GetTimezoneId(), buffer);
  615. break;
  616. }
  617. case NUdf::EDataSlot::TzDatetime: {
  618. PackData<Fast>(value.Get<ui32>(), buffer);
  619. PackData<Fast>(value.GetTimezoneId(), buffer);
  620. break;
  621. }
  622. case NUdf::EDataSlot::TzTimestamp: {
  623. PackData<Fast>(value.Get<ui64>(), buffer);
  624. PackData<Fast>(value.GetTimezoneId(), buffer);
  625. break;
  626. }
  627. case NUdf::EDataSlot::TzDate32: {
  628. PackData<Fast>(value.Get<i32>(), buffer);
  629. PackData<Fast>(value.GetTimezoneId(), buffer);
  630. break;
  631. }
  632. case NUdf::EDataSlot::TzDatetime64: {
  633. PackData<Fast>(value.Get<i64>(), buffer);
  634. PackData<Fast>(value.GetTimezoneId(), buffer);
  635. break;
  636. }
  637. case NUdf::EDataSlot::TzTimestamp64: {
  638. PackData<Fast>(value.Get<i64>(), buffer);
  639. PackData<Fast>(value.GetTimezoneId(), buffer);
  640. break;
  641. }
  642. case NUdf::EDataSlot::Decimal: {
  643. PackDecimal(value.GetInt128(), buffer);
  644. break;
  645. }
  646. case NUdf::EDataSlot::String:
  647. case NUdf::EDataSlot::Utf8:
  648. case NUdf::EDataSlot::Yson:
  649. case NUdf::EDataSlot::Json:
  650. case NUdf::EDataSlot::JsonDocument:
  651. case NUdf::EDataSlot::DyNumber: {
  652. auto stringRef = value.AsStringRef();
  653. if constexpr (Fast) {
  654. static_assert(std::is_same_v<decltype(stringRef.Size()), ui32>);
  655. PackData<Fast>(stringRef.Size(), buffer);
  656. } else {
  657. if (!s.Properties.Test(EPackProps::UseTopLength)) {
  658. PackData<Fast>(stringRef.Size(), buffer);
  659. }
  660. }
  661. PackBlob(stringRef.Data(), stringRef.Size(), buffer);
  662. }
  663. }
  664. break;
  665. }
  666. case TType::EKind::Optional: {
  667. auto optionalType = static_cast<const TOptionalType*>(type);
  668. if constexpr (Fast) {
  669. PackData<Fast>(ui8(bool(value)), buffer);
  670. } else {
  671. s.OptionalUsageMask.SetNextEmptyOptional(!value);
  672. }
  673. if (value) {
  674. PackImpl<Fast, Stable>(optionalType->GetItemType(), buffer, value.GetOptionalValue(), s);
  675. }
  676. break;
  677. }
  678. case TType::EKind::Pg: {
  679. auto pgType = static_cast<const TPgType*>(type);
  680. if constexpr (Fast) {
  681. PackData<Fast>(ui8(bool(value)), buffer);
  682. } else {
  683. s.OptionalUsageMask.SetNextEmptyOptional(!value);
  684. }
  685. if (value) {
  686. PGPackImpl(Stable, pgType, value, buffer);
  687. }
  688. break;
  689. }
  690. case TType::EKind::List: {
  691. auto listType = static_cast<const TListType*>(type);
  692. auto itemType = listType->GetItemType();
  693. if (value.HasFastListLength()) {
  694. ui64 len = value.GetListLength();
  695. PackData<Fast>(len, buffer);
  696. TThresher<false>::DoForEachItem(value,
  697. [&](const NYql::NUdf::TUnboxedValuePod& item) { PackImpl<Fast, Stable>(itemType, buffer, item, s); });
  698. } else {
  699. const auto iter = value.GetListIterator();
  700. if constexpr (Fast) {
  701. ui64 count = 0;
  702. buffer.Advance(sizeof(count));
  703. char* dst = buffer.Pos() - sizeof(count);
  704. for (NUdf::TUnboxedValue item; iter.Next(item);) {
  705. PackImpl<Fast, Stable>(itemType, buffer, item, s);
  706. ++count;
  707. }
  708. std::memcpy(dst, &count, sizeof(count));
  709. } else {
  710. TUnboxedValueVector items;
  711. for (NUdf::TUnboxedValue item; iter.Next(item);) {
  712. items.emplace_back(std::move(item));
  713. }
  714. PackData<Fast>(ui64(items.size()), buffer);
  715. for (const auto& item : items) {
  716. PackImpl<Fast, Stable>(itemType, buffer, item, s);
  717. }
  718. }
  719. }
  720. break;
  721. }
  722. case TType::EKind::Struct: {
  723. auto structType = static_cast<const TStructType*>(type);
  724. for (ui32 index = 0; index < structType->GetMembersCount(); ++index) {
  725. auto memberType = structType->GetMemberType(index);
  726. PackImpl<Fast, Stable>(memberType, buffer, value.GetElement(index), s);
  727. }
  728. break;
  729. }
  730. case TType::EKind::Tuple: {
  731. auto tupleType = static_cast<const TTupleType*>(type);
  732. for (ui32 index = 0; index < tupleType->GetElementsCount(); ++index) {
  733. auto elementType = tupleType->GetElementType(index);
  734. PackImpl<Fast, Stable>(elementType, buffer, value.GetElement(index), s);
  735. }
  736. break;
  737. }
  738. case TType::EKind::Dict: {
  739. auto dictType = static_cast<const TDictType*>(type);
  740. auto keyType = dictType->GetKeyType();
  741. auto payloadType = dictType->GetPayloadType();
  742. ui64 length = value.GetDictLength();
  743. PackData<Fast>(length, buffer);
  744. const auto iter = value.GetDictIterator();
  745. if constexpr (Fast) {
  746. for (NUdf::TUnboxedValue key, payload; iter.NextPair(key, payload);) {
  747. PackImpl<Fast, Stable>(keyType, buffer, key, s);
  748. PackImpl<Fast, Stable>(payloadType, buffer, payload, s);
  749. }
  750. } else {
  751. if (Stable && !value.IsSortedDict()) {
  752. // no key duplicates here
  753. TKeyTypes types;
  754. bool isTuple;
  755. bool encoded;
  756. bool useIHash;
  757. GetDictionaryKeyTypes(keyType, types, isTuple, encoded, useIHash);
  758. if (encoded) {
  759. TGenericPresortEncoder packer(keyType);
  760. typename decltype(s.EncodedDictBuffers)::value_type dictBuffer;
  761. if (!s.EncodedDictBuffers.empty()) {
  762. dictBuffer = std::move(s.EncodedDictBuffers.back());
  763. s.EncodedDictBuffers.pop_back();
  764. dictBuffer.clear();
  765. }
  766. dictBuffer.reserve(length);
  767. for (NUdf::TUnboxedValue key, payload; iter.NextPair(key, payload);) {
  768. NUdf::TUnboxedValue encodedKey = MakeString(packer.Encode(key, false));
  769. dictBuffer.emplace_back(std::move(encodedKey), std::move(key), std::move(payload));
  770. }
  771. Sort(dictBuffer.begin(), dictBuffer.end(),
  772. [&](const auto &left, const auto &right) {
  773. return CompareKeys(std::get<0>(left), std::get<0>(right), types, isTuple) < 0;
  774. });
  775. for (const auto& x : dictBuffer) {
  776. PackImpl<Fast, Stable>(keyType, buffer, std::get<1>(x), s);
  777. PackImpl<Fast, Stable>(payloadType, buffer, std::get<2>(x), s);
  778. }
  779. dictBuffer.clear();
  780. s.EncodedDictBuffers.push_back(std::move(dictBuffer));
  781. } else {
  782. typename decltype(s.DictBuffers)::value_type dictBuffer;
  783. if (!s.DictBuffers.empty()) {
  784. dictBuffer = std::move(s.DictBuffers.back());
  785. s.DictBuffers.pop_back();
  786. dictBuffer.clear();
  787. }
  788. dictBuffer.reserve(length);
  789. for (NUdf::TUnboxedValue key, payload; iter.NextPair(key, payload);) {
  790. dictBuffer.emplace_back(std::move(key), std::move(payload));
  791. }
  792. NUdf::ICompare::TPtr cmp = useIHash ? MakeCompareImpl(keyType) : nullptr;
  793. Sort(dictBuffer.begin(), dictBuffer.end(), TKeyPayloadPairLess(types, isTuple, cmp.Get()));
  794. for (const auto& p: dictBuffer) {
  795. PackImpl<Fast, Stable>(keyType, buffer, p.first, s);
  796. PackImpl<Fast, Stable>(payloadType, buffer, p.second, s);
  797. }
  798. dictBuffer.clear();
  799. s.DictBuffers.push_back(std::move(dictBuffer));
  800. }
  801. } else {
  802. for (NUdf::TUnboxedValue key, payload; iter.NextPair(key, payload);) {
  803. PackImpl<Fast, Stable>(keyType, buffer, key, s);
  804. PackImpl<Fast, Stable>(payloadType, buffer, payload, s);
  805. }
  806. }
  807. }
  808. break;
  809. }
  810. case TType::EKind::Variant: {
  811. auto variantType = static_cast<const TVariantType*>(type);
  812. ui32 variantIndex = value.GetVariantIndex();
  813. TType* innerType = variantType->GetUnderlyingType();
  814. if (innerType->IsStruct()) {
  815. innerType = static_cast<TStructType*>(innerType)->GetMemberType(variantIndex);
  816. } else {
  817. MKQL_ENSURE(innerType->IsTuple(), "Unexpected underlying variant type: " << innerType->GetKindAsStr());
  818. innerType = static_cast<TTupleType*>(innerType)->GetElementType(variantIndex);
  819. }
  820. PackData<Fast>(variantIndex, buffer);
  821. PackImpl<Fast, Stable>(innerType, buffer, value.GetVariantItem(), s);
  822. break;
  823. }
  824. case TType::EKind::Tagged: {
  825. auto taggedType = static_cast<const TTaggedType*>(type);
  826. PackImpl<Fast, Stable>(taggedType->GetBaseType(), buffer, value, s);
  827. break;
  828. }
  829. default:
  830. THROW yexception() << "Unsupported type: " << type->GetKindAsStr();
  831. }
  832. }
  833. bool HasOffset(const arrow::ArrayData& array, i64 expectedOffset) {
  834. return array.offset == expectedOffset &&
  835. AllOf(array.child_data, [&](const auto& child) { return HasOffset(*child, expectedOffset); });
  836. }
  837. bool IsUi64Scalar(const TBlockType* blockType) {
  838. if (blockType->GetShape() != TBlockType::EShape::Scalar) {
  839. return false;
  840. }
  841. if (!blockType->GetItemType()->IsData()) {
  842. return false;
  843. }
  844. return static_cast<const TDataType*>(blockType->GetItemType())->GetDataSlot() == NUdf::EDataSlot::Uint64;
  845. }
  846. bool IsLegacyStructBlock(const TType* type, ui32& blockLengthIndex, TVector<const TBlockType*>& items) {
  847. items.clear();
  848. blockLengthIndex = Max<ui32>();
  849. if (!type->IsStruct()) {
  850. return false;
  851. }
  852. const TStructType* structType = static_cast<const TStructType*>(type);
  853. static const TStringBuf blockLenColumnName = "_yql_block_length";
  854. auto index = structType->FindMemberIndex(blockLenColumnName);
  855. if (!index) {
  856. return false;
  857. }
  858. for (ui32 i = 0; i < structType->GetMembersCount(); i++) {
  859. auto type = structType->GetMemberType(i);
  860. if (!type->IsBlock()) {
  861. return false;
  862. }
  863. const TBlockType* blockType = static_cast<const TBlockType*>(type);
  864. items.push_back(blockType);
  865. if (i == *index && !IsUi64Scalar(blockType)) {
  866. return false;
  867. }
  868. }
  869. blockLengthIndex = *index;
  870. return true;
  871. }
  872. bool IsMultiBlock(const TType* type, ui32& blockLengthIndex, TVector<const TBlockType*>& items) {
  873. items.clear();
  874. blockLengthIndex = Max<ui32>();
  875. if (!type->IsMulti()) {
  876. return false;
  877. }
  878. const TMultiType* multiType = static_cast<const TMultiType*>(type);
  879. ui32 width = multiType->GetElementsCount();
  880. if (!width) {
  881. return false;
  882. }
  883. for (ui32 i = 0; i < width; i++) {
  884. auto type = multiType->GetElementType(i);
  885. if (!type->IsBlock()) {
  886. return false;
  887. }
  888. const TBlockType* blockType = static_cast<const TBlockType*>(type);
  889. items.push_back(blockType);
  890. if (i == width - 1 && !IsUi64Scalar(blockType)) {
  891. return false;
  892. }
  893. }
  894. blockLengthIndex = width - 1;
  895. return true;
  896. }
  897. } // namespace
  898. template<bool Fast>
  899. TValuePackerGeneric<Fast>::TValuePackerGeneric(bool stable, const TType* type)
  900. : Stable_(stable)
  901. , Type_(type)
  902. , State_(ScanTypeProperties(Type_, false))
  903. {
  904. MKQL_ENSURE(!Fast || !Stable_, "Stable mode is not supported");
  905. }
  906. template<bool Fast>
  907. NUdf::TUnboxedValue TValuePackerGeneric<Fast>::Unpack(TStringBuf buf, const THolderFactory& holderFactory) const {
  908. TChunkedInputBuffer chunked(buf);
  909. return DoUnpack<Fast>(Type_, chunked, buf.size(), holderFactory, State_);
  910. }
// Packs `value` into the internal Buffer_ and returns a view of the packed bytes.
// Buffer_ is reset at the start of every call, so the returned TStringBuf is only
// valid until the next Pack() on this packer.
//
// Fast format: the returned bytes are exactly the PackImpl output, with no header.
// Non-fast format: the returned bytes are
//   [length][optional usage mask, only with EPackProps::UseOptionalMask][payload]
// where `length` counts everything after the length field (mask + payload).
// It is stored as a 4-byte ui32 when that size exceeds 7. Otherwise it is a single
// byte `1 | (size << 1)`, optionally with the 0x10 flag (see below).
template<bool Fast>
TStringBuf TValuePackerGeneric<Fast>::Pack(const NUdf::TUnboxedValuePod& value) const {
    auto& s = State_;
    if constexpr (Fast) {
        Buffer_.Proceed(0);
        if (Stable_) {
            PackImpl<Fast, true>(Type_, Buffer_, value, s);
        } else {
            PackImpl<Fast, false>(Type_, Buffer_, value, s);
        }
        return TStringBuf(Buffer_.data(), Buffer_.size());
    }
    s.OptionalUsageMask.Reset();
    const size_t lengthReserve = sizeof(ui32);
    // Leave a gap at the front for the 4-byte length plus the optional mask size
    // reserved so far; the header is written into this gap after packing.
    Buffer_.Proceed(lengthReserve + s.OptionalMaskReserve);
    if (Stable_) {
        PackImpl<Fast, true>(Type_, Buffer_, value, s);
    } else {
        PackImpl<Fast, false>(Type_, Buffer_, value, s);
    }
    // `delta`: number of unused leading bytes that are skipped in the returned view.
    size_t delta = 0;
    // `len`: end of the packed data; it stays valid after the header rewinds below.
    size_t len = Buffer_.Size();
    if (s.Properties.Test(EPackProps::UseOptionalMask)) {
        // Prepend optional mask
        const size_t actualOptionalMaskSize = s.OptionalUsageMask.CalcSerializedSize();
        if (actualOptionalMaskSize > s.OptionalMaskReserve) {
            // The reserved gap is too small: copy everything into a new buffer with a
            // larger front gap, and remember the larger reserve for later calls.
            TBuffer buf(Buffer_.Size() + actualOptionalMaskSize - s.OptionalMaskReserve);
            buf.Proceed(actualOptionalMaskSize - s.OptionalMaskReserve);
            buf.Append(Buffer_.Data(), Buffer_.Size());
            Buffer_.Swap(buf);
            s.OptionalMaskReserve = actualOptionalMaskSize;
            len = Buffer_.Size();
        }
        // Unused part of the mask reserve; the mask is placed flush against the payload.
        delta = s.OptionalMaskReserve - actualOptionalMaskSize;
        // Move the write position back into the front gap so the mask is written
        // there. NOTE(review): relies on TBuffer::Proceed setting the logical size
        // while keeping the payload bytes in memory up to `len` — confirm.
        Buffer_.Proceed(lengthReserve + delta);
        s.OptionalUsageMask.Serialize(Buffer_);
    }
    // Prepend length
    if (len - delta - lengthReserve > 7) {
        const ui32 length = len - delta - lengthReserve;
        Buffer_.Proceed(delta);
        Buffer_.Append((const char*)&length, sizeof(length));
        // Long length always signals non-empty optional. So, don't check EProps::SingleOptional here
    } else {
        // Short form: bit 0 is set, the size is stored in the bits above it.
        ui8 length = 1 | ((len - delta - lengthReserve) << 1);
        // Empty root optional always has short length. Embed empty flag into the length
        if (s.Properties.Test(EPackProps::SingleOptional) && !s.OptionalUsageMask.IsEmptyMask()) {
            length |= 0x10;
        }
        // Only the last byte of the 4-byte length slot is used; skip the other 3.
        delta += 3;
        Buffer_.Proceed(delta);
        Buffer_.Append((const char*)&length, sizeof(length));
    }
    return TStringBuf(Buffer_.Data() + delta, len - delta);
}
  966. // Transport packer
  967. template<bool Fast>
  968. TValuePackerTransport<Fast>::TValuePackerTransport(bool stable, const TType* type, arrow::MemoryPool* pool)
  969. : Type_(type)
  970. , State_(ScanTypeProperties(Type_, false))
  971. , IncrementalState_(ScanTypeProperties(Type_, true))
  972. , ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
  973. {
  974. MKQL_ENSURE(!stable, "Stable packing is not supported");
  975. InitBlocks();
  976. }
  977. template<bool Fast>
  978. TValuePackerTransport<Fast>::TValuePackerTransport(const TType* type, arrow::MemoryPool* pool)
  979. : Type_(type)
  980. , State_(ScanTypeProperties(Type_, false))
  981. , IncrementalState_(ScanTypeProperties(Type_, true))
  982. , ArrowPool_(pool ? *pool : *NYql::NUdf::GetYqlMemoryPool())
  983. {
  984. InitBlocks();
  985. }
  986. template<bool Fast>
  987. void TValuePackerTransport<Fast>::InitBlocks() {
  988. TVector<const TBlockType*> items;
  989. if (IsLegacyStructBlock(Type_, BlockLenIndex_, items)) {
  990. IsLegacyBlock_ = true;
  991. } else if (!IsMultiBlock(Type_, BlockLenIndex_, items)) {
  992. return;
  993. }
  994. IsBlock_ = true;
  995. ConvertedScalars_.resize(items.size());
  996. BlockReaders_.resize(items.size());
  997. BlockSerializers_.resize(items.size());
  998. BlockDeserializers_.resize(items.size());
  999. for (ui32 i = 0; i < items.size(); ++i) {
  1000. if (i != BlockLenIndex_) {
  1001. const TBlockType* itemType = items[i];
  1002. BlockSerializers_[i] = MakeBlockSerializer(TTypeInfoHelper(), itemType->GetItemType());
  1003. BlockDeserializers_[i] = MakeBlockDeserializer(TTypeInfoHelper(), itemType->GetItemType());
  1004. if (itemType->GetShape() == TBlockType::EShape::Scalar) {
  1005. BlockReaders_[i] = NYql::NUdf::MakeBlockReader(TTypeInfoHelper(), itemType->GetItemType());
  1006. }
  1007. }
  1008. }
  1009. }
  1010. template<bool Fast>
  1011. NUdf::TUnboxedValue TValuePackerTransport<Fast>::Unpack(TChunkedBuffer&& buf, const THolderFactory& holderFactory) const {
  1012. MKQL_ENSURE(!IsBlock_, "Unpack() should not be used for blocks");
  1013. const size_t totalSize = buf.Size();
  1014. TChunkedInputBuffer chunked(std::move(buf));
  1015. return DoUnpack<Fast>(Type_, chunked, totalSize, holderFactory, State_);
  1016. }
  1017. template<bool Fast>
  1018. void TValuePackerTransport<Fast>::UnpackBatch(TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const {
  1019. if (IsBlock_) {
  1020. return UnpackBatchBlocks(std::move(buf), holderFactory, result);
  1021. }
  1022. const size_t totalSize = buf.Size();
  1023. TChunkedInputBuffer chunked(std::move(buf));
  1024. DoUnpackBatch<Fast>(Type_, chunked, totalSize, holderFactory, IncrementalState_, result);
  1025. }
  1026. template<bool Fast>
  1027. TChunkedBuffer TValuePackerTransport<Fast>::Pack(const NUdf::TUnboxedValuePod& value) const {
  1028. MKQL_ENSURE(ItemCount_ == 0, "Can not mix Pack() and AddItem() calls");
  1029. MKQL_ENSURE(!IsBlock_, "Pack() should not be used for blocks");
  1030. TPagedBuffer::TPtr result = std::make_shared<TPagedBuffer>();
  1031. if constexpr (Fast) {
  1032. PackImpl<Fast, false>(Type_, *result, value, State_);
  1033. } else {
  1034. State_.OptionalUsageMask.Reset();
  1035. result->ReserveHeader(sizeof(ui32) + State_.OptionalMaskReserve);
  1036. PackImpl<Fast, false>(Type_, *result, value, State_);
  1037. BuildMeta(result, false);
  1038. }
  1039. return TPagedBuffer::AsChunkedBuffer(result);
  1040. }
  1041. template<bool Fast>
  1042. void TValuePackerTransport<Fast>::StartPack() {
  1043. Buffer_ = std::make_shared<TPagedBuffer>();
  1044. if constexpr (Fast) {
  1045. // reserve place for list item count
  1046. Buffer_->ReserveHeader(sizeof(ItemCount_));
  1047. } else {
  1048. IncrementalState_.OptionalUsageMask.Reset();
  1049. Buffer_->ReserveHeader(sizeof(ui32) + State_.OptionalMaskReserve + MAX_PACKED64_SIZE);
  1050. }
  1051. }
  1052. template<bool Fast>
  1053. TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddItem(const NUdf::TUnboxedValuePod& value) {
  1054. Y_DEBUG_ABORT_UNLESS(!Type_->IsMulti());
  1055. if (IsLegacyBlock_) {
  1056. static_assert(sizeof(NUdf::TUnboxedValuePod) == sizeof(NUdf::TUnboxedValue));
  1057. const NUdf::TUnboxedValuePod* values = static_cast<const NUdf::TUnboxedValuePod*>(value.GetElements());
  1058. return AddWideItemBlocks(values, BlockSerializers_.size());
  1059. }
  1060. const TType* itemType = Type_;
  1061. if (!ItemCount_) {
  1062. StartPack();
  1063. }
  1064. PackImpl<Fast, false>(itemType, *Buffer_, value, IncrementalState_);
  1065. ++ItemCount_;
  1066. return *this;
  1067. }
  1068. template<bool Fast>
  1069. TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItem(const NUdf::TUnboxedValuePod* values, ui32 width) {
  1070. Y_DEBUG_ABORT_UNLESS(Type_->IsMulti());
  1071. Y_DEBUG_ABORT_UNLESS(static_cast<const TMultiType*>(Type_)->GetElementsCount() == width);
  1072. if (IsBlock_) {
  1073. return AddWideItemBlocks(values, width);
  1074. }
  1075. const TMultiType* itemType = static_cast<const TMultiType*>(Type_);
  1076. if (!ItemCount_) {
  1077. StartPack();
  1078. }
  1079. for (ui32 i = 0; i < width; ++i) {
  1080. PackImpl<Fast, false>(itemType->GetElementType(i), *Buffer_, values[i], IncrementalState_);
  1081. }
  1082. ++ItemCount_;
  1083. return *this;
  1084. }
// Serializes one block (one value per column, `width` columns) and appends it to
// BlockBuffer_. The metadata part written here is:
//   block length (packed ui64)
//   -- if the length is 0, nothing else is written for this block --
//   feature flags (packed ui64, currently always 1 = "scalars are present")
//   one byte per non-length column: the column's arrow offset % 8
//   count of metadata words (packed ui32)
//   the metadata words (packed ui64 each, from each column's serializer)
// followed by each non-length column's buffers (BlockSerializer::StoreArray).
// Scalar columns are serialized as 1-element arrays.
template<bool Fast>
TValuePackerTransport<Fast>& TValuePackerTransport<Fast>::AddWideItemBlocks(const NUdf::TUnboxedValuePod* values, ui32 width) {
    MKQL_ENSURE(width == BlockSerializers_.size(), "Invalid width");
    const ui64 len = TArrowBlock::From(values[BlockLenIndex_]).GetDatum().scalar_as<arrow::UInt64Scalar>().value;
    auto metadataBuffer = std::make_shared<TBuffer>();
    ui32 totalMetadataCount = 0;
    for (size_t i = 0; i < width; ++i) {
        if (i != BlockLenIndex_) {
            MKQL_ENSURE(BlockSerializers_[i], "Invalid serializer");
            totalMetadataCount += BlockSerializers_[i]->ArrayMetadataCount();
        }
    }
    // calculate approximate metadata size
    const size_t metadataReservedSize =
        MAX_PACKED64_SIZE +                      // block len
        MAX_PACKED64_SIZE +                      // feature flags
        (width - 1) +                            // 1-byte offsets
        MAX_PACKED32_SIZE +                      // metadata words count
        MAX_PACKED64_SIZE * totalMetadataCount;  // metadata words
    metadataBuffer->Reserve(len ? metadataReservedSize : MAX_PACKED64_SIZE);
    // save block length
    PackData<false>(len, *metadataBuffer);
    if (!len) {
        // only block len should be serialized in this case
        BlockBuffer_.Append(TStringBuf(metadataBuffer->data(), metadataBuffer->size()), metadataBuffer);
        ++ItemCount_;
        return *this;
    }
    // save feature flags
    // 1 = "scalars are present"
    const ui64 metadataFlags = 1 << 0;
    PackData<false>(metadataFlags, *metadataBuffer);
    TVector<std::shared_ptr<arrow::ArrayData>> arrays(width);
    // save remainder of original offset for each column - it is needed to properly handle offset in bitmaps
    for (size_t i = 0; i < width; ++i) {
        if (i == BlockLenIndex_) {
            continue;
        }
        arrow::Datum datum = TArrowBlock::From(values[i]).GetDatum();
        ui8 reminder = 0;
        if (datum.is_array()) {
            i64 offset = datum.array()->offset;
            MKQL_ENSURE(offset >= 0, "Negative offset");
            // all offsets should be equal
            MKQL_ENSURE(HasOffset(*datum.array(), offset), "Unexpected offset in child data");
            reminder = offset % 8;
            arrays[i] = datum.array();
        } else {
            MKQL_ENSURE(datum.is_scalar(), "Expecting array or scalar");
            // NOTE(review): the 1-element array built from the scalar is cached per
            // column on first use and reused for every later block; Clear() does not
            // reset ConvertedScalars_. This assumes a scalar column carries the same
            // value in every block — confirm this invariant with callers.
            if (!ConvertedScalars_[i]) {
                const TType* itemType = IsLegacyBlock_ ? static_cast<const TStructType*>(Type_)->GetMemberType(i) :
                                                         static_cast<const TMultiType*>(Type_)->GetElementType(i);
                datum = MakeArrayFromScalar(*datum.scalar(), 1, static_cast<const TBlockType*>(itemType)->GetItemType(), ArrowPool_);
                MKQL_ENSURE(HasOffset(*datum.array(), 0), "Expected zero array offset after scalar is converted to array");
                ConvertedScalars_[i] = datum.array();
            }
            arrays[i] = ConvertedScalars_[i];
        }
        PackData<false>(reminder, *metadataBuffer);
    }
    // save count of metadata words
    PackData<false>(totalMetadataCount, *metadataBuffer);
    // save metadata itself
    ui32 savedMetadata = 0;
    for (size_t i = 0; i < width; ++i) {
        if (i != BlockLenIndex_) {
            BlockSerializers_[i]->StoreMetadata(*arrays[i], [&](ui64 meta) {
                PackData<false>(meta, *metadataBuffer);
                ++savedMetadata;
            });
        }
    }
    // The serializers must emit exactly as many words as ArrayMetadataCount() promised.
    MKQL_ENSURE(savedMetadata == totalMetadataCount, "Serialization metadata error");
    BlockBuffer_.Append(TStringBuf(metadataBuffer->data(), metadataBuffer->size()), metadataBuffer);
    // save buffers
    for (size_t i = 0; i < width; ++i) {
        if (i != BlockLenIndex_) {
            BlockSerializers_[i]->StoreArray(*arrays[i], BlockBuffer_);
        }
    }
    ++ItemCount_;
    return *this;
}
  1168. template<bool Fast>
  1169. void TValuePackerTransport<Fast>::UnpackBatchBlocks(TChunkedBuffer&& buf, const THolderFactory& holderFactory, TUnboxedValueBatch& result) const {
  1170. while (!buf.Empty()) {
  1171. TChunkedInputBuffer chunked(std::move(buf));
  1172. // unpack block length
  1173. const ui64 len = UnpackData<false, ui64>(chunked);
  1174. if (len == 0) {
  1175. continue;
  1176. }
  1177. // unpack flags
  1178. const ui64 metadataFlags = UnpackData<false, ui64>(chunked);
  1179. MKQL_ENSURE(metadataFlags == 1, "Unsupported metadata flags");
  1180. // unpack array offsets
  1181. const ui32 width = BlockDeserializers_.size();
  1182. MKQL_ENSURE(width > 0, "Invalid width");
  1183. TVector<ui64> offsets(width);
  1184. for (ui32 i = 0; i < width; ++i) {
  1185. if (BlockDeserializers_[i]) {
  1186. offsets[i] = UnpackData<false, ui8>(chunked);
  1187. MKQL_ENSURE(offsets[i] < 8, "Unexpected offset value");
  1188. }
  1189. }
  1190. // unpack metadata
  1191. ui32 metaCount = UnpackData<false, ui32>(chunked);
  1192. for (ui32 i = 0; i < width; ++i) {
  1193. if (BlockDeserializers_[i]) {
  1194. BlockDeserializers_[i]->LoadMetadata([&]() -> ui64 {
  1195. MKQL_ENSURE(metaCount > 0, "No more metadata available");
  1196. --metaCount;
  1197. return UnpackData<false, ui64>(chunked);
  1198. });
  1199. }
  1200. }
  1201. MKQL_ENSURE(metaCount == 0, "Partial buffers read");
  1202. TChunkedBuffer ropeTail = chunked.ReleaseRope();
  1203. // unpack buffers
  1204. auto producer = [&](ui32 i) {
  1205. MKQL_ENSURE(i < width, "Unexpected row index");
  1206. if (i != BlockLenIndex_) {
  1207. MKQL_ENSURE(BlockDeserializers_[i], "Missing deserializer");
  1208. const bool isScalar = BlockReaders_[i] != nullptr;
  1209. auto array = BlockDeserializers_[i]->LoadArray(ropeTail, isScalar ? 1 : len, offsets[i]);
  1210. if (isScalar) {
  1211. TBlockItem item = BlockReaders_[i]->GetItem(*array, 0);
  1212. const TType* itemType = IsLegacyBlock_ ? static_cast<const TStructType*>(Type_)->GetMemberType(i) :
  1213. static_cast<const TMultiType*>(Type_)->GetElementType(i);
  1214. return holderFactory.CreateArrowBlock(ConvertScalar(static_cast<const TBlockType*>(itemType)->GetItemType(), item, ArrowPool_));
  1215. }
  1216. return holderFactory.CreateArrowBlock(array);
  1217. }
  1218. return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(len)));
  1219. };
  1220. if (IsLegacyBlock_) {
  1221. NYql::NUdf::TUnboxedValue* valueItems;
  1222. auto structValue = holderFactory.CreateDirectArrayHolder(width, valueItems);
  1223. for (ui32 i = 0; i < width; ++i) {
  1224. valueItems[i] = producer(i);
  1225. }
  1226. result.emplace_back(std::move(structValue));
  1227. } else {
  1228. result.PushRow(producer);
  1229. }
  1230. buf = std::move(ropeTail);
  1231. }
  1232. }
  1233. template<bool Fast>
  1234. void TValuePackerTransport<Fast>::Clear() {
  1235. Buffer_.reset();
  1236. BlockBuffer_.Clear();
  1237. ItemCount_ = 0;
  1238. }
  1239. template<bool Fast>
  1240. TChunkedBuffer TValuePackerTransport<Fast>::Finish() {
  1241. if (IsBlock_) {
  1242. return FinishBlocks();
  1243. }
  1244. if (!ItemCount_) {
  1245. StartPack();
  1246. }
  1247. if constexpr (Fast) {
  1248. char* dst = Buffer_->Header(sizeof(ItemCount_));
  1249. Y_DEBUG_ABORT_UNLESS(dst);
  1250. std::memcpy(dst, &ItemCount_, sizeof(ItemCount_));
  1251. } else {
  1252. BuildMeta(Buffer_, true);
  1253. }
  1254. TPagedBuffer::TPtr result = std::move(Buffer_);
  1255. Clear();
  1256. return TPagedBuffer::AsChunkedBuffer(result);
  1257. }
  1258. template<bool Fast>
  1259. TChunkedBuffer TValuePackerTransport<Fast>::FinishBlocks() {
  1260. TChunkedBuffer result = std::move(BlockBuffer_);
  1261. Clear();
  1262. return result;
  1263. }
// Writes the non-fast transport header in front of the packed data held by `buffer`.
// SerializeMeta() writes the length and, when used, the optional usage mask. For
// batches (`addItemCount`), the packed (Pack64) item count follows.
// `addItemCount` also selects the state: IncrementalState_ for AddItem()/Finish()
// batches, State_ for single-value Pack().
// `fullLen` = mask size + data size (+ item count size) and must fit in a ui32.
// The length field takes 4 bytes when `fullLen` > 7, and 1 byte otherwise.
// Fast path: the header fits into the space reserved at the front of `buffer`
// and is written in place.
// Slow path: the state's mask reserve is raised for later calls, and a new
// buffer is built (header + copy of every page) that replaces `buffer`.
template<bool Fast>
void TValuePackerTransport<Fast>::BuildMeta(TPagedBuffer::TPtr& buffer, bool addItemCount) const {
    const size_t itemCountSize = addItemCount ? GetPack64Length(ItemCount_) : 0;
    const size_t packedSize = buffer->Size() + itemCountSize;
    auto& s = addItemCount ? IncrementalState_ : State_;
    const bool useMask = s.Properties.Test(EPackProps::UseOptionalMask);
    const size_t maskSize = useMask ? s.OptionalUsageMask.CalcSerializedSize() : 0;
    const size_t fullLen = maskSize + packedSize;
    MKQL_ENSURE(fullLen <= Max<ui32>(), "Packed obbject size exceeds 4G");
    size_t metaSize = (fullLen > 7 ? sizeof(ui32) : sizeof(ui8)) + maskSize;
    // Header() presumably returns nullptr when the reserved header space is too small — confirm.
    if (char* header = buffer->Header(metaSize + itemCountSize)) {
        TFixedSizeBuffer buf(header, metaSize + itemCountSize);
        SerializeMeta(buf, useMask, s.OptionalUsageMask, fullLen, s.Properties.Test(EPackProps::SingleOptional));
        if (addItemCount) {
            // In this file BuildMeta() is only called for Fast == false, so this
            // branch looks unreachable in practice.
            if constexpr (Fast) {
                PackData<Fast>(ItemCount_, buf);
            } else {
                // PackData() can not be used here - it may overwrite some bytes past the end of header
                char tmp[MAX_PACKED64_SIZE];
                size_t actualItemCountSize = Pack64(ItemCount_, tmp);
                std::memcpy(buf.Pos(), tmp, actualItemCountSize);
                buf.Advance(actualItemCountSize);
            }
        }
        MKQL_ENSURE(buf.Size() == metaSize + itemCountSize, "Partial header write");
    } else {
        // Remember the mask size so the next header reservation is large enough.
        s.OptionalMaskReserve = maskSize;
        TPagedBuffer::TPtr resultBuffer = std::make_shared<TPagedBuffer>();
        SerializeMeta(*resultBuffer, useMask, s.OptionalUsageMask, fullLen, s.Properties.Test(EPackProps::SingleOptional));
        if (addItemCount) {
            PackData<Fast>(ItemCount_, *resultBuffer);
        }
        buffer->ForEachPage([&resultBuffer](const char* data, size_t len) {
            resultBuffer->Append(data, len);
        });
        buffer = std::move(resultBuffer);
    }
}
  1302. template class TValuePackerGeneric<true>;
  1303. template class TValuePackerGeneric<false>;
  1304. template class TValuePackerTransport<true>;
  1305. template class TValuePackerTransport<false>;
  1306. TValuePackerBoxed::TValuePackerBoxed(TMemoryUsageInfo* memInfo, bool stable, const TType* type)
  1307. : TBase(memInfo)
  1308. , TValuePacker(stable, type)
  1309. {}
  1310. } // NMiniKQL
  1311. } // NKikimr