// mkql_block_compress.cpp (33 KB)
  1. #include "mkql_block_compress.h"
  2. #include "mkql_counters.h"
  3. #include <yql/essentials/minikql/computation/mkql_block_builder.h>
  4. #include <yql/essentials/minikql/computation/mkql_block_impl.h>
  5. #include <yql/essentials/minikql/computation/mkql_block_impl_codegen.h> // Y_IGNORE
  6. #include <yql/essentials/minikql/arrow/arrow_util.h>
  7. #include <yql/essentials/minikql/arrow/mkql_bit_utils.h>
  8. #include <yql/essentials/minikql/mkql_type_builder.h>
  9. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  10. #include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
  11. #include <yql/essentials/minikql/mkql_node_builder.h>
  12. #include <yql/essentials/minikql/mkql_node_cast.h>
  13. namespace NKikimr {
  14. namespace NMiniKQL {
  15. namespace {
  16. class TCompressWithScalarBitmap : public TStatefulWideFlowCodegeneratorNode<TCompressWithScalarBitmap> {
  17. using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TCompressWithScalarBitmap>;
  18. public:
  19. TCompressWithScalarBitmap(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 bitmapIndex, ui32 width)
  20. : TBaseComputation(mutables, flow, EValueRepresentation::Embedded)
  21. , Flow_(flow)
  22. , BitmapIndex_(bitmapIndex)
  23. , Width_(width)
  24. , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(Width_))
  25. {
  26. }
  27. EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
  28. if (state.IsFinish())
  29. return EFetchResult::Finish;
  30. const auto fields = ctx.WideFields.data() + WideFieldsIndex_;
  31. NUdf::TUnboxedValue bitmap;
  32. for (ui32 i = 0, outIndex = 0; i < Width_; ++i) {
  33. fields[i] = i == BitmapIndex_ ? &bitmap : output[outIndex++];
  34. }
  35. if (const auto result = Flow_->FetchValues(ctx, fields); EFetchResult::One != result)
  36. return result;
  37. const bool bitmapValue = GetBitmapScalarValue(bitmap) & 1;
  38. state = bitmapValue ? NUdf::TUnboxedValuePod() : NUdf::TUnboxedValuePod::MakeFinish();
  39. return bitmapValue ? EFetchResult::One : EFetchResult::Finish;
  40. }
  41. #ifndef MKQL_DISABLE_CODEGEN
  42. ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  43. auto& context = ctx.Codegen.GetContext();
  44. const auto valueType = Type::getInt128Ty(context);
  45. const auto statusType = Type::getInt32Ty(context);
  46. const auto bitmapType = Type::getInt8Ty(context);
  47. const auto name = "GetBitmapScalarValue";
  48. ctx.Codegen.AddGlobalMapping(name, reinterpret_cast<const void*>(&GetBitmapScalarValue));
  49. const auto getBitmapType = FunctionType::get(bitmapType, { valueType }, false);
  50. const auto getBitmap = ctx.Codegen.GetModule().getOrInsertFunction(name, getBitmapType);
  51. const auto work = BasicBlock::Create(context, "work", ctx.Func);
  52. const auto test = BasicBlock::Create(context, "test", ctx.Func);
  53. const auto over = BasicBlock::Create(context, "over", ctx.Func);
  54. const auto result = PHINode::Create(statusType, 3U, "result", over);
  55. result->addIncoming(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Finish)), block);
  56. BranchInst::Create(over, work, IsFinish(statePtr, block, context), block);
  57. block = work;
  58. const auto getres = GetNodeValues(Flow_, ctx, block);
  59. const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SLE, getres.first, ConstantInt::get(getres.first->getType(), static_cast<i32>(EFetchResult::Yield)), "special", block);
  60. result->addIncoming(getres.first, block);
  61. BranchInst::Create(over, test, special, block);
  62. block = test;
  63. const auto bitmapValue = getres.second[BitmapIndex_](ctx, block);
  64. const auto bitmap = CallInst::Create(getBitmap, { bitmapValue }, "bitmap", block);
  65. ValueCleanup(EValueRepresentation::Any, bitmapValue, ctx, block);
  66. const auto one = ConstantInt::get(bitmapType, 1);
  67. const auto band = BinaryOperator::CreateAnd(bitmap, one, "band", block);
  68. const auto good = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, band, one, "good", block);
  69. const auto state = SelectInst::Create(good, GetEmpty(context), GetFinish(context), "state", block);
  70. new StoreInst(state, statePtr, block);
  71. const auto status = SelectInst::Create(good, ConstantInt::get(statusType, static_cast<i32>(EFetchResult::One)), ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Finish)), "status", block);
  72. result->addIncoming(status, block);
  73. BranchInst::Create(over, block);
  74. block = over;
  75. ICodegeneratorInlineWideNode::TGettersList getters(getres.second.size() - 1U);
  76. for (ui32 i = 0, j = 0; i < getres.second.size(); ++i) {
  77. if (i != BitmapIndex_)
  78. getters[j++] = std::move(getres.second[i]);
  79. }
  80. return {result, std::move(getters)};
  81. }
  82. #endif
  83. private:
  84. void RegisterDependencies() const final {
  85. FlowDependsOn(Flow_);
  86. }
  87. IComputationWideFlowNode *const Flow_;
  88. const ui32 BitmapIndex_;
  89. const ui32 Width_;
  90. const ui32 WideFieldsIndex_;
  91. };
  92. class TCompressScalars : public TStatelessWideFlowCodegeneratorNode<TCompressScalars> {
  93. using TBaseComputation = TStatelessWideFlowCodegeneratorNode<TCompressScalars>;
  94. public:
  95. TCompressScalars(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 bitmapIndex, ui32 width)
  96. : TBaseComputation(flow)
  97. , Flow_(flow)
  98. , BitmapIndex_(bitmapIndex)
  99. , Width_(width)
  100. , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(Width_))
  101. {
  102. }
  103. EFetchResult DoCalculate(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
  104. const auto fields = ctx.WideFields.data() + WideFieldsIndex_;
  105. NUdf::TUnboxedValue bitmap;
  106. for (ui32 i = 0, outIndex = 0; i < Width_; ++i) {
  107. fields[i] = i == BitmapIndex_ ? &bitmap : output[outIndex++];
  108. }
  109. for (;;) {
  110. if (const auto result = Flow_->FetchValues(ctx, fields); EFetchResult::One != result)
  111. return result;
  112. if (const auto popCount = GetBitmapPopCountCount(bitmap)) {
  113. if (const auto out = output[Width_ - 2])
  114. *out = ctx.HolderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(popCount)));
  115. break;
  116. }
  117. }
  118. return EFetchResult::One;
  119. }
  120. #ifndef MKQL_DISABLE_CODEGEN
  121. ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, BasicBlock*& block) const {
  122. auto& context = ctx.Codegen.GetContext();
  123. const auto valueType = Type::getInt128Ty(context);
  124. const auto statusType = Type::getInt32Ty(context);
  125. const auto sizeType = Type::getInt64Ty(context);
  126. const auto atTop = &ctx.Func->getEntryBlock().back();
  127. const auto sizePtr = new AllocaInst(valueType, 0U, "size_ptr", atTop);
  128. new StoreInst(ConstantInt::get(valueType, 0), sizePtr, atTop);
  129. const auto name = "GetBitmapPopCountCount";
  130. ctx.Codegen.AddGlobalMapping(name, reinterpret_cast<const void*>(&GetBitmapPopCountCount));
  131. const auto getPopCountType = FunctionType::get(sizeType, { valueType }, false);
  132. const auto getPopCount = ctx.Codegen.GetModule().getOrInsertFunction(name, getPopCountType);
  133. const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
  134. const auto work = BasicBlock::Create(context, "work", ctx.Func);
  135. const auto fill = BasicBlock::Create(context, "fill", ctx.Func);
  136. const auto over = BasicBlock::Create(context, "over", ctx.Func);
  137. BranchInst::Create(loop, block);
  138. block = loop;
  139. const auto getres = GetNodeValues(Flow_, ctx, block);
  140. const auto special = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_SLE, getres.first, ConstantInt::get(getres.first->getType(), static_cast<i32>(EFetchResult::Yield)), "special", block);
  141. const auto result = PHINode::Create(statusType, 2U, "result", over);
  142. result->addIncoming(getres.first, block);
  143. BranchInst::Create(over, work, special, block);
  144. block = work;
  145. const auto bitmapValue = getres.second[BitmapIndex_](ctx, block);
  146. const auto pops = CallInst::Create(getPopCount, { bitmapValue }, "pops", block);
  147. ValueCleanup(EValueRepresentation::Any, bitmapValue, ctx, block);
  148. const auto good = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_UGT, pops, ConstantInt::get(sizeType, 0), "good", block);
  149. BranchInst::Create(fill, loop, good, block);
  150. block = fill;
  151. const auto makeCountFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&MakeBlockCount));
  152. const auto makeCountType = FunctionType::get(valueType, {ctx.GetFactory()->getType(), pops->getType()}, false);
  153. const auto makeCountPtr = CastInst::Create(Instruction::IntToPtr, makeCountFunc, PointerType::getUnqual(makeCountType), "make_count_func", block);
  154. const auto slice = CallInst::Create(makeCountType, makeCountPtr, {ctx.GetFactory(), pops}, "slice", block);
  155. new StoreInst(slice, sizePtr, block);
  156. result->addIncoming(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::One)), block);
  157. BranchInst::Create(over, block);
  158. block = over;
  159. ICodegeneratorInlineWideNode::TGettersList getters(getres.second.size() - 1U);
  160. for (ui32 i = 0, j = 0; i < getters.size(); ++i) {
  161. if (i != BitmapIndex_)
  162. getters[j++] = std::move(getres.second[i]);
  163. }
  164. getters.back() = [sizePtr, valueType](const TCodegenContext&, BasicBlock*& block) {
  165. return new LoadInst(valueType, sizePtr, "count", block);
  166. };
  167. return {result, std::move(getters)};
  168. }
  169. #endif
  170. private:
  171. void RegisterDependencies() const final {
  172. FlowDependsOn(Flow_);
  173. }
  174. IComputationWideFlowNode *const Flow_;
  175. const ui32 BitmapIndex_;
  176. const ui32 Width_;
  177. const ui32 WideFieldsIndex_;
  178. };
  179. size_t GetBitmapPopCount(const std::shared_ptr<arrow::ArrayData>& arr) {
  180. size_t len = (size_t)arr->length;
  181. MKQL_ENSURE(arr->GetNullCount() == 0, "Bitmap block should not have nulls");
  182. const ui8* src = arr->GetValues<ui8>(1);
  183. return GetSparseBitmapPopCount(src, len);
  184. }
  185. class TCompressBlocks : public TStatefulWideFlowCodegeneratorNode<TCompressBlocks> {
  186. using TBaseComputation = TStatefulWideFlowCodegeneratorNode<TCompressBlocks>;
  187. public:
  188. TCompressBlocks(TComputationMutables& mutables, IComputationWideFlowNode* flow, ui32 bitmapIndex, TVector<TBlockType*>&& types)
  189. : TBaseComputation(mutables, flow, EValueRepresentation::Boxed)
  190. , Flow_(flow)
  191. , BitmapIndex_(bitmapIndex)
  192. , Types_(std::move(types))
  193. , WideFieldsIndex_(mutables.IncrementWideFieldsIndex(Types_.size() + 2U))
  194. {
  195. }
  196. EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
  197. auto& s = GetState(state, ctx);
  198. const auto fields = ctx.WideFields.data() + WideFieldsIndex_;
  199. for (auto i = 0U, j = 0U; i <= Types_.size() + 1U; ++i) {
  200. if (BitmapIndex_ != i)
  201. fields[i] = &s.Values[j++];
  202. }
  203. NUdf::TUnboxedValue bitmap;
  204. fields[BitmapIndex_] = &bitmap;
  205. if (!s.Count) {
  206. do if (!s.InputSize_) {
  207. s.ClearValues();
  208. switch (Flow_->FetchValues(ctx, fields)) {
  209. case EFetchResult::Yield:
  210. return EFetchResult::Yield;
  211. case EFetchResult::Finish:
  212. s.IsFinished_ = true;
  213. break;
  214. case EFetchResult::One:
  215. switch (s.Check(bitmap)) {
  216. case TState::EStep::Copy:
  217. for (ui32 i = 0; i < s.Values.size(); ++i) {
  218. if (const auto out = output[i]) {
  219. *out = s.Values[i];
  220. }
  221. }
  222. return EFetchResult::One;
  223. case TState::EStep::Skip:
  224. continue;
  225. case TState::EStep::Pass:
  226. break;
  227. }
  228. break;
  229. }
  230. } while (!s.IsFinished_ && s.Sparse());
  231. if (s.OutputPos_)
  232. s.FlushBuffers(ctx.HolderFactory);
  233. else
  234. return EFetchResult::Finish;
  235. }
  236. const auto sliceSize = s.Slice();
  237. for (size_t i = 0; i <= Types_.size(); ++i) {
  238. if (const auto out = output[i]) {
  239. *out = s.Get(sliceSize, ctx.HolderFactory, i);
  240. }
  241. }
  242. return EFetchResult::One;
  243. }
  244. #ifndef MKQL_DISABLE_CODEGEN
  245. ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
  246. auto& context = ctx.Codegen.GetContext();
  247. const auto width = Types_.size() + 1U;
  248. const auto valueType = Type::getInt128Ty(context);
  249. const auto statusType = Type::getInt32Ty(context);
  250. const auto indexType = Type::getInt64Ty(context);
  251. const auto arrayType = ArrayType::get(valueType, width);
  252. const auto ptrValuesType = PointerType::getUnqual(arrayType);
  253. TLLVMFieldsStructureState stateFields(context, width);
  254. const auto stateType = StructType::get(context, stateFields.GetFieldsArray());
  255. const auto statePtrType = PointerType::getUnqual(stateType);
  256. const auto atTop = &ctx.Func->getEntryBlock().back();
  257. const auto getFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Get));
  258. const auto getType = FunctionType::get(valueType, {statePtrType, indexType, ctx.GetFactory()->getType(), indexType}, false);
  259. const auto getPtr = CastInst::Create(Instruction::IntToPtr, getFunc, PointerType::getUnqual(getType), "get", atTop);
  260. const auto heightPtr = new AllocaInst(indexType, 0U, "height_ptr", atTop);
  261. const auto stateOnStack = new AllocaInst(statePtrType, 0U, "state_on_stack", atTop);
  262. new StoreInst(ConstantInt::get(indexType, 0), heightPtr, atTop);
  263. new StoreInst(ConstantPointerNull::get(statePtrType), stateOnStack, atTop);
  264. const auto make = BasicBlock::Create(context, "make", ctx.Func);
  265. const auto main = BasicBlock::Create(context, "main", ctx.Func);
  266. const auto loop = BasicBlock::Create(context, "loop", ctx.Func);
  267. const auto more = BasicBlock::Create(context, "more", ctx.Func);
  268. const auto read = BasicBlock::Create(context, "read", ctx.Func);
  269. const auto stop = BasicBlock::Create(context, "stop", ctx.Func);
  270. const auto good = BasicBlock::Create(context, "good", ctx.Func);
  271. const auto save = BasicBlock::Create(context, "save", ctx.Func);
  272. const auto work = BasicBlock::Create(context, "work", ctx.Func);
  273. const auto tail = BasicBlock::Create(context, "tail", ctx.Func);
  274. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  275. const auto fill = BasicBlock::Create(context, "fill", ctx.Func);
  276. const auto over = BasicBlock::Create(context, "over", ctx.Func);
  277. BranchInst::Create(make, main, IsInvalid(statePtr, block, context), block);
  278. block = make;
  279. const auto ptrType = PointerType::getUnqual(StructType::get(context));
  280. const auto self = CastInst::Create(Instruction::IntToPtr, ConstantInt::get(Type::getInt64Ty(context), uintptr_t(this)), ptrType, "self", block);
  281. const auto makeFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TCompressBlocks::MakeState));
  282. const auto makeType = FunctionType::get(Type::getVoidTy(context), {self->getType(), ctx.Ctx->getType(), statePtr->getType()}, false);
  283. const auto makeFuncPtr = CastInst::Create(Instruction::IntToPtr, makeFunc, PointerType::getUnqual(makeType), "function", block);
  284. CallInst::Create(makeType, makeFuncPtr, {self, ctx.Ctx, statePtr}, "", block);
  285. BranchInst::Create(main, block);
  286. block = main;
  287. const auto state = new LoadInst(valueType, statePtr, "state", block);
  288. const auto half = CastInst::Create(Instruction::Trunc, state, Type::getInt64Ty(context), "half", block);
  289. const auto stateArg = CastInst::Create(Instruction::IntToPtr, half, statePtrType, "state_arg", block);
  290. const auto countPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetCount() }, "count_ptr", block);
  291. const auto inputSizePtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetInputSize() }, "input_size_ptr", block);
  292. BranchInst::Create(loop, block);
  293. block = loop;
  294. const auto count = new LoadInst(indexType, countPtr, "count", block);
  295. const auto next = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, count, ConstantInt::get(indexType, 0), "next", block);
  296. BranchInst::Create(more, fill, next, block);
  297. block = more;
  298. const auto inputSize = new LoadInst(indexType, inputSizePtr, "input_size", block);
  299. const auto zero = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, inputSize, ConstantInt::get(indexType, 0), "zero", block);
  300. BranchInst::Create(read, work, zero, block);
  301. block = read;
  302. const auto clearFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::ClearValues));
  303. const auto clearType = FunctionType::get(Type::getVoidTy(context), {statePtrType}, false);
  304. const auto clearPtr = CastInst::Create(Instruction::IntToPtr, clearFunc, PointerType::getUnqual(clearType), "clear", block);
  305. CallInst::Create(clearType, clearPtr, {stateArg}, "", block);
  306. const auto getres = GetNodeValues(Flow_, ctx, block);
  307. new StoreInst(ConstantInt::get(indexType, 0), heightPtr, block);
  308. const auto result = PHINode::Create(statusType, 4U, "result", over);
  309. result->addIncoming(getres.first, block);
  310. const auto way = SwitchInst::Create(getres.first, good, 2U, block);
  311. way->addCase(ConstantInt::get(statusType, i32(EFetchResult::Finish)), stop);
  312. way->addCase(ConstantInt::get(statusType, i32(EFetchResult::Yield)), over);
  313. block = stop;
  314. const auto finishPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetIsFinished() }, "finish_ptr", block);
  315. new StoreInst(ConstantInt::getTrue(context), finishPtr, block);
  316. BranchInst::Create(tail, block);
  317. block = good;
  318. const auto bitmap = getres.second[BitmapIndex_](ctx, block);
  319. const auto bitmapArg = bitmap;
  320. const auto stepType = Type::getInt8Ty(context);
  321. const auto checkFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Check));
  322. const auto checkType = FunctionType::get(stepType, {statePtrType, bitmapArg->getType()}, false);
  323. const auto checkPtr = CastInst::Create(Instruction::IntToPtr, checkFunc, PointerType::getUnqual(checkType), "check_func", block);
  324. const auto check = CallInst::Create(checkType, checkPtr, {stateArg, bitmapArg}, "check", block);
  325. ValueCleanup(EValueRepresentation::Any, bitmap, ctx, block);
  326. result->addIncoming(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::One)), block);
  327. const auto step = SwitchInst::Create(check, save, 2U, block);
  328. step->addCase(ConstantInt::get(stepType, i8(TState::EStep::Skip)), read);
  329. step->addCase(ConstantInt::get(stepType, i8(TState::EStep::Copy)), over);
  330. block = save;
  331. const auto valuesPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetPointer() }, "values_ptr", block);
  332. const auto values = new LoadInst(ptrValuesType, valuesPtr, "values", block);
  333. for (size_t idx = 0U; idx <= Types_.size(); ++idx) {
  334. const auto pointer = GetElementPtrInst::CreateInBounds(arrayType, values, { ConstantInt::get(indexType, 0), ConstantInt::get(indexType, idx) }, "pointer", block);
  335. const auto value = getres.second[idx < BitmapIndex_ ? idx : idx + 1U](ctx, block);
  336. new StoreInst(value, pointer, block);
  337. AddRefBoxed(value, ctx, block);
  338. }
  339. BranchInst::Create(work, block);
  340. block = work;
  341. const auto sparseFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Sparse));
  342. const auto sparseType = FunctionType::get(Type::getInt1Ty(context), {statePtrType}, false);
  343. const auto sparsePtr = CastInst::Create(Instruction::IntToPtr, sparseFunc, PointerType::getUnqual(sparseType), "sparse_func", block);
  344. const auto sparse = CallInst::Create(sparseType, sparsePtr, {stateArg}, "sparse", block);
  345. BranchInst::Create(loop, tail, sparse, block);
  346. block = tail;
  347. const auto outputPosPtr = GetElementPtrInst::CreateInBounds(stateType, stateArg, { stateFields.This(), stateFields.GetOutputPos() }, "output_pos_ptr", block);
  348. const auto outputPos = new LoadInst(indexType, outputPosPtr, "output_pos", block);
  349. const auto empty = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, outputPos, ConstantInt::get(indexType, 0), "empty", block);
  350. result->addIncoming(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::Finish)), block);
  351. BranchInst::Create(over, done, empty, block);
  352. block = done;
  353. const auto flushFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::FlushBuffers));
  354. const auto flushType = FunctionType::get(Type::getVoidTy(context), {statePtrType, ctx.GetFactory()->getType()}, false);
  355. const auto flushPtr = CastInst::Create(Instruction::IntToPtr, flushFunc, PointerType::getUnqual(flushType), "flush_func", block);
  356. CallInst::Create(flushType, flushPtr, {stateArg, ctx.GetFactory()}, "", block);
  357. BranchInst::Create(fill, block);
  358. block = fill;
  359. const auto sliceFunc = ConstantInt::get(Type::getInt64Ty(context), GetMethodPtr(&TState::Slice));
  360. const auto sliceType = FunctionType::get(indexType, {statePtrType}, false);
  361. const auto slicePtr = CastInst::Create(Instruction::IntToPtr, sliceFunc, PointerType::getUnqual(sliceType), "slice_func", block);
  362. const auto slice = CallInst::Create(sliceType, slicePtr, {stateArg}, "slice", block);
  363. new StoreInst(slice, heightPtr, block);
  364. new StoreInst(stateArg, stateOnStack, block);
  365. result->addIncoming(ConstantInt::get(statusType, static_cast<i32>(EFetchResult::One)), block);
  366. BranchInst::Create(over, block);
  367. block = over;
  368. ICodegeneratorInlineWideNode::TGettersList getters(width);
  369. for (size_t idx = 0U; idx < getters.size(); ++idx) {
  370. getters[idx] = [idx, getType, getPtr, heightPtr, indexType, valueType, statePtrType, stateOnStack, getter = getres.second[idx < BitmapIndex_ ? idx : idx + 1U]](const TCodegenContext& ctx, BasicBlock*& block) {
  371. auto& context = ctx.Codegen.GetContext();
  372. const auto pass = BasicBlock::Create(context, "pass", ctx.Func);
  373. const auto call = BasicBlock::Create(context, "call", ctx.Func);
  374. const auto done = BasicBlock::Create(context, "done", ctx.Func);
  375. const auto result = PHINode::Create(valueType, 2U, "result", done);
  376. const auto height = new LoadInst(indexType, heightPtr, "height", block);
  377. const auto zero = CmpInst::Create(Instruction::ICmp, ICmpInst::ICMP_EQ, height, ConstantInt::get(indexType, 0), "zero", block);
  378. BranchInst::Create(pass, call, zero, block);
  379. block = pass;
  380. const auto source = getter(ctx, block);
  381. result->addIncoming(source, block);
  382. BranchInst::Create(done, block);
  383. block = call;
  384. const auto stateArg = new LoadInst(statePtrType, stateOnStack, "state", block);
  385. const auto value = CallInst::Create(getType, getPtr, {stateArg, height, ctx.GetFactory(), ConstantInt::get(indexType, idx)}, "value", block);
  386. result->addIncoming(value, block);
  387. BranchInst::Create(done, block);
  388. block = done;
  389. return result;
  390. };
  391. }
  392. return {result, std::move(getters)};
  393. }
  394. #endif
  395. private:
  396. struct TState : public TBlockState {
  397. size_t InputSize_ = 0;
  398. size_t OutputPos_ = 0;
  399. bool IsFinished_ = false;
  400. const size_t MaxLength_;
  401. std::vector<std::shared_ptr<arrow::ArrayData>> Arrays_;
  402. std::vector<std::unique_ptr<IArrayBuilder>> Builders_;
  403. NYql::NUdf::TCounter CounterOutputRows_;
  404. TState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TBlockType*>& types)
  405. : TBlockState(memInfo, types.size() + 1U)
  406. , MaxLength_(CalcBlockLen(std::accumulate(types.cbegin(), types.cend(), 0ULL, [](size_t max, const TBlockType* type){ return std::max(max, CalcMaxBlockItemSize(type->GetItemType())); })))
  407. , Arrays_(types.size() + 1U)
  408. , Builders_(types.size())
  409. {
  410. for (ui32 i = 0; i < types.size(); ++i) {
  411. if (types[i]->GetShape() != TBlockType::EShape::Scalar) {
  412. Builders_[i] = MakeArrayBuilder(TTypeInfoHelper(), types[i]->GetItemType(), ctx.ArrowMemoryPool, MaxLength_, &ctx.Builder->GetPgBuilder());
  413. }
  414. }
  415. if (ctx.CountersProvider) {
  416. // id will be assigned externally in future versions
  417. TString id = TString(Operator_Filter) + "0";
  418. CounterOutputRows_ = ctx.CountersProvider->GetCounter(id, Counter_OutputRows, false);
  419. }
  420. }
  421. enum class EStep : i8 {
  422. Copy = -1,
  423. Skip = 0,
  424. Pass = 1
  425. };
  426. EStep Check(const NUdf::TUnboxedValuePod bitmapValue) {
  427. Y_ABORT_UNLESS(!IsFinished_);
  428. Y_ABORT_UNLESS(!InputSize_);
  429. auto& bitmap = Arrays_.back();
  430. bitmap = TArrowBlock::From(bitmapValue).GetDatum().array();
  431. if (!bitmap->length)
  432. return EStep::Skip;
  433. const auto popCount = GetBitmapPopCount(bitmap);
  434. CounterOutputRows_.Add(popCount);
  435. if (!popCount)
  436. return EStep::Skip;
  437. if (!OutputPos_ && ui64(bitmap->length) == popCount)
  438. return EStep::Copy;
  439. return EStep::Pass;
  440. }
  441. bool Sparse() {
  442. auto& bitmap = Arrays_.back();
  443. if (!InputSize_) {
  444. InputSize_ = bitmap->length;
  445. for (size_t i = 0; i < Builders_.size(); ++i) {
  446. if (Builders_[i]) {
  447. Arrays_[i] = TArrowBlock::From(Values[i]).GetDatum().array();
  448. Y_ABORT_UNLESS(ui64(Arrays_[i]->length) == InputSize_);
  449. }
  450. }
  451. }
  452. size_t outputAvail = MaxLength_ - OutputPos_;
  453. size_t takeInputLen = 0;
  454. size_t takeInputPopcnt = 0;
  455. const auto bitmapData = bitmap->GetValues<ui8>(1);
  456. while (takeInputPopcnt < outputAvail && takeInputLen < InputSize_) {
  457. takeInputPopcnt += bitmapData[takeInputLen++];
  458. }
  459. Y_ABORT_UNLESS(takeInputLen > 0);
  460. for (size_t i = 0; i < Builders_.size(); ++i) {
  461. if (Builders_[i]) {
  462. auto& arr = Arrays_[i];
  463. auto& builder = Builders_[i];
  464. auto slice = Chop(arr, takeInputLen);
  465. builder->AddMany(*slice, takeInputPopcnt, bitmapData, takeInputLen);
  466. }
  467. }
  468. Chop(bitmap, takeInputLen);
  469. OutputPos_ += takeInputPopcnt;
  470. InputSize_ -= takeInputLen;
  471. return MaxLength_ > OutputPos_;
  472. }
  473. void FlushBuffers(const THolderFactory& holderFactory) {
  474. for (ui32 i = 0; i < Builders_.size(); ++i) {
  475. if (Builders_[i])
  476. Values[i] = holderFactory.CreateArrowBlock(Builders_[i]->Build(IsFinished_));
  477. }
  478. Values.back() = MakeBlockCount(holderFactory, OutputPos_);
  479. OutputPos_ = 0;
  480. FillArrays();
  481. }
  482. };
  483. #ifndef MKQL_DISABLE_CODEGEN
  484. class TLLVMFieldsStructureState: public TLLVMFieldsStructureBlockState {
  485. private:
  486. using TBase = TLLVMFieldsStructureBlockState;
  487. llvm::IntegerType*const InputSizeType;
  488. llvm::IntegerType*const OutputPosType;
  489. llvm::IntegerType*const IsFinishedType;
  490. protected:
  491. using TBase::Context;
  492. public:
  493. std::vector<llvm::Type*> GetFieldsArray() {
  494. std::vector<llvm::Type*> result = TBase::GetFieldsArray();
  495. result.emplace_back(InputSizeType);
  496. result.emplace_back(OutputPosType);
  497. result.emplace_back(IsFinishedType);
  498. return result;
  499. }
  500. llvm::Constant* GetInputSize() {
  501. return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + BaseFields);
  502. }
  503. llvm::Constant* GetOutputPos() {
  504. return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + BaseFields + 1);
  505. }
  506. llvm::Constant* GetIsFinished() {
  507. return ConstantInt::get(Type::getInt32Ty(Context), TBase::GetFieldsCount() + BaseFields + 2);
  508. }
  509. TLLVMFieldsStructureState(llvm::LLVMContext& context, size_t width)
  510. : TBase(context, width)
  511. , InputSizeType(Type::getInt64Ty(Context))
  512. , OutputPosType(Type::getInt64Ty(Context))
  513. , IsFinishedType(Type::getInt1Ty(Context))
  514. {}
  515. };
  516. #endif
  517. void RegisterDependencies() const final {
  518. FlowDependsOn(Flow_);
  519. }
  520. void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
  521. state = ctx.HolderFactory.Create<TState>(ctx, Types_);
  522. }
  523. TState& GetState(NUdf::TUnboxedValue& state, TComputationContext& ctx) const {
  524. if (state.IsInvalid())
  525. MakeState(ctx, state);
  526. return *static_cast<TState*>(state.AsBoxed().Get());
  527. }
  528. IComputationWideFlowNode* const Flow_;
  529. const ui32 BitmapIndex_;
  530. const TVector<TBlockType*> Types_;
  531. const size_t WideFieldsIndex_;
  532. };
  533. } // namespace
  534. IComputationNode* WrapBlockCompress(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  535. MKQL_ENSURE(callable.GetInputsCount() == 2, "Expected 2 args, got " << callable.GetInputsCount());
  536. const auto flowType = AS_TYPE(TFlowType, callable.GetInput(0).GetStaticType());
  537. const auto wideComponents = GetWideComponents(flowType);
  538. const ui32 width = wideComponents.size();
  539. MKQL_ENSURE(width > 1, "Expected at least two columns");
  540. const auto indexData = AS_VALUE(TDataLiteral, callable.GetInput(1U));
  541. const auto index = indexData->AsValue().Get<ui32>();
  542. MKQL_ENSURE(index < width - 1, "Bad bitmap index");
  543. TVector<TBlockType*> types;
  544. types.reserve(width - 2U);
  545. bool bitmapIsScalar = false;
  546. bool allScalars = true;
  547. for (ui32 i = 0; i < width; ++i) {
  548. types.push_back(AS_TYPE(TBlockType, wideComponents[i]));
  549. const bool isScalar = types.back()->GetShape() == TBlockType::EShape::Scalar;
  550. if (i == width - 1) {
  551. MKQL_ENSURE(isScalar, "Expecting scalar block size as last column");
  552. bool isOptional;
  553. TDataType* unpacked = UnpackOptionalData(types.back()->GetItemType(), isOptional);
  554. auto slot = *unpacked->GetDataSlot();
  555. MKQL_ENSURE(!isOptional && slot == NUdf::EDataSlot::Uint64, "Expecting Uint64 as last column");
  556. types.pop_back();
  557. } else if (i == index) {
  558. bool isOptional;
  559. TDataType* unpacked = UnpackOptionalData(types.back()->GetItemType(), isOptional);
  560. auto slot = *unpacked->GetDataSlot();
  561. MKQL_ENSURE(!isOptional && slot == NUdf::EDataSlot::Bool, "Expecting Bool as bitmap column");
  562. bitmapIsScalar = isScalar;
  563. types.pop_back();
  564. } else {
  565. allScalars = allScalars && isScalar;
  566. }
  567. }
  568. const auto wideFlow = dynamic_cast<IComputationWideFlowNode*>(LocateNode(ctx.NodeLocator, callable, 0));
  569. MKQL_ENSURE(wideFlow != nullptr, "Expected wide flow node");
  570. if (bitmapIsScalar) {
  571. return new TCompressWithScalarBitmap(ctx.Mutables, wideFlow, index, width);
  572. } else if (allScalars) {
  573. return new TCompressScalars(ctx.Mutables, wideFlow, index, width);
  574. }
  575. return new TCompressBlocks(ctx.Mutables, wideFlow, index, std::move(types));
  576. }
  577. } // namespace NMiniKQL
  578. } // namespace NKikimr