12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538 |
- #include "mkql_block_map_join.h"
- #include <yql/essentials/minikql/computation/mkql_block_builder.h>
- #include <yql/essentials/minikql/computation/mkql_block_impl.h>
- #include <yql/essentials/minikql/computation/mkql_block_reader.h>
- #include <yql/essentials/minikql/computation/mkql_block_trimmer.h>
- #include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
- #include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h>
- #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
- #include <yql/essentials/minikql/mkql_block_map_join_utils.h>
- #include <yql/essentials/minikql/mkql_node_cast.h>
- #include <yql/essentials/minikql/mkql_program_builder.h>
- #include <util/generic/serialized_enum.h>
- namespace NKikimr {
- namespace NMiniKQL {
- namespace {
- size_t CalcMaxBlockLength(const TVector<TType*>& items) {
- return CalcBlockLen(std::accumulate(items.cbegin(), items.cend(), 0ULL,
- [](size_t max, const TType* type) {
- const TType* itemType = AS_TYPE(TBlockType, type)->GetItemType();
- return std::max(max, CalcMaxBlockItemSize(itemType));
- }));
- }
- ui64 CalculateTupleHash(const std::vector<ui64>& hashes) {
- ui64 hash = 0;
- for (size_t i = 0; i < hashes.size(); i++) {
- if (!hashes[i]) {
- return 0;
- }
- hash = CombineHashes(hash, hashes[i]);
- }
- return hash;
- }
- template <bool RightRequired>
- class TBlockJoinState : public TBlockState {
- public:
- TBlockJoinState(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
- const TVector<TType*>& inputItems,
- const TVector<ui32>& leftIOMap,
- const TVector<TType*> outputItems)
- : TBlockState(memInfo, outputItems.size())
- , InputWidth_(inputItems.size() - 1)
- , OutputWidth_(outputItems.size() - 1)
- , Inputs_(inputItems.size())
- , LeftIOMap_(leftIOMap)
- , InputsDescr_(ToValueDescr(inputItems))
- {
- const auto& pgBuilder = ctx.Builder->GetPgBuilder();
- MaxLength_ = CalcMaxBlockLength(outputItems);
- TBlockTypeHelper helper;
- for (size_t i = 0; i < inputItems.size(); i++) {
- TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType();
- Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
- Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder));
- Hashers_.push_back(helper.MakeHasher(blockItemType));
- }
- // The last output column (i.e. block length) doesn't require a block builder.
- for (size_t i = 0; i < OutputWidth_; i++) {
- const TType* blockItemType = AS_TYPE(TBlockType, outputItems[i])->GetItemType();
- Builders_.push_back(MakeArrayBuilder(TTypeInfoHelper(), blockItemType, ctx.ArrowMemoryPool, MaxLength_, &pgBuilder, &BuilderAllocatedSize_));
- }
- MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_;
- }
- void CopyRow() {
- // Copy items from the "left" stream.
- // Use the mapping from input fields to output ones to
- // produce a tight loop to copy row items.
- for (size_t i = 0; i < LeftIOMap_.size(); i++) {
- AddItem(GetItem(LeftIOMap_[i]), i);
- }
- OutputRows_++;
- }
- void MakeRow(const NUdf::TUnboxedValuePod& value) {
- size_t builderIndex = 0;
- // Copy items from the "left" stream.
- // Use the mapping from input fields to output ones to
- // produce a tight loop to copy row items.
- for (size_t i = 0; i < LeftIOMap_.size(); i++, builderIndex++) {
- AddItem(GetItem(LeftIOMap_[i]), i);
- }
- // Convert and append items from the "right" dict.
- // Since the keys are copied to the output only from the
- // "left" stream, process all values unconditionally.
- if constexpr (RightRequired) {
- for (size_t i = 0; builderIndex < OutputWidth_; i++) {
- AddValue(value.GetElement(i), builderIndex++);
- }
- } else {
- if (value) {
- for (size_t i = 0; builderIndex < OutputWidth_; i++) {
- AddValue(value.GetElement(i), builderIndex++);
- }
- } else {
- while (builderIndex < OutputWidth_) {
- AddValue(value, builderIndex++);
- }
- }
- }
- OutputRows_++;
- }
- void MakeRow(const std::vector<NYql::NUdf::TBlockItem>& rightColumns) {
- size_t builderIndex = 0;
- for (size_t i = 0; i < LeftIOMap_.size(); i++, builderIndex++) {
- AddItem(GetItem(LeftIOMap_[i]), builderIndex);
- }
- if (!rightColumns.empty()) {
- Y_ENSURE(LeftIOMap_.size() + rightColumns.size() == OutputWidth_);
- for (size_t i = 0; i < rightColumns.size(); i++) {
- AddItem(rightColumns[i], builderIndex++);
- }
- } else {
- while (builderIndex < OutputWidth_) {
- AddItem(TBlockItem(), builderIndex++);
- }
- }
- OutputRows_++;
- }
- void MakeBlocks(const THolderFactory& holderFactory) {
- Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(OutputRows_)));
- OutputRows_ = 0;
- BuilderAllocatedSize_ = 0;
- for (size_t i = 0; i < Builders_.size(); i++) {
- Values[i] = holderFactory.CreateArrowBlock(Builders_[i]->Build(IsFinished_));
- }
- FillArrays();
- }
- TBlockItem GetItem(size_t idx, size_t offset = 0) const {
- Y_ENSURE(Current_ + offset < InputRows_);
- const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum();
- ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr());
- if (datum.is_scalar()) {
- return Readers_[idx]->GetScalarItem(*datum.scalar());
- }
- MKQL_ENSURE(datum.is_array(), "Expecting array");
- return Readers_[idx]->GetItem(*datum.array(), Current_ + offset);
- }
- std::pair<TBlockItem, ui64> GetItemWithHash(size_t idx, size_t offset) const {
- auto item = GetItem(idx, offset);
- ui64 hash = Hashers_[idx]->Hash(item);
- return std::make_pair(item, hash);
- }
- NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const {
- return Converters_[idx]->MakeValue(GetItem(idx), holderFactory);
- }
- void Reset() {
- Current_ = 0;
- InputRows_ = GetBlockCount(Inputs_.back());
- }
- void Finish() {
- IsFinished_ = true;
- }
- void NextRow() {
- Current_++;
- }
- bool HasBlocks() {
- return Count > 0;
- }
- bool IsNotFull() const {
- return OutputRows_ < MaxLength_
- && BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
- }
- bool IsEmpty() const {
- return OutputRows_ == 0;
- }
- bool IsFinished() const {
- return IsFinished_;
- }
- size_t RemainingRowsCount() const {
- Y_ENSURE(InputRows_ >= Current_);
- return InputRows_ - Current_;
- }
- NUdf::TUnboxedValue* GetRawInputFields() {
- return Inputs_.data();
- }
- size_t GetInputWidth() const {
- // Mind the last block length column.
- return InputWidth_ + 1;
- }
- size_t GetOutputWidth() const {
- // Mind the last block length column.
- return OutputWidth_ + 1;
- }
- private:
- void AddItem(const TBlockItem& item, size_t idx) {
- Builders_[idx]->Add(item);
- }
- void AddValue(const NUdf::TUnboxedValuePod& value, size_t idx) {
- Builders_[idx]->Add(value);
- }
- size_t Current_ = 0;
- bool IsFinished_ = false;
- size_t MaxLength_;
- size_t BuilderAllocatedSize_ = 0;
- size_t MaxBuilderAllocatedSize_ = 0;
- static const size_t MaxAllocatedFactor_ = 4;
- size_t InputRows_ = 0;
- size_t OutputRows_ = 0;
- size_t InputWidth_;
- size_t OutputWidth_;
- TUnboxedValueVector Inputs_;
- const TVector<ui32> LeftIOMap_;
- const std::vector<arrow::ValueDescr> InputsDescr_;
- TVector<std::unique_ptr<IBlockReader>> Readers_;
- TVector<std::unique_ptr<IBlockItemConverter>> Converters_;
- TVector<std::unique_ptr<IArrayBuilder>> Builders_;
- TVector<NYql::NUdf::IBlockItemHasher::TPtr> Hashers_;
- };
- class TBlockStorage : public TComputationValue<TBlockStorage> {
- using TBase = TComputationValue<TBlockStorage>;
- public:
- struct TBlock {
- size_t Size;
- std::vector<arrow::Datum> Columns;
- TBlock() = default;
- TBlock(size_t size, std::vector<arrow::Datum> columns)
- : Size(size)
- , Columns(std::move(columns))
- {}
- };
- struct TRowEntry {
- ui32 BlockOffset;
- ui32 ItemOffset;
- TRowEntry() = default;
- TRowEntry(ui32 blockOffset, ui32 itemOffset)
- : BlockOffset(blockOffset)
- , ItemOffset(itemOffset)
- {}
- };
- class TRowIterator {
- friend class TBlockStorage;
- public:
- TRowIterator() = default;
- TRowIterator(const TRowIterator&) = default;
- TRowIterator& operator=(const TRowIterator&) = default;
- TMaybe<TRowEntry> Next() {
- Y_ENSURE(IsValid());
- if (IsEmpty()) {
- return Nothing();
- }
- auto entry = TRowEntry(CurrentBlockOffset_, CurrentItemOffset_);
- auto& block = BlockStorage_->GetBlock(CurrentBlockOffset_);
- CurrentItemOffset_++;
- if (CurrentItemOffset_ == block.Size) {
- CurrentBlockOffset_++;
- CurrentItemOffset_ = 0;
- }
- return entry;
- }
- bool IsValid() const {
- return BlockStorage_;
- }
- bool IsEmpty() const {
- Y_ENSURE(IsValid());
- return CurrentBlockOffset_ >= BlockStorage_->GetBlockCount();
- }
- private:
- TRowIterator(const TBlockStorage* blockStorage)
- : BlockStorage_(blockStorage)
- {}
- private:
- size_t CurrentBlockOffset_ = 0;
- size_t CurrentItemOffset_ = 0;
- const TBlockStorage* BlockStorage_ = nullptr;
- };
- TBlockStorage(
- TMemoryUsageInfo* memInfo,
- const TVector<TType*>& itemTypes,
- NUdf::TUnboxedValue stream,
- TStringBuf resourceTag,
- arrow::MemoryPool* pool
- )
- : TBase(memInfo)
- , InputsDescr_(ToValueDescr(itemTypes))
- , Stream_(std::move(stream))
- , Inputs_(itemTypes.size())
- , ResourceTag_(std::move(resourceTag))
- {
- TBlockTypeHelper helper;
- for (size_t i = 0; i < itemTypes.size(); i++) {
- TType* blockItemType = AS_TYPE(TBlockType, itemTypes[i])->GetItemType();
- Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
- Hashers_.push_back(helper.MakeHasher(blockItemType));
- Comparators_.push_back(helper.MakeComparator(blockItemType));
- Trimmers_.push_back(MakeBlockTrimmer(TTypeInfoHelper(), blockItemType, pool));
- }
- }
- NUdf::EFetchStatus FetchStream() {
- switch (Stream_.WideFetch(Inputs_.data(), Inputs_.size())) {
- case NUdf::EFetchStatus::Yield:
- return NUdf::EFetchStatus::Yield;
- case NUdf::EFetchStatus::Finish:
- IsFinished_ = true;
- return NUdf::EFetchStatus::Finish;
- case NUdf::EFetchStatus::Ok:
- break;
- }
- Y_ENSURE(!IsFinished_, "Got data on finished stream");
- std::vector<arrow::Datum> blockColumns;
- for (size_t i = 0; i < Inputs_.size() - 1; i++) {
- auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
- ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
- if (datum.is_scalar()) {
- blockColumns.push_back(datum);
- } else {
- MKQL_ENSURE(datum.is_array(), "Expecting array");
- blockColumns.push_back(Trimmers_[i]->Trim(datum.array()));
- }
- }
- auto blockSize = ::GetBlockCount(Inputs_[Inputs_.size() - 1]);
- Data_.emplace_back(blockSize, std::move(blockColumns));
- RowCount_ += blockSize;
- return NUdf::EFetchStatus::Ok;
- }
- const TBlock& GetBlock(size_t blockOffset) const {
- Y_ENSURE(blockOffset < GetBlockCount());
- return Data_[blockOffset];
- }
- size_t GetBlockCount() const {
- return Data_.size();
- }
- TRowEntry GetRowEntry(size_t blockOffset, size_t itemOffset) const {
- auto& block = GetBlock(blockOffset);
- Y_ENSURE(itemOffset < block.Size);
- return TRowEntry(blockOffset, itemOffset);
- }
- TRowIterator GetRowIterator() const {
- return TRowIterator(this);
- }
- size_t GetRowCount() const {
- return RowCount_;
- }
- TBlockItem GetItem(TRowEntry entry, ui32 columnIdx) const {
- Y_ENSURE(columnIdx < Inputs_.size() - 1);
- return GetItemFromBlock(GetBlock(entry.BlockOffset), columnIdx, entry.ItemOffset);
- }
- TBlockItem GetItemFromBlock(const TBlock& block, ui32 columnIdx, size_t offset) const {
- Y_ENSURE(offset < block.Size);
- const auto& datum = block.Columns[columnIdx];
- if (datum.is_scalar()) {
- return Readers_[columnIdx]->GetScalarItem(*datum.scalar());
- } else {
- MKQL_ENSURE(datum.is_array(), "Expecting array");
- return Readers_[columnIdx]->GetItem(*datum.array(), offset);
- }
- }
- void GetRow(TRowEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const {
- Y_ENSURE(row.size() == ioMap.size());
- for (size_t i = 0; i < row.size(); i++) {
- row[i] = GetItem(entry, ioMap[i]);
- }
- }
- const TVector<NUdf::IBlockItemComparator::TPtr>& GetItemComparators() const {
- return Comparators_;
- }
- const TVector<NUdf::IBlockItemHasher::TPtr>& GetItemHashers() const {
- return Hashers_;
- }
- bool IsFinished() const {
- return IsFinished_;
- }
- private:
- NUdf::TStringRef GetResourceTag() const override {
- return NUdf::TStringRef(ResourceTag_);
- }
- void* GetResource() override {
- return this;
- }
- protected:
- const std::vector<arrow::ValueDescr> InputsDescr_;
- TVector<std::unique_ptr<IBlockReader>> Readers_;
- TVector<NUdf::IBlockItemHasher::TPtr> Hashers_;
- TVector<NUdf::IBlockItemComparator::TPtr> Comparators_;
- TVector<IBlockTrimmer::TPtr> Trimmers_;
- std::vector<TBlock> Data_;
- size_t RowCount_ = 0;
- bool IsFinished_ = false;
- NUdf::TUnboxedValue Stream_;
- TUnboxedValueVector Inputs_;
- const TStringBuf ResourceTag_;
- };
- class TBlockStorageWrapper : public TMutableComputationNode<TBlockStorageWrapper> {
- using TBaseComputation = TMutableComputationNode<TBlockStorageWrapper>;
- public:
- TBlockStorageWrapper(
- TComputationMutables& mutables,
- TVector<TType*>&& itemTypes,
- IComputationNode* stream,
- const TStringBuf& resourceTag
- )
- : TBaseComputation(mutables, EValueRepresentation::Boxed)
- , ItemTypes_(std::move(itemTypes))
- , Stream_(stream)
- , ResourceTag_(resourceTag)
- {}
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- return ctx.HolderFactory.Create<TBlockStorage>(
- ItemTypes_,
- std::move(Stream_->GetValue(ctx)),
- ResourceTag_,
- &ctx.ArrowMemoryPool
- );
- }
- private:
- void RegisterDependencies() const final {
- DependsOn(Stream_);
- }
- private:
- const TVector<TType*> ItemTypes_;
- IComputationNode* const Stream_;
- const TString ResourceTag_;
- };
- class TBlockIndex : public TComputationValue<TBlockIndex> {
- using TBase = TComputationValue<TBlockIndex>;
- struct TIndexNode {
- TBlockStorage::TRowEntry Entry;
- TIndexNode* Next;
- TIndexNode() = delete;
- TIndexNode(TBlockStorage::TRowEntry entry, TIndexNode* next = nullptr)
- : Entry(entry)
- , Next(next)
- {}
- };
- class TIndexMapValue {
- public:
- TIndexMapValue()
- : Raw(0)
- {}
- TIndexMapValue(TBlockStorage::TRowEntry entry) {
- TIndexEntryUnion un;
- un.Entry = entry;
- Y_ENSURE(((un.Raw << 1) >> 1) == un.Raw);
- Raw = (un.Raw << 1) | 1;
- }
- TIndexMapValue(TIndexNode* entryList)
- : EntryList(entryList)
- {}
- bool IsInplace() const {
- return Raw & 1;
- }
- TIndexNode* GetList() const {
- Y_ENSURE(!IsInplace());
- return EntryList;
- }
- TBlockStorage::TRowEntry GetEntry() const {
- Y_ENSURE(IsInplace());
- TIndexEntryUnion un;
- un.Raw = Raw >> 1;
- return un.Entry;
- }
- private:
- union TIndexEntryUnion {
- TBlockStorage::TRowEntry Entry;
- ui64 Raw;
- };
- union {
- TIndexNode* EntryList;
- ui64 Raw;
- };
- };
- using TIndexMap = TRobinHoodHashFixedMap<
- ui64,
- TIndexMapValue,
- std::equal_to<ui64>,
- std::hash<ui64>,
- TMKQLHugeAllocator<char>
- >;
- static_assert(sizeof(TIndexMapValue) == 8);
- static_assert(std::max(TIndexMap::GetCellSize(), static_cast<ui32>(sizeof(TIndexNode))) == BlockMapJoinIndexEntrySize);
- public:
- class TIterator {
- friend class TBlockIndex;
- enum class EIteratorType {
- EMPTY,
- INPLACE,
- LIST
- };
- public:
- TIterator() = default;
- TIterator(const TIterator&) = delete;
- TIterator& operator=(const TIterator&) = delete;
- TIterator(TIterator&& other) {
- *this = std::move(other);
- }
- TIterator& operator=(TIterator&& other) {
- if (this != &other) {
- Type_ = other.Type_;
- BlockIndex_ = other.BlockIndex_;
- ItemsToLookup_ = std::move(other.ItemsToLookup_);
- switch (Type_) {
- case EIteratorType::EMPTY:
- break;
- case EIteratorType::INPLACE:
- Entry_ = other.Entry_;
- EntryConsumed_ = other.EntryConsumed_;
- break;
- case EIteratorType::LIST:
- Node_ = other.Node_;
- break;
- }
- other.BlockIndex_ = nullptr;
- }
- return *this;
- }
- TMaybe<TBlockStorage::TRowEntry> Next() {
- Y_ENSURE(IsValid());
- switch (Type_) {
- case EIteratorType::EMPTY:
- return Nothing();
- case EIteratorType::INPLACE:
- if (EntryConsumed_) {
- return Nothing();
- }
- EntryConsumed_ = true;
- return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TBlockStorage::TRowEntry>(Entry_) : Nothing();
- case EIteratorType::LIST:
- for (; Node_ != nullptr; Node_ = Node_->Next) {
- if (BlockIndex_->IsKeyEquals(Node_->Entry, ItemsToLookup_)) {
- auto entry = Node_->Entry;
- Node_ = Node_->Next;
- return entry;
- }
- }
- return Nothing();
- }
- }
- bool IsValid() const {
- return BlockIndex_;
- }
- bool IsEmpty() const {
- Y_ENSURE(IsValid());
- switch (Type_) {
- case EIteratorType::EMPTY:
- return true;
- case EIteratorType::INPLACE:
- return EntryConsumed_;
- case EIteratorType::LIST:
- return Node_ == nullptr;
- }
- }
- void Reset() {
- *this = TIterator();
- }
- private:
- TIterator(const TBlockIndex* blockIndex)
- : Type_(EIteratorType::EMPTY)
- , BlockIndex_(blockIndex)
- {}
- TIterator(const TBlockIndex* blockIndex, TBlockStorage::TRowEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
- : Type_(EIteratorType::INPLACE)
- , BlockIndex_(blockIndex)
- , Entry_(entry)
- , EntryConsumed_(false)
- , ItemsToLookup_(std::move(itemsToLookup))
- {}
- TIterator(const TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
- : Type_(EIteratorType::LIST)
- , BlockIndex_(blockIndex)
- , Node_(node)
- , ItemsToLookup_(std::move(itemsToLookup))
- {}
- private:
- EIteratorType Type_;
- const TBlockIndex* BlockIndex_ = nullptr;
- union {
- TIndexNode* Node_;
- struct {
- TBlockStorage::TRowEntry Entry_;
- bool EntryConsumed_;
- };
- };
- std::vector<NYql::NUdf::TBlockItem> ItemsToLookup_;
- };
- public:
- TBlockIndex(
- TMemoryUsageInfo* memInfo,
- const TVector<ui32>& keyColumns,
- NUdf::TUnboxedValue blockStorage,
- bool any,
- TStringBuf resourceTag
- )
- : TBase(memInfo)
- , KeyColumns_(keyColumns)
- , BlockStorage_(std::move(blockStorage))
- , Any_(any)
- , ResourceTag_(std::move(resourceTag))
- {}
- void BuildIndex() {
- if (Index_) {
- return;
- }
- auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource());
- Y_ENSURE(blockStorage.IsFinished(), "Index build should be done after all data has been read");
- Index_ = std::make_unique<TIndexMap>(CalculateRHHashTableCapacity(blockStorage.GetRowCount()));
- for (size_t blockOffset = 0; blockOffset < blockStorage.GetBlockCount(); blockOffset++) {
- const auto& block = blockStorage.GetBlock(blockOffset);
- auto blockSize = block.Size;
- std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch;
- std::array<TBlockStorage::TRowEntry, PrefetchBatchSize> insertBatchEntries;
- std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> insertBatchKeys;
- ui32 insertBatchLen = 0;
- auto processInsertBatch = [&]() {
- Index_->BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t i, TIndexMap::iterator iter, bool isNew) {
- auto value = static_cast<TIndexMapValue*>(Index_->GetMutablePayload(iter));
- if (isNew) {
- // Store single entry inplace
- *value = TIndexMapValue(insertBatchEntries[i]);
- Index_->CheckGrow();
- } else {
- if (Any_ && ContainsKey(value, insertBatchKeys[i])) {
- return;
- }
- // Store as list
- if (value->IsInplace()) {
- *value = TIndexMapValue(InsertIndexNode(value->GetEntry()));
- }
- *value = TIndexMapValue(InsertIndexNode(insertBatchEntries[i], value->GetList()));
- }
- });
- };
- Y_ENSURE(blockOffset <= std::numeric_limits<ui32>::max());
- Y_ENSURE(blockSize <= std::numeric_limits<ui32>::max());
- for (size_t itemOffset = 0; itemOffset < blockSize; itemOffset++) {
- ui64 keyHash = GetKey(block, itemOffset, insertBatchKeys[insertBatchLen]);
- if (!keyHash) {
- continue;
- }
- insertBatchEntries[insertBatchLen] = TBlockStorage::TRowEntry(blockOffset, itemOffset);
- insertBatch[insertBatchLen].ConstructKey(keyHash);
- insertBatchLen++;
- if (insertBatchLen == PrefetchBatchSize) {
- processInsertBatch();
- insertBatchLen = 0;
- }
- }
- if (insertBatchLen > 0) {
- processInsertBatch();
- }
- }
- }
- template<typename TGetKey>
- void BatchLookup(size_t batchSize, std::array<TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) {
- Y_ENSURE(batchSize <= PrefetchBatchSize);
- std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> lookupBatch;
- std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> itemsBatch;
- for (size_t i = 0; i < batchSize; i++) {
- const auto& [items, keyHash] = getKey(i);
- lookupBatch[i].ConstructKey(keyHash);
- itemsBatch[i] = items;
- }
- Index_->BatchLookup({lookupBatch.data(), batchSize}, [&](size_t i, TIndexMap::iterator iter) {
- if (!iter) {
- // Empty iterator
- iterators[i] = TIterator(this);
- return;
- }
- auto value = static_cast<const TIndexMapValue*>(Index_->GetPayload(iter));
- if (value->IsInplace()) {
- iterators[i] = TIterator(this, value->GetEntry(), std::move(itemsBatch[i]));
- } else {
- iterators[i] = TIterator(this, value->GetList(), std::move(itemsBatch[i]));
- }
- });
- }
- bool IsKeyEquals(TBlockStorage::TRowEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
- auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource());
- Y_ENSURE(keyItems.size() == KeyColumns_.size());
- for (size_t i = 0; i < KeyColumns_.size(); i++) {
- auto indexItem = blockStorage.GetItem(entry, KeyColumns_[i]);
- if (blockStorage.GetItemComparators()[KeyColumns_[i]]->Equals(indexItem, keyItems[i])) {
- return true;
- }
- }
- return false;
- }
- const NUdf::TUnboxedValue& GetBlockStorage() const {
- return BlockStorage_;
- }
- private:
- ui64 GetKey(const TBlockStorage::TBlock& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
- auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource());
- ui64 keyHash = 0;
- keyItems.clear();
- for (ui32 keyColumn : KeyColumns_) {
- auto item = blockStorage.GetItemFromBlock(block, keyColumn, offset);
- if (!item) {
- keyItems.clear();
- return 0;
- }
- keyHash = CombineHashes(keyHash, blockStorage.GetItemHashers()[keyColumn]->Hash(item));
- keyItems.push_back(std::move(item));
- }
- return keyHash;
- }
- TIndexNode* InsertIndexNode(TBlockStorage::TRowEntry entry, TIndexNode* currentHead = nullptr) {
- return &IndexNodes_.emplace_back(entry, currentHead);
- }
- bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
- if (chain->IsInplace()) {
- return IsKeyEquals(chain->GetEntry(), keyItems);
- } else {
- for (TIndexNode* node = chain->GetList(); node != nullptr; node = node->Next) {
- if (IsKeyEquals(node->Entry, keyItems)) {
- return true;
- }
- node = node->Next;
- }
- return false;
- }
- }
- NUdf::TStringRef GetResourceTag() const override {
- return NUdf::TStringRef(ResourceTag_);
- }
- void* GetResource() override {
- return this;
- }
- private:
- const TVector<ui32>& KeyColumns_;
- NUdf::TUnboxedValue BlockStorage_;
- std::unique_ptr<TIndexMap> Index_;
- std::deque<TIndexNode> IndexNodes_;
- const bool Any_;
- const TStringBuf ResourceTag_;
- };
- class TBlockMapJoinIndexWrapper : public TMutableComputationNode<TBlockMapJoinIndexWrapper> {
- using TBaseComputation = TMutableComputationNode<TBlockMapJoinIndexWrapper>;
- public:
- TBlockMapJoinIndexWrapper(
- TComputationMutables& mutables,
- TVector<ui32>&& keyColumns,
- IComputationNode* blockStorage,
- bool any,
- const TStringBuf& resourceTag
- )
- : TBaseComputation(mutables, EValueRepresentation::Boxed)
- , KeyColumns_(std::move(keyColumns))
- , BlockStorage_(blockStorage)
- , Any_(any)
- , ResourceTag_(resourceTag)
- {}
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- return ctx.HolderFactory.Create<TBlockIndex>(
- KeyColumns_,
- std::move(BlockStorage_->GetValue(ctx)),
- Any_,
- ResourceTag_
- );
- }
- private:
- void RegisterDependencies() const final {
- DependsOn(BlockStorage_);
- }
- private:
- const TVector<ui32> KeyColumns_;
- IComputationNode* const BlockStorage_;
- const bool Any_;
- const TString ResourceTag_;
- };
- template <bool WithoutRight, bool RightRequired>
- class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired>>
- {
- using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired>>;
- using TJoinState = TBlockJoinState<RightRequired>;
- using TStorageState = TBlockStorage;
- using TIndexState = TBlockIndex;
- public:
- TBlockMapJoinCoreWraper(
- TComputationMutables& mutables,
- const TVector<TType*>&& resultItemTypes,
- const TVector<TType*>&& leftItemTypes,
- const TVector<ui32>&& leftKeyColumns,
- const TVector<ui32>&& leftIOMap,
- const TVector<ui32>&& rightIOMap,
- IComputationNode* leftStream,
- IComputationNode* rightBlockIndex
- )
- : TBaseComputation(mutables, EValueRepresentation::Boxed)
- , ResultItemTypes_(std::move(resultItemTypes))
- , LeftItemTypes_(std::move(leftItemTypes))
- , LeftKeyColumns_(std::move(leftKeyColumns))
- , LeftIOMap_(std::move(leftIOMap))
- , RightIOMap_(std::move(rightIOMap))
- , LeftStream_(leftStream)
- , RightBlockIndex_(rightBlockIndex)
- , KeyTupleCache_(mutables)
- {}
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- const auto joinState = ctx.HolderFactory.Create<TJoinState>(
- ctx,
- LeftItemTypes_,
- LeftIOMap_,
- ResultItemTypes_
- );
- return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
- std::move(joinState),
- LeftKeyColumns_,
- RightIOMap_,
- std::move(LeftStream_->GetValue(ctx)),
- std::move(RightBlockIndex_->GetValue(ctx))
- );
- }
- private:
- // Stream value returned by DoCalculate. Each successful WideFetch produces one
- // output block by probing the right block index with rows pulled from the left stream.
- class TStreamValue : public TComputationValue<TStreamValue> {
- using TBase = TComputationValue<TStreamValue>;
- public:
- // Takes over the join state, the left stream and the right index values.
- // leftKeyColumns and rightIOMap are kept by reference: DoCalculate passes the
- // enclosing node's members, which must outlive this value.
- TStreamValue(
- TMemoryUsageInfo* memInfo,
- const THolderFactory& holderFactory,
- NUdf::TUnboxedValue&& joinState,
- const TVector<ui32>& leftKeyColumns,
- const TVector<ui32>& rightIOMap,
- NUdf::TUnboxedValue&& leftStream,
- NUdf::TUnboxedValue&& rightBlockIndex
- )
- : TBase(memInfo)
- // A named rvalue reference is an lvalue, so std::move is required to move
- // (rather than copy) the incoming values into the members.
- , JoinState_(std::move(joinState))
- , LeftKeyColumns_(leftKeyColumns)
- , RightIOMap_(rightIOMap)
- , LeftStream_(std::move(leftStream))
- , RightBlockIndex_(std::move(rightBlockIndex))
- , HolderFactory_(holderFactory)
- {}
- private:
- // Fills output[0..width) with the next joined block.
- // The first call drains the right stream into the block storage and builds the
- // index. Returns Yield if either input yields, Finish once the left stream has
- // finished and the join state is empty, and Ok after filling `output`.
- NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
- auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get());
- auto& indexState = *static_cast<TIndexState*>(RightBlockIndex_.GetResource());
- auto& storageState = *static_cast<TStorageState*>(indexState.GetBlockStorage().GetResource());
- if (!RightStreamConsumed_) {
- auto fetchStatus = NUdf::EFetchStatus::Ok;
- while (fetchStatus != NUdf::EFetchStatus::Finish) {
- fetchStatus = storageState.FetchStream();
- if (fetchStatus == NUdf::EFetchStatus::Yield) {
- return NUdf::EFetchStatus::Yield;
- }
- }
- indexState.BuildIndex();
- RightStreamConsumed_ = true;
- }
- auto* inputFields = joinState.GetRawInputFields();
- const size_t inputWidth = joinState.GetInputWidth();
- const size_t outputWidth = joinState.GetOutputWidth();
- MKQL_ENSURE(width == outputWidth,
- "The given width doesn't equal to the result type size");
- std::vector<NYql::NUdf::TBlockItem> leftKeyColumns(LeftKeyColumns_.size());
- std::vector<ui64> leftKeyColumnHashes(LeftKeyColumns_.size());
- std::vector<NYql::NUdf::TBlockItem> rightRow(RightIOMap_.size());
- while (!joinState.HasBlocks()) {
- // Consume the lookup results of the current batch, one left row at a time.
- while (joinState.IsNotFull() && LookupBatchCurrent_ < LookupBatchSize_) {
- auto& iter = LookupBatchIterators_[LookupBatchCurrent_];
- if constexpr (WithoutRight) {
- // LeftSemi (RightRequired) keeps left rows that have a match,
- // LeftOnly keeps the ones that do not.
- if (bool(iter.IsEmpty()) != RightRequired) {
- joinState.CopyRow();
- }
- joinState.NextRow();
- LookupBatchCurrent_++;
- continue;
- } else if constexpr (!RightRequired) {
- // Left join: an unmatched left row is emitted once with an empty right-row vector.
- if (iter.IsEmpty()) {
- joinState.MakeRow(std::vector<NYql::NUdf::TBlockItem>());
- joinState.NextRow();
- LookupBatchCurrent_++;
- continue;
- }
- }
- // One output row per matching right row; stops midway if the output fills up,
- // leaving the iterator in place for the next call.
- while (joinState.IsNotFull() && !iter.IsEmpty()) {
- auto key = iter.Next();
- storageState.GetRow(*key, RightIOMap_, rightRow);
- joinState.MakeRow(rightRow);
- }
- if (iter.IsEmpty()) {
- joinState.NextRow();
- LookupBatchCurrent_++;
- }
- }
- // Batch exhausted: look up the keys of up to PrefetchBatchSize pending left rows at once.
- if (joinState.IsNotFull() && joinState.RemainingRowsCount() > 0) {
- LookupBatchSize_ = std::min(PrefetchBatchSize, static_cast<ui32>(joinState.RemainingRowsCount()));
- indexState.BatchLookup(LookupBatchSize_, LookupBatchIterators_, [&](size_t i) {
- MakeLeftKeys(leftKeyColumns, leftKeyColumnHashes, i);
- ui64 keyHash = CalculateTupleHash(leftKeyColumnHashes);
- return std::make_pair(std::ref(leftKeyColumns), keyHash);
- });
- LookupBatchCurrent_ = 0;
- continue;
- }
- // No pending left rows: pull the next block from the left stream.
- if (joinState.IsNotFull() && !joinState.IsFinished()) {
- switch (LeftStream_.WideFetch(inputFields, inputWidth)) {
- case NUdf::EFetchStatus::Yield:
- return NUdf::EFetchStatus::Yield;
- case NUdf::EFetchStatus::Ok:
- joinState.Reset();
- continue;
- case NUdf::EFetchStatus::Finish:
- joinState.Finish();
- break;
- }
- // Leave the loop, if no values left in the stream.
- Y_DEBUG_ABORT_UNLESS(joinState.IsFinished());
- }
- if (joinState.IsEmpty()) {
- return NUdf::EFetchStatus::Finish;
- }
- joinState.MakeBlocks(HolderFactory_);
- }
- const auto sliceSize = joinState.Slice();
- for (size_t i = 0; i < outputWidth; i++) {
- output[i] = joinState.Get(sliceSize, HolderFactory_, i);
- }
- return NUdf::EFetchStatus::Ok;
- }
- // Writes, for every left key column, the item and its hash returned by
- // TJoinState::GetItemWithHash at `offset` into items/hashes (both must be pre-sized).
- void MakeLeftKeys(std::vector<NYql::NUdf::TBlockItem>& items, std::vector<ui64>& hashes, size_t offset) const {
- auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get());
- Y_ENSURE(items.size() == LeftKeyColumns_.size());
- Y_ENSURE(hashes.size() == LeftKeyColumns_.size());
- for (size_t i = 0; i < LeftKeyColumns_.size(); i++) {
- std::tie(items[i], hashes[i]) = joinState.GetItemWithHash(LeftKeyColumns_[i], offset);
- }
- }
- NUdf::TUnboxedValue JoinState_;
- const TVector<ui32>& LeftKeyColumns_;
- const TVector<ui32>& RightIOMap_;
- // Set once the right stream has been fully fetched and the index built.
- bool RightStreamConsumed_ = false;
- // Lookup results of the current batch and the read position within it.
- std::array<typename TIndexState::TIterator, PrefetchBatchSize> LookupBatchIterators_;
- ui32 LookupBatchCurrent_ = 0;
- ui32 LookupBatchSize_ = 0;
- NUdf::TUnboxedValue LeftStream_;
- NUdf::TUnboxedValue RightBlockIndex_;
- const THolderFactory& HolderFactory_;
- };
- // Declares the two input nodes as dependencies: the left stream first, then the right index.
- void RegisterDependencies() const final {
- for (IComputationNode* const dependency : {LeftStream_, RightBlockIndex_}) {
- this->DependsOn(dependency);
- }
- }
- private:
- // Wide item types of the join result stream (taken from the callable's return type).
- const TVector<TType*> ResultItemTypes_;
- // Wide item types of the left input stream.
- const TVector<TType*> LeftItemTypes_;
- // Left key column indices; used by MakeLeftKeys to build lookup keys.
- const TVector<ui32> LeftKeyColumns_;
- // Left column indices handed to TJoinState (left items minus dropped keys and the block length).
- const TVector<ui32> LeftIOMap_;
- // Right column indices read via TStorageState::GetRow; empty for semi/only joins.
- const TVector<ui32> RightIOMap_;
- // Node producing the left wide block stream.
- IComputationNode* const LeftStream_;
- // Node producing the right block map join index resource.
- IComputationNode* const RightBlockIndex_;
- // NOTE(review): apart from its constructor initialization, no use of this cache is visible nearby — confirm it is still needed.
- const TContainerCacheOnContext KeyTupleCache_;
- };
- // Cross join core: pairs every row of the left stream with every row of the
- // right block storage. The right stream is fully fetched into the storage before
- // the first output block is produced.
- class TBlockCrossJoinCoreWraper : public TMutableComputationNode<TBlockCrossJoinCoreWraper>
- {
- using TBaseComputation = TMutableComputationNode<TBlockCrossJoinCoreWraper>;
- using TJoinState = TBlockJoinState<true>;
- using TStorageState = TBlockStorage;
- public:
- // Vectors are taken by value and moved into the members; with `const T&&`
- // parameters std::move would bind to the copy constructor instead.
- TBlockCrossJoinCoreWraper(
- TComputationMutables& mutables,
- TVector<TType*> resultItemTypes,
- TVector<TType*> leftItemTypes,
- TVector<ui32> leftIOMap,
- TVector<ui32> rightIOMap,
- IComputationNode* leftStream,
- IComputationNode* rightBlockStorage
- )
- : TBaseComputation(mutables, EValueRepresentation::Boxed)
- , ResultItemTypes_(std::move(resultItemTypes))
- , LeftItemTypes_(std::move(leftItemTypes))
- , LeftIOMap_(std::move(leftIOMap))
- , RightIOMap_(std::move(rightIOMap))
- , LeftStream_(leftStream)
- , RightBlockStorage_(rightBlockStorage)
- {}
- // Creates a fresh TJoinState and returns a TStreamValue that performs the
- // cross join lazily in WideFetch.
- NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
- const auto joinState = ctx.HolderFactory.Create<TJoinState>(
- ctx,
- LeftItemTypes_,
- LeftIOMap_,
- ResultItemTypes_
- );
- // No std::move here: joinState is const (moving it would silently copy),
- // and GetValue() already returns temporaries.
- return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
- joinState,
- RightIOMap_,
- LeftStream_->GetValue(ctx),
- RightBlockStorage_->GetValue(ctx)
- );
- }
- private:
- // Stream value returned by DoCalculate; each successful WideFetch yields one
- // block of (left row x right row) pairs.
- class TStreamValue : public TComputationValue<TStreamValue> {
- using TBase = TComputationValue<TStreamValue>;
- public:
- // Takes over the join state, the left stream and the right storage values.
- // rightIOMap is kept by reference: DoCalculate passes the enclosing node's
- // member, which must outlive this value.
- TStreamValue(
- TMemoryUsageInfo* memInfo,
- const THolderFactory& holderFactory,
- NUdf::TUnboxedValue&& joinState,
- const TVector<ui32>& rightIOMap,
- NUdf::TUnboxedValue&& leftStream,
- NUdf::TUnboxedValue&& rightBlockStorage
- )
- : TBase(memInfo)
- // A named rvalue reference is an lvalue, so std::move is required to move
- // (rather than copy) the incoming values into the members.
- , JoinState_(std::move(joinState))
- , RightIOMap_(rightIOMap)
- , LeftStream_(std::move(leftStream))
- , RightBlockStorage_(std::move(rightBlockStorage))
- , HolderFactory_(holderFactory)
- {}
- private:
- // Fills output[0..width) with the next joined block.
- // The first call drains the right stream into the block storage. Returns Yield
- // if either input yields, Finish once the left stream has finished and the
- // join state is empty, and Ok after filling `output`.
- NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) {
- auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get());
- auto& storageState = *static_cast<TStorageState*>(RightBlockStorage_.GetResource());
- if (!RightStreamConsumed_) {
- auto fetchStatus = NUdf::EFetchStatus::Ok;
- while (fetchStatus != NUdf::EFetchStatus::Finish) {
- fetchStatus = storageState.FetchStream();
- if (fetchStatus == NUdf::EFetchStatus::Yield) {
- return NUdf::EFetchStatus::Yield;
- }
- }
- RightStreamConsumed_ = true;
- RightRowIterator_ = storageState.GetRowIterator();
- }
- auto* inputFields = joinState.GetRawInputFields();
- const size_t inputWidth = joinState.GetInputWidth();
- const size_t outputWidth = joinState.GetOutputWidth();
- MKQL_ENSURE(width == outputWidth,
- "The given width doesn't equal to the result type size");
- std::vector<NYql::NUdf::TBlockItem> rightRow(RightIOMap_.size());
- while (!joinState.HasBlocks()) {
- // Pair the current left row with the remaining right rows until the output fills up.
- while (!RightRowIterator_.IsEmpty() && joinState.RemainingRowsCount() > 0 && joinState.IsNotFull()) {
- auto rowEntry = *RightRowIterator_.Next();
- storageState.GetRow(rowEntry, RightIOMap_, rightRow);
- joinState.MakeRow(rightRow);
- }
- // All right rows paired with this left row: advance and restart the right side.
- if (joinState.IsNotFull() && joinState.RemainingRowsCount() > 0) {
- joinState.NextRow();
- RightRowIterator_ = storageState.GetRowIterator();
- continue;
- }
- // No pending left rows: pull the next block from the left stream.
- if (joinState.IsNotFull() && !joinState.IsFinished()) {
- switch (LeftStream_.WideFetch(inputFields, inputWidth)) {
- case NUdf::EFetchStatus::Yield:
- return NUdf::EFetchStatus::Yield;
- case NUdf::EFetchStatus::Ok:
- joinState.Reset();
- continue;
- case NUdf::EFetchStatus::Finish:
- joinState.Finish();
- break;
- }
- // Leave the loop, if no values left in the stream.
- Y_DEBUG_ABORT_UNLESS(joinState.IsFinished());
- }
- if (joinState.IsEmpty()) {
- return NUdf::EFetchStatus::Finish;
- }
- joinState.MakeBlocks(HolderFactory_);
- }
- const auto sliceSize = joinState.Slice();
- for (size_t i = 0; i < outputWidth; i++) {
- output[i] = joinState.Get(sliceSize, HolderFactory_, i);
- }
- return NUdf::EFetchStatus::Ok;
- }
- NUdf::TUnboxedValue JoinState_;
- const TVector<ui32>& RightIOMap_;
- // Set once the right stream has been fully fetched into the storage.
- bool RightStreamConsumed_ = false;
- // Position within the right storage for the current left row.
- TStorageState::TRowIterator RightRowIterator_;
- NUdf::TUnboxedValue LeftStream_;
- NUdf::TUnboxedValue RightBlockStorage_;
- const THolderFactory& HolderFactory_;
- };
- void RegisterDependencies() const final {
- this->DependsOn(LeftStream_);
- this->DependsOn(RightBlockStorage_);
- }
- private:
- // Wide item types of the join result stream.
- const TVector<TType*> ResultItemTypes_;
- // Wide item types of the left input stream.
- const TVector<TType*> LeftItemTypes_;
- // Left column indices handed to TJoinState.
- const TVector<ui32> LeftIOMap_;
- // Right column indices read via TStorageState::GetRow.
- const TVector<ui32> RightIOMap_;
- IComputationNode* const LeftStream_;
- IComputationNode* const RightBlockStorage_;
- };
- } // namespace
- // Creates the computation node for BlockStorage.
- // Checks that the callable takes exactly one wide (Multi) stream with at least one
- // column and returns a block storage resource, then wraps the stream in a
- // TBlockStorageWrapper tagged with the result resource tag.
- IComputationNode* WrapBlockStorage(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arg");
- const auto resultType = callable.GetType()->GetReturnType();
- MKQL_ENSURE(resultType->IsResource(), "Expected Resource as a result type");
- auto resultResourceType = AS_TYPE(TResourceType, resultType);
- MKQL_ENSURE(resultResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
- const auto inputType = callable.GetInput(0).GetStaticType();
- MKQL_ENSURE(inputType->IsStream(), "Expected WideStream as an input stream");
- const auto inputStreamType = AS_TYPE(TStreamType, inputType);
- // This callable has a single input stream (there is no left/right side here).
- MKQL_ENSURE(inputStreamType->GetItemType()->IsMulti(),
- "Expected Multi as an input stream item type");
- const auto inputStreamComponents = GetWideComponents(inputStreamType);
- MKQL_ENSURE(inputStreamComponents.size() > 0, "Expected at least one column");
- TVector<TType*> inputStreamItems(inputStreamComponents.cbegin(), inputStreamComponents.cend());
- const auto inputStream = LocateNode(ctx.NodeLocator, callable, 0);
- return new TBlockStorageWrapper(
- ctx.Mutables,
- std::move(inputStreamItems),
- inputStream,
- resultResourceType->GetTag()
- );
- }
- // Creates the computation node for BlockMapJoinIndex.
- // Inputs: 0 - block storage resource; 1 - original wide item type of the stored
- // stream; 2 - tuple of key column indices; 3 - the `any` flag (bool literal).
- // Returns a TBlockMapJoinIndexWrapper tagged with the result resource tag.
- IComputationNode* WrapBlockMapJoinIndex(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args");
- const auto returnType = callable.GetType()->GetReturnType();
- MKQL_ENSURE(returnType->IsResource(), "Expected Resource as a result type");
- const auto indexResourceType = AS_TYPE(TResourceType, returnType);
- MKQL_ENSURE(indexResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource");
- const auto storageType = callable.GetInput(0).GetStaticType();
- MKQL_ENSURE(storageType->IsResource(), "Expected Resource as an input type");
- const auto storageResourceType = AS_TYPE(TResourceType, storageType);
- MKQL_ENSURE(storageResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
- const auto itemType = AS_VALUE(TTypeType, callable.GetInput(1));
- MKQL_ENSURE(itemType->IsMulti(), "Expected Multi as an input item type");
- const auto itemComponents = AS_TYPE(TMultiType, itemType)->GetElements();
- MKQL_ENSURE(itemComponents.size() > 0, "Expected at least one column");
- const auto keyTuple = AS_VALUE(TTupleLiteral, callable.GetInput(2));
- const auto keyCount = keyTuple->GetValuesCount();
- TVector<ui32> keyColumns;
- keyColumns.reserve(keyCount);
- for (ui32 idx = 0; idx < keyCount; ++idx) {
- keyColumns.emplace_back(AS_VALUE(TDataLiteral, keyTuple->GetValue(idx))->AsValue().Get<ui32>());
- }
- // The trailing wide item carries the block length, so only the columns before it may be keys.
- const auto keyColumnLimit = itemComponents.size() - 1;
- for (const ui32 keyColumn : keyColumns) {
- MKQL_ENSURE(keyColumn < keyColumnLimit, "Key column out of range");
- }
- const bool any = AS_VALUE(TDataLiteral, callable.GetInput(3))->AsValue().Get<bool>();
- const auto storageNode = LocateNode(ctx.NodeLocator, callable, 0);
- return new TBlockMapJoinIndexWrapper(
- ctx.Mutables,
- std::move(keyColumns),
- storageNode,
- any,
- indexResourceType->GetTag()
- );
- }
- // Creates the computation node for BlockMapJoinCore.
- // Inputs: 0 - left wide block stream; 1 - right side resource (a block map join
- // index, or plain block storage for cross join); 2 - original right item type;
- // 3 - join kind; 4/5 - left key columns / dropped left keys; 6/7 - right key
- // columns / dropped right keys.
- // Validates types and key specifications, builds the left/right IO maps and
- // instantiates the wrapper matching the join kind.
- IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
- MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args");
- // Reads a tuple literal of ui32 data literals into a vector, preserving order.
- const auto parseUi32Tuple = [](const auto& node) {
- const auto tuple = AS_VALUE(TTupleLiteral, node);
- TVector<ui32> values;
- values.reserve(tuple->GetValuesCount());
- for (ui32 i = 0; i < tuple->GetValuesCount(); i++) {
- const auto item = AS_VALUE(TDataLiteral, tuple->GetValue(i));
- values.emplace_back(item->AsValue().Get<ui32>());
- }
- return values;
- };
- const auto joinType = callable.GetType()->GetReturnType();
- MKQL_ENSURE(joinType->IsStream(), "Expected WideStream as a resulting stream");
- const auto joinStreamType = AS_TYPE(TStreamType, joinType);
- MKQL_ENSURE(joinStreamType->GetItemType()->IsMulti(),
- "Expected Multi as a resulting item type");
- const auto joinComponents = GetWideComponents(joinStreamType);
- MKQL_ENSURE(joinComponents.size() > 0, "Expected at least one column");
- // Not const: it is std::move'd into the wrapper below, and moving a const vector copies it.
- TVector<TType*> joinItems(joinComponents.cbegin(), joinComponents.cend());
- const auto leftType = callable.GetInput(0).GetStaticType();
- MKQL_ENSURE(leftType->IsStream(), "Expected WideStream as a left stream");
- const auto leftStreamType = AS_TYPE(TStreamType, leftType);
- MKQL_ENSURE(leftStreamType->GetItemType()->IsMulti(),
- "Expected Multi as a left stream item type");
- const auto leftStreamComponents = GetWideComponents(leftStreamType);
- MKQL_ENSURE(leftStreamComponents.size() > 0, "Expected at least one column");
- // Not const for the same reason as joinItems.
- TVector<TType*> leftStreamItems(leftStreamComponents.cbegin(), leftStreamComponents.cend());
- const auto joinKindNode = callable.GetInput(3);
- const auto rawKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>();
- const auto joinKind = GetJoinKind(rawKind);
- Y_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left ||
- joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly || joinKind == EJoinKind::Cross);
- const auto rightBlockStorageType = callable.GetInput(1).GetStaticType();
- MKQL_ENSURE(rightBlockStorageType->IsResource(), "Expected Resource as a right type");
- auto rightBlockStorageResourceType = AS_TYPE(TResourceType, rightBlockStorageType);
- if (joinKind != EJoinKind::Cross) {
- MKQL_ENSURE(rightBlockStorageResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource");
- } else {
- MKQL_ENSURE(rightBlockStorageResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource");
- }
- auto origRightItemType = AS_VALUE(TTypeType, callable.GetInput(2));
- MKQL_ENSURE(origRightItemType->IsMulti(), "Expected Multi as a right stream item type");
- const auto rightStreamComponents = AS_TYPE(TMultiType, origRightItemType)->GetElements();
- MKQL_ENSURE(rightStreamComponents.size() > 0, "Expected at least one column");
- const TVector<TType*> rightStreamItems(rightStreamComponents.cbegin(), rightStreamComponents.cend());
- TVector<ui32> leftKeyColumns = parseUi32Tuple(callable.GetInput(4));
- const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend());
- const auto leftKeyDropList = parseUi32Tuple(callable.GetInput(5));
- const THashSet<ui32> leftKeyDrops(leftKeyDropList.cbegin(), leftKeyDropList.cend());
- for (const auto& drop : leftKeyDrops) {
- MKQL_ENSURE(leftKeySet.contains(drop),
- "Only key columns has to be specified in drop column set");
- }
- const TVector<ui32> rightKeyColumns = parseUi32Tuple(callable.GetInput(6));
- const THashSet<ui32> rightKeySet(rightKeyColumns.cbegin(), rightKeyColumns.cend());
- const auto rightKeyDropList = parseUi32Tuple(callable.GetInput(7));
- const THashSet<ui32> rightKeyDrops(rightKeyDropList.cbegin(), rightKeyDropList.cend());
- for (const auto& drop : rightKeyDrops) {
- MKQL_ENSURE(rightKeySet.contains(drop),
- "Only key columns has to be specified in drop column set");
- }
- if (joinKind == EJoinKind::Cross) {
- MKQL_ENSURE(leftKeyColumns.empty() && leftKeyDrops.empty() && rightKeyColumns.empty() && rightKeyDrops.empty(),
- "Specifying key columns is not allowed for cross join");
- }
- MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key columns mismatch");
- // Key columns index the data columns; the last wide item (block length) can never be a key.
- // Checked after the existing validations so their error precedence is unchanged.
- for (const ui32 keyColumn : leftKeyColumns) {
- MKQL_ENSURE(keyColumn < leftStreamItems.size() - 1, "Key column out of range");
- }
- for (const ui32 keyColumn : rightKeyColumns) {
- MKQL_ENSURE(keyColumn < rightStreamItems.size() - 1, "Key column out of range");
- }
- // XXX: Mind the last wide item, containing block length.
- TVector<ui32> leftIOMap;
- for (size_t i = 0; i < leftStreamItems.size() - 1; i++) {
- if (leftKeyDrops.contains(i)) {
- continue;
- }
- leftIOMap.push_back(i);
- }
- // XXX: Mind the last wide item, containing block length.
- TVector<ui32> rightIOMap;
- if (joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left || joinKind == EJoinKind::Cross) {
- for (size_t i = 0; i < rightStreamItems.size() - 1; i++) {
- if (rightKeyDrops.contains(i)) {
- continue;
- }
- rightIOMap.push_back(i);
- }
- } else {
- MKQL_ENSURE(rightKeyDrops.empty(), "Right key drops are not allowed for semi/only join");
- }
- const auto leftStream = LocateNode(ctx.NodeLocator, callable, 0);
- const auto rightBlockStorage = LocateNode(ctx.NodeLocator, callable, 1);
- #define JOIN_WRAPPER(WITHOUT_RIGHT, RIGHT_REQUIRED) \
- return new TBlockMapJoinCoreWraper<WITHOUT_RIGHT, RIGHT_REQUIRED>( \
- ctx.Mutables, \
- std::move(joinItems), \
- std::move(leftStreamItems), \
- std::move(leftKeyColumns), \
- std::move(leftIOMap), \
- std::move(rightIOMap), \
- leftStream, \
- rightBlockStorage \
- )
- switch (joinKind) {
- case EJoinKind::Inner:
- JOIN_WRAPPER(false, true);
- case EJoinKind::Left:
- JOIN_WRAPPER(false, false);
- case EJoinKind::LeftSemi:
- MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left semi join");
- JOIN_WRAPPER(true, true);
- case EJoinKind::LeftOnly:
- MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left only join");
- JOIN_WRAPPER(true, false);
- case EJoinKind::Cross:
- return new TBlockCrossJoinCoreWraper(
- ctx.Mutables,
- std::move(joinItems),
- std::move(leftStreamItems),
- std::move(leftIOMap),
- std::move(rightIOMap),
- leftStream,
- rightBlockStorage
- );
- default:
- /* TODO: Display the human-readable join kind name. */
- MKQL_ENSURE(false, "BlockMapJoinCore doesn't support join type #"
- << static_cast<ui32>(joinKind));
- }
- #undef JOIN_WRAPPER
- }
- } // namespace NMiniKQL
- } // namespace NKikimr
|