123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135 |
- #include "mkql_custom_list.h"
- namespace NKikimr {
- namespace NMiniKQL {
- TForwardListValue::TForwardListValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream)
- : TCustomListValue(memInfo)
- , Stream(std::move(stream))
- {
- MKQL_ENSURE(Stream, "Empty stream.");
- }
- TForwardListValue::TIterator::TIterator(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& stream)
- : TComputationValue(memInfo), Stream(std::move(stream))
- {}
- bool TForwardListValue::TIterator::Next(NUdf::TUnboxedValue& value) {
- const auto status = Stream.Fetch(value);
- MKQL_ENSURE(status != NUdf::EFetchStatus::Yield, "Unexpected stream status.");
- return status == NUdf::EFetchStatus::Ok;
- }
- NUdf::TUnboxedValue TForwardListValue::GetListIterator() const {
- MKQL_ENSURE(Stream, "Second pass for ForwardList");
- return NUdf::TUnboxedValuePod(new TIterator(GetMemInfo(), std::move(Stream)));
- }
- TExtendListValue::TExtendListValue(TMemoryUsageInfo* memInfo, TUnboxedValueVector&& lists)
- : TCustomListValue(memInfo)
- , Lists(std::move(lists))
- {
- MKQL_MEM_TAKE(memInfo, Lists.data(), Lists.capacity() * sizeof(NUdf::TUnboxedValue));
- Y_ASSERT(!Lists.empty());
- }
- TExtendListValue::TIterator::TIterator(TMemoryUsageInfo* memInfo, TUnboxedValueVector&& iters)
- : TComputationValue(memInfo)
- , Iters(std::move(iters))
- , Index(0)
- {
- MKQL_MEM_TAKE(memInfo, Iters.data(), Iters.capacity() * sizeof(NUdf::TUnboxedValue));
- }
- TExtendListValue::TIterator::~TIterator()
- {
- MKQL_MEM_RETURN(GetMemInfo(), Iters.data(), Iters.capacity() * sizeof(NUdf::TUnboxedValue));
- }
- bool TExtendListValue::TIterator::Next(NUdf::TUnboxedValue& value) {
- for (; Index < Iters.size(); ++Index) {
- if (Iters[Index].Next(value)) {
- return true;
- }
- }
- return false;
- }
- bool TExtendListValue::TIterator::Skip() {
- for (; Index < Iters.size(); ++Index) {
- if (Iters[Index].Skip()) {
- return true;
- }
- }
- return false;
- }
- NUdf::TUnboxedValue TExtendListValue::GetListIterator() const {
- TUnboxedValueVector iters;
- iters.reserve(Lists.size());
- for (const auto& list : Lists) {
- iters.emplace_back(list.GetListIterator());
- }
- return NUdf::TUnboxedValuePod(new TIterator(GetMemInfo(), std::move(iters)));
- }
- TExtendListValue::~TExtendListValue() {
- MKQL_MEM_RETURN(GetMemInfo(), Lists.data(), Lists.capacity() * sizeof(NUdf::TUnboxedValue));
- }
- ui64 TExtendListValue::GetListLength() const {
- if (!Length) {
- ui64 length = 0ULL;
- for (const auto& list : Lists) {
- ui64 partialLength = list.GetListLength();
- length += partialLength;
- }
- Length = length;
- }
- return *Length;
- }
- bool TExtendListValue::HasListItems() const {
- if (!HasItems) {
- for (const auto& list : Lists) {
- if (list.HasListItems()) {
- HasItems = true;
- break;
- }
- }
- if (!HasItems) {
- HasItems = false;
- }
- }
- return *HasItems;
- }
- TExtendStreamValue::TExtendStreamValue(TMemoryUsageInfo* memInfo, TUnboxedValueVector&& lists)
- : TBase(memInfo)
- , Lists(std::move(lists))
- {
- MKQL_MEM_TAKE(memInfo, Lists.data(), Lists.capacity() * sizeof(NUdf::TUnboxedValue));
- Y_ASSERT(!Lists.empty());
- }
- TExtendStreamValue::~TExtendStreamValue() {
- MKQL_MEM_RETURN(GetMemInfo(), Lists.data(), Lists.capacity() * sizeof(NUdf::TUnboxedValue));
- }
- NUdf::EFetchStatus TExtendStreamValue::Fetch(NUdf::TUnboxedValue& value) {
- for (; Index < Lists.size(); ++Index) {
- const auto status = Lists[Index].Fetch(value);
- if (status != NUdf::EFetchStatus::Finish) {
- return status;
- }
- }
- return NUdf::EFetchStatus::Finish;
- }
- }
- }
|