mkql_range.cpp 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865
  1. #include "mkql_range.h"
  2. #include <yql/essentials/minikql/computation/mkql_computation_node_holders.h>
  3. #include <yql/essentials/minikql/computation/presort.h>
  4. #include <yql/essentials/minikql/mkql_node_cast.h>
  5. #include <yql/essentials/minikql/mkql_node_builder.h>
  6. #include <yql/essentials/minikql/mkql_type_builder.h>
  7. #include <yql/essentials/minikql/mkql_string_util.h>
  8. #include <queue>
  9. #include <algorithm>
  10. namespace NKikimr {
  11. namespace NMiniKQL {
  12. using namespace NYql::NUdf;
  13. namespace {
  14. using TTypeList = std::vector<TType*>;
  15. using TUnboxedValueQueue = std::deque<NUdf::TUnboxedValue, TMKQLAllocator<NUdf::TUnboxedValue>>;
  16. struct TRangeTypeInfo {
  17. TType* RangeType = nullptr;
  18. ICompare::TPtr RangeCompare;
  19. TType* BoundaryType = nullptr;
  20. ICompare::TPtr BoundaryCompare;
  21. TTypeList Components;
  22. std::vector<ICompare::TPtr> ComponentsCompare;
  23. };
  24. TType* RemoveAllOptionals(TType* type) {
  25. Y_ENSURE(type);
  26. while (type->IsOptional()) {
  27. type = static_cast<TOptionalType*>(type)->GetItemType();
  28. }
  29. return type;
  30. }
  // Validates `rangeType` and precomputes the comparators used by the range operations.
  // Accepted shape: Tuple<B, B>, where B is a tuple with an odd number (>= 3) of elements:
  // i32 at every even position (inf markers and, in the last slot, the inclusion flag) and
  // an Optional (possibly nested) of a Data or Pg type at every odd position.
  // Any violation throws via MKQL_ENSURE.
  31. TRangeTypeInfo ExtractTypes(TType* rangeType) {
  32. TRangeTypeInfo result;
  33. result.RangeType = rangeType;
  // Comparator over whole ranges; note it is built before the shape checks below.
  34. result.RangeCompare = MakeCompareImpl(result.RangeType);
  35. MKQL_ENSURE(result.RangeType->IsTuple(), "Expecting range to be of tuple type");
  36. auto rangeTupleType = static_cast<TTupleType*>(result.RangeType);
  37. MKQL_ENSURE(rangeTupleType->GetElementsCount() == 2, "Expecting range to be of tuple type with 2 elements");
  38. MKQL_ENSURE(rangeTupleType->GetElementType(0)->IsSameType(*rangeTupleType->GetElementType(1)),
  39. "Expecting range to be of tuple type with 2 elements of same type");
  40. result.BoundaryType = rangeTupleType->GetElementType(0);
  41. result.BoundaryCompare = MakeCompareImpl(result.BoundaryType);
  42. MKQL_ENSURE(result.BoundaryType->IsTuple(), "Expecting range boundary to be of tuple type");
  43. auto rangeBoundaryTupleType = static_cast<TTupleType*>(result.BoundaryType);
  44. MKQL_ENSURE(rangeBoundaryTupleType->GetElementsCount() >= 3,
  45. "Expecting range boundary to be of tuple type with at least 3 elements");
  46. MKQL_ENSURE(rangeBoundaryTupleType->GetElementsCount() % 2 == 1,
  47. "Expecting range boundary to be of tuple type with odd element count");
  // Validate every boundary element and record its type and comparator (index-aligned).
  48. for (ui32 i = 0; i < rangeBoundaryTupleType->GetElementsCount(); ++i) {
  49. auto type = rangeBoundaryTupleType->GetElementType(i);
  50. if (i % 2 == 1) {
  // Odd positions hold the column values.
  51. auto baseType = RemoveAllOptionals(type);
  52. MKQL_ENSURE(type->IsOptional() && (baseType->IsData() || baseType->IsPg()),
  53. "Expecting (multiple) optional of Data or Pg at odd positions of range boundary tuple");
  54. } else {
  // Even positions hold i32 markers (inf signs; the inclusion flag in the last slot).
  55. MKQL_ENSURE(type->IsData() && static_cast<TDataType*>(type)->GetSchemeType() == NUdf::TDataType<i32>::Id,
  56. "Expected i32 at even positions of range boundary tuple");
  57. }
  58. result.Components.push_back(type);
  59. result.ComponentsCompare.push_back(MakeCompareImpl(type));
  60. }
  61. return result;
  62. }
  63. struct TExpandedRangeBoundary {
  64. int Included = 0; // -1 = [; 0 = (); +1 = ]
  65. TUnboxedValue Value; // AsTuple(Inf, x, Inf, y, Inf, z, ..., Included), where -1 = -inf, +1 = +inf, 0 - finite value
  66. TUnboxedValueVector Components;
  67. };
  68. struct TExpandedRange {
  69. TExpandedRangeBoundary Left;
  70. TExpandedRangeBoundary Right;
  71. };
  72. TExpandedRangeBoundary Max(TExpandedRangeBoundary a, TExpandedRangeBoundary b, ICompare* cmp) {
  73. return cmp->Less(a.Value, b.Value) ? b : a;
  74. }
  75. TExpandedRangeBoundary Min(TExpandedRangeBoundary a, TExpandedRangeBoundary b, ICompare* cmp) {
  76. return cmp->Less(a.Value, b.Value) ? a : b;
  77. }
  78. i32 GetInfSign(bool hasPrefix, bool isIncluded, bool isLeft) {
  79. if (!hasPrefix || isIncluded) {
  80. return (isLeft ? -1 : 1);
  81. }
  82. return (isLeft ? 1 : -1);
  83. }
  // Expands one boundary tuple (inf_0, v_0, inf_1, v_1, ..., included) into a
  // TExpandedRangeBoundary and validates it (throws on violation):
  //  - every inf marker is 0 (finite) or the sign GetInfSign() prescribes for this side;
  //  - a value is present exactly when its marker is 0;
  //  - the inclusion flag is 0, or -1 for a left boundary / +1 for a right one.
  // `left` is true when `value` is the left boundary of its range. `value` must expose its
  // items through GetElements().
  84. TExpandedRangeBoundary ExpandRangeBoundary(TUnboxedValue value, bool left) {
  85. auto elements = value.GetElements();
  86. auto elementsCount = value.GetListLength();
  87. Y_ENSURE(elements);
  88. Y_ENSURE(elementsCount >= 3 && elementsCount % 2 == 1);
  89. TExpandedRangeBoundary result;
  90. result.Value = value;
  // The boundary "has a prefix" when its first column is finite.
  91. const bool hasPrefix = elements[0].Get<i32>() == 0;
  92. const bool isIncluded = elements[elementsCount - 1].Get<i32>() != 0;
  // Walk the (marker, value) pairs, excluding the trailing inclusion flag.
  93. for (size_t i = 0; i < elementsCount - 1; i += 2) {
  94. i32 inf = elements[i].Get<i32>();
  95. MKQL_ENSURE(inf == 0 || inf == GetInfSign(hasPrefix, isIncluded, left),
  96. "Invalid value for range boundary inf marker: " << inf << " at position " << i);
  97. MKQL_ENSURE((inf != 0) ^ bool(elements[i + 1]),
  98. "Value does not match inf marker: " << inf << " at position " << i);
  99. }
  100. result.Components.assign(elements, elements + elementsCount);
  101. result.Included = result.Components.back().Get<i32>();
  102. MKQL_ENSURE(!result.Included || result.Included == (left ? -1 : 1),
  103. "Invalid value for range boundary last element: " << result.Included);
  104. return result;
  105. }
  // Expands a range tuple (left, right) into both boundaries (see ExpandRangeBoundary).
  // It also checks that:
  //  - the two boundaries have the same number of components;
  //  - once some column is infinite on both sides, every following column is infinite on
  //    both sides as well.
  106. TExpandedRange ExpandRange(TUnboxedValue value) {
  107. auto elements = value.GetElements();
  108. auto elementsCount = value.GetListLength();
  109. Y_ENSURE(elements);
  110. Y_ENSURE(elementsCount == 2);
  111. TExpandedRange result;
  112. result.Left = ExpandRangeBoundary(elements[0], true);
  113. result.Right = ExpandRangeBoundary(elements[1], false);
  114. Y_ENSURE(result.Left.Components.size() == result.Right.Components.size());
  115. bool seenInfRange = false;
  // Inspect only the inf markers (even positions), skipping the trailing inclusion flag.
  116. for (size_t i = 0; i < result.Left.Components.size() - 1; i += 2) {
  117. if (result.Left.Components[i].Get<i32>() && result.Right.Components[i].Get<i32>()) {
  118. seenInfRange = true;
  119. } else {
  120. MKQL_ENSURE(!seenInfRange, "Non inf component follows inf component at position " << i);
  121. }
  122. }
  123. return result;
  124. }
  125. size_t GetFiniteComponentsCount(const TExpandedRangeBoundary& boundary) {
  126. size_t result = 0;
  127. for (size_t i = 0; i < boundary.Components.size() - 1; i += 2) {
  128. if (boundary.Components[i].Get<i32>() != 0) {
  129. break;
  130. }
  131. result += 1;
  132. }
  133. return result;
  134. }
  135. template<typename T>
  136. bool IsAdjacentNumericValues(TUnboxedValue left, TUnboxedValue right) {
  137. T l = left.Get<T>();
  138. T r = right.Get<T>();
  139. Y_ENSURE(l < r);
  140. return l + 1 == r;
  141. }
  // Decides whether a half-open range really denotes a single point on its last finite
  // column, e.g. [x, x + 1) == {x} or (x - 1, x] == {x}. All of the following must hold:
  //  - exactly one side is included;
  //  - both sides have the same, non-zero number of leading finite columns;
  //  - the last finite column has an integral or date Data type (Pg types never qualify);
  //  - all preceding finite columns have equal values on both sides;
  //  - the last finite column's values are adjacent (right == left + 1).
  142. bool CanConvertToPointRange(const TExpandedRange& range, const TRangeTypeInfo& typeInfo) {
  143. if (!(range.Left.Included && !range.Right.Included ||
  144. !range.Left.Included && range.Right.Included))
  145. {
  146. return false;
  147. }
  148. const size_t compsCount = GetFiniteComponentsCount(range.Left);
  149. if (compsCount == 0 || GetFiniteComponentsCount(range.Right) != compsCount) {
  150. return false;
  151. }
  // Index (odd position) of the value of the last finite column.
  152. const size_t lastCompIdx = 2 * (compsCount - 1) + 1;
  153. // check for suitable type
  154. TType* baseType = RemoveAllOptionals(static_cast<TTupleType*>(typeInfo.BoundaryType)->GetElementType(lastCompIdx));
  155. auto slot = baseType->IsData() ? static_cast<TDataType*>(baseType)->GetDataSlot() : TMaybe<EDataSlot>{};
  156. if (!slot || !(GetDataTypeInfo(*slot).Features & (NUdf::EDataTypeFeatures::IntegralType | NUdf::EDataTypeFeatures::DateType))) {
  157. return false;
  158. }
  159. // all components before last should be equal
  160. for (size_t i = 1; i < lastCompIdx; i += 2) {
  161. if (typeInfo.ComponentsCompare[i]->Compare(range.Left.Components[i], range.Right.Components[i])) {
  162. return false;
  163. }
  164. }
  165. auto left = range.Left.Components[lastCompIdx];
  166. auto right = range.Right.Components[lastCompIdx];
  167. if (!left.HasValue() || !right.HasValue()) {
  168. return false;
  169. }
  // Read the raw value with the C++ type matching the slot's storage.
  170. switch (*slot) {
  171. case EDataSlot::Int8: return IsAdjacentNumericValues<i8>(left, right);
  172. case EDataSlot::Uint8: return IsAdjacentNumericValues<ui8>(left, right);
  173. case EDataSlot::Int16: return IsAdjacentNumericValues<i16>(left, right);
  174. case EDataSlot::Uint16: return IsAdjacentNumericValues<ui16>(left, right);
  175. case EDataSlot::Int32: return IsAdjacentNumericValues<i32>(left, right);
  176. case EDataSlot::Uint32: return IsAdjacentNumericValues<ui32>(left, right);
  177. case EDataSlot::Int64: return IsAdjacentNumericValues<i64>(left, right);
  178. case EDataSlot::Uint64: return IsAdjacentNumericValues<ui64>(left, right);
  179. case EDataSlot::Date: return IsAdjacentNumericValues<ui16>(left, right);
  180. case EDataSlot::Date32: return IsAdjacentNumericValues<i32>(left, right);
  181. case EDataSlot::Datetime: return IsAdjacentNumericValues<ui32>(left, right);
  182. case EDataSlot::Timestamp: return IsAdjacentNumericValues<ui64>(left, right);
  183. case EDataSlot::Datetime64: return IsAdjacentNumericValues<i64>(left, right);
  184. case EDataSlot::Timestamp64: return IsAdjacentNumericValues<i64>(left, right);
  185. default: break;
  186. }
  // Reached for a slot that passes the feature check above but has no case in the switch.
  187. MKQL_ENSURE(false, "Unsupported type: " << *slot);
  188. }
  189. bool RangeIsEmpty(const TExpandedRange& range, const TRangeTypeInfo& typeInfo) {
  190. if (typeInfo.BoundaryCompare->Compare(range.Left.Value, range.Right.Value) >= 0) {
  191. // left >= right
  192. return true;
  193. }
  194. Y_ENSURE(typeInfo.ComponentsCompare.size() == range.Left.Components.size());
  195. // range is not empty if components are not equal
  196. for (size_t i = 0; i < typeInfo.ComponentsCompare.size() - 1; ++i) {
  197. if (typeInfo.ComponentsCompare[i]->Compare(range.Left.Components[i], range.Right.Components[i])) {
  198. return false;
  199. }
  200. }
  201. // all component are equal, and range is empty if any side is excluded
  202. return range.Left.Components.back().Get<i32>() == 0 || range.Right.Components.back().Get<i32>() == 0;
  203. }
  // Returns true when ranges `a` and `b` (with a <= b in range order) overlap or touch, so
  // their union is a single range. It examines the candidate intersection [b.Left, a.Right]:
  //  - if b.Left > a.Right, the ranges are disjoint;
  //  - otherwise they merge, unless both boundaries coincide in every component and both
  //    exclude that common point.
  204. bool RangeCanMerge(const TExpandedRange& a, const TExpandedRange& b, const TRangeTypeInfo& typeInfo) {
  205. // It is assumed that a <= b here
  206. // < { > }
  207. // a.Left b.Left a.Right b.Right
  208. TExpandedRange intersection = { b.Left, a.Right };
  209. int cmp = typeInfo.BoundaryCompare->Compare(intersection.Left.Value, intersection.Right.Value);
  210. if (cmp > 0) {
  211. return false;
  212. }
  213. const auto& lefts = intersection.Left.Components;
  214. const auto& rights = intersection.Right.Components;
  215. bool leftIncluded = lefts.back().Get<i32>() != 0;
  216. bool rightIncluded = rights.back().Get<i32>() != 0;
  // Scan (marker, value) pairs in order; since b.Left <= a.Right, the first difference must be
  // "less", and any such difference means the ranges overlap.
  217. for (size_t i = 0; i < lefts.size() - 1; i += 2) {
  218. auto infCmp = typeInfo.ComponentsCompare[i].Get();
  219. auto compCmp = typeInfo.ComponentsCompare[i + 1].Get();
  220. auto infCompareRes = infCmp->Compare(lefts[i], rights[i]);
  221. Y_ENSURE(infCompareRes <= 0);
  222. if (infCompareRes < 0) {
  223. return true;
  224. }
  225. auto componentCompareRes = compCmp->Compare(lefts[i + 1], rights[i + 1]);
  226. Y_ENSURE(componentCompareRes <= 0);
  227. if (componentCompareRes < 0) {
  228. return true;
  229. }
  230. }
  // Boundaries touch at one point: merge if at least one side includes it.
  231. return leftIncluded || rightIncluded;
  232. }
  233. class TRangeComputeBase {
  234. public:
  235. TRangeComputeBase(TComputationMutables&, TComputationNodePtrVector&& lists, std::vector<TRangeTypeInfo>&& typeInfos)
  236. : Lists(std::move(lists)), TypeInfos(std::move(typeInfos))
  237. {
  238. Y_ENSURE(Lists.size() == TypeInfos.size());
  239. Y_ENSURE(!Lists.empty());
  240. }
  241. protected:
  242. std::vector<TUnboxedValueQueue> ExpandLists(TComputationContext& ctx) const {
  243. TUnboxedValueVector lists;
  244. lists.reserve(Lists.size());
  245. for (auto& list : Lists) {
  246. lists.emplace_back(list->GetValue(ctx));
  247. }
  248. std::vector<TUnboxedValueQueue> expandedLists;
  249. for (size_t i = 0; i < lists.size(); ++i) {
  250. expandedLists.emplace_back();
  251. TThresher<false>::DoForEachItem(lists[i],
  252. [&] (NUdf::TUnboxedValue&& item) {
  253. expandedLists.back().emplace_back(std::move(item));
  254. }
  255. );
  256. NormalizeRanges(expandedLists.back(), TypeInfos[i]);
  257. }
  258. return expandedLists;
  259. }
  260. private:
  261. template<typename TContainer>
  262. static void NormalizeRanges(TContainer& ranges, const TRangeTypeInfo& typeInfo) {
  263. auto rangeLess = [&](const TUnboxedValuePod& a, const TUnboxedValuePod& b) {
  264. return typeInfo.RangeCompare->Less(a, b);
  265. };
  266. auto rangeEqual = [&](const TUnboxedValuePod& a, const TUnboxedValuePod& b) {
  267. return typeInfo.RangeCompare->Compare(a, b) == 0;
  268. };
  269. for (size_t i = 1; i < ranges.size(); ++i) {
  270. if (rangeLess(ranges[i], ranges[i - 1])) {
  271. std::sort(ranges.begin(), ranges.end(), rangeLess);
  272. break;
  273. }
  274. }
  275. ranges.erase(
  276. std::remove_if(ranges.begin(), ranges.end(),
  277. [&](const TUnboxedValue& range) { return RangeIsEmpty(ExpandRange(range), typeInfo); }),
  278. ranges.end());
  279. ranges.erase(std::unique(ranges.begin(), ranges.end(), rangeEqual), ranges.end());
  280. }
  281. protected:
  282. const TComputationNodePtrVector Lists;
  283. const std::vector<TRangeTypeInfo> TypeInfos;
  284. };
  // RangeUnion: merges several normalized range lists (all of the same type) into one sorted
  // list and coalesces ranges that overlap or touch (see RangeCanMerge).
  285. class TRangeUnionWrapper : public TMutableComputationNode<TRangeUnionWrapper>, public TRangeComputeBase {
  286. typedef TMutableComputationNode<TRangeUnionWrapper> TBaseComputation;
  287. public:
  288. TRangeUnionWrapper(TComputationMutables& mutables, TComputationNodePtrVector&& lists, std::vector<TRangeTypeInfo>&& typeInfos)
  289. : TBaseComputation(mutables)
  290. , TRangeComputeBase(mutables, std::move(lists), std::move(typeInfos))
  291. {}
  292. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  293. TUnboxedValueVector mergedLists;
  294. auto expandedLists = ExpandLists(ctx);
  // k-way merge of the sorted lists. The comparator is inverted, so the priority queue's top
  // is the index of the list whose front range is smallest.
  295. auto comparator = [&](size_t l, size_t r) { return TypeInfos.front().RangeCompare->Less(expandedLists[r].front(), expandedLists[l].front()); };
  296. std::priority_queue<size_t, std::vector<size_t>, decltype(comparator)> queue{comparator};
  297. for (size_t i = 0; i < expandedLists.size(); ++i) {
  298. if (!expandedLists[i].empty()) {
  299. queue.push(i);
  300. }
  301. }
  302. while (!queue.empty()) {
  303. auto argMin = queue.top();
  304. queue.pop();
  305. auto& from = expandedLists[argMin];
  // NOTE(review): ExpandLists already drops empty ranges (NormalizeRanges), so this check
  // appears redundant.
  306. if (!RangeIsEmpty(ExpandRange(from.front()), TypeInfos.front())) {
  307. mergedLists.emplace_back(std::move(from.front()));
  308. }
  309. from.pop_front();
  310. if (!from.empty()) {
  311. queue.push(argMin);
  312. }
  313. }
  // Coalesce: grow the last output range while the next (sorted) range overlaps or touches it.
  314. TUnboxedValueVector unionList;
  315. if (!mergedLists.empty()) {
  316. unionList.push_back(mergedLists.front());
  317. auto current = ExpandRange(unionList.back());
  318. for (size_t i = 1; i < mergedLists.size(); ++i) {
  319. auto toUnion = ExpandRange(mergedLists[i]);
  320. if (RangeCanMerge(current, toUnion, TypeInfos.front())) {
  321. current = { current.Left, Max(current.Right, toUnion.Right, TypeInfos.front().BoundaryCompare.Get()) };
  322. TUnboxedValueVector newValue = { current.Left.Value, current.Right.Value };
  323. unionList.back() = ctx.HolderFactory.VectorAsArray(newValue);
  324. } else {
  325. unionList.emplace_back(std::move(mergedLists[i]));
  326. current = ExpandRange(unionList.back());
  327. }
  328. }
  329. }
  330. TDefaultListRepresentation res;
  331. for (auto& item : unionList) {
  332. res = res.Append(std::move(item));
  333. }
  334. return ctx.HolderFactory.CreateDirectListHolder(std::move(res));
  335. }
  336. private:
  337. void RegisterDependencies() const final {
  338. std::for_each(Lists.cbegin(), Lists.cend(), std::bind(&TRangeUnionWrapper::DependsOn, this, std::placeholders::_1));
  339. }
  340. };
  341. class TRangeIntersectWrapper : public TMutableComputationNode<TRangeIntersectWrapper>, public TRangeComputeBase {
  342. typedef TMutableComputationNode<TRangeIntersectWrapper> TBaseComputation;
  343. public:
  344. TRangeIntersectWrapper(TComputationMutables& mutables, TComputationNodePtrVector&& lists, std::vector<TRangeTypeInfo>&& typeInfos)
  345. : TBaseComputation(mutables)
  346. , TRangeComputeBase(mutables, std::move(lists), std::move(typeInfos))
  347. {}
  348. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  349. TUnboxedValueVector mergedLists;
  350. auto expandedLists = ExpandLists(ctx);
  351. Y_ENSURE(!expandedLists.empty());
  352. TUnboxedValueQueue intersected = std::move(expandedLists.front());
  353. for (size_t i = 1; i < expandedLists.size(); ++i) {
  354. DoIntersect(ctx, intersected, std::move(expandedLists[i]));
  355. }
  356. TDefaultListRepresentation res;
  357. for (auto& item : intersected) {
  358. res = res.Append(std::move(item));
  359. }
  360. return ctx.HolderFactory.CreateDirectListHolder(std::move(res));
  361. }
  362. private:
  363. void RegisterDependencies() const final {
  364. std::for_each(Lists.cbegin(), Lists.cend(), std::bind(&TRangeIntersectWrapper::DependsOn, this, std::placeholders::_1));
  365. }
  366. void DoIntersect(TComputationContext& ctx, TUnboxedValueQueue& current, TUnboxedValueQueue&& next) const {
  367. TUnboxedValueQueue result;
  368. auto cmp = TypeInfos.front().RangeCompare.Get();
  369. auto boundaryCmp = TypeInfos.front().BoundaryCompare.Get();
  370. while (!current.empty() && !next.empty()) {
  371. TUnboxedValueQueue* minInput;
  372. TUnboxedValueQueue* maxInput;
  373. if (cmp->Less(current.front(), next.front())) {
  374. minInput = &current;
  375. maxInput = &next;
  376. } else {
  377. minInput = &next;
  378. maxInput = &current;
  379. }
  380. auto minRange = ExpandRange(minInput->front());
  381. auto maxRange = ExpandRange(maxInput->front());
  382. TExpandedRange intersected;
  383. intersected.Left = maxRange.Left;
  384. intersected.Right = Min(minRange.Right, maxRange.Right, TypeInfos.front().BoundaryCompare.Get());
  385. if (!RangeIsEmpty(intersected, TypeInfos.front())) {
  386. TUnboxedValueVector newValue = { intersected.Left.Value, intersected.Right.Value };
  387. result.push_back(ctx.HolderFactory.VectorAsArray(newValue));
  388. if (boundaryCmp->Less(minRange.Right.Value, maxRange.Right.Value)) {
  389. minInput->pop_front();
  390. } else {
  391. maxInput->pop_front();
  392. }
  393. } else {
  394. minInput->pop_front();
  395. }
  396. }
  397. std::swap(current, result);
  398. }
  399. };
  400. class TRangeMultiplyWrapper : public TMutableComputationNode<TRangeMultiplyWrapper>, public TRangeComputeBase {
  401. typedef TMutableComputationNode<TRangeMultiplyWrapper> TBaseComputation;
  402. public:
  403. TRangeMultiplyWrapper(TComputationMutables& mutables, IComputationNode* limit, TComputationNodePtrVector&& lists, std::vector<TRangeTypeInfo>&& typeInfos)
  404. : TBaseComputation(mutables)
  405. , TRangeComputeBase(mutables, std::move(lists), std::move(typeInfos))
  406. , Limit(limit)
  407. {}
  408. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  409. const ui64 limit = Limit->GetValue(ctx).Get<ui64>();
  410. TUnboxedValueVector mergedLists;
  411. auto expandedLists = ExpandLists(ctx);
  412. Y_ENSURE(!expandedLists.empty());
  413. if (expandedLists.size() == 1 && expandedLists.front().size() > limit) {
  414. return FullRange(ctx);
  415. }
  416. TUnboxedValueQueue current = std::move(expandedLists.front());
  417. std::vector<ICompare*> currentComponentsCompare;
  418. currentComponentsCompare.reserve(TypeInfos.front().ComponentsCompare.size());
  419. for (const auto& comp : TypeInfos.front().ComponentsCompare) {
  420. currentComponentsCompare.push_back(comp.Get());
  421. }
  422. for (size_t i = 1; i < expandedLists.size(); ++i) {
  423. if (expandedLists[i].empty()) {
  424. return ctx.HolderFactory.GetEmptyContainerLazy();
  425. }
  426. if (!DoMultiply(ctx, limit, current, expandedLists[i], currentComponentsCompare, TypeInfos[i])) {
  427. if (i > 0) {
  428. PadInfs(ctx, current, i);
  429. break;
  430. } else {
  431. return FullRange(ctx);
  432. }
  433. }
  434. }
  435. TDefaultListRepresentation res;
  436. for (auto& item : current) {
  437. res = res.Append(std::move(item));
  438. }
  439. return ctx.HolderFactory.CreateDirectListHolder(std::move(res));
  440. }
  441. private:
  442. void RegisterDependencies() const final {
  443. DependsOn(Limit);
  444. std::for_each(Lists.cbegin(), Lists.cend(), std::bind(&TRangeMultiplyWrapper::DependsOn, this, std::placeholders::_1));
  445. }
  446. void PadInfs(TComputationContext& ctx, TUnboxedValueQueue& current, size_t currentPrefix) const {
  447. size_t extraColumns = 0;
  448. for (size_t i = 0; i < TypeInfos.size(); ++i) {
  449. const auto& ti = TypeInfos[i];
  450. Y_ENSURE(ti.Components.size() % 2 == 1);
  451. if (currentPrefix <= i) {
  452. extraColumns += (ti.Components.size() - 1) / 2;
  453. }
  454. }
  455. TUnboxedValueQueue result;
  456. for (const auto& c : current) {
  457. auto curr = ExpandRange(c);
  458. result.push_back(AppendInfs(ctx, curr, extraColumns));
  459. }
  460. std::swap(current, result);
  461. }
  462. bool DoMultiply(TComputationContext& ctx, ui64 limit, TUnboxedValueQueue& current, const TUnboxedValueQueue& next,
  463. std::vector<ICompare*>& currentCmps, const TRangeTypeInfo& nextTypeInfo) const
  464. {
  465. TUnboxedValueQueue result;
  466. Y_ENSURE(currentCmps.size() >= 3 && currentCmps.size() % 2 == 1);
  467. size_t extraColumns = (nextTypeInfo.ComponentsCompare.size() - 1) / 2;
  468. for (const auto& c : current) {
  469. auto curr = ExpandRange(c);
  470. if (RangeIsPoint(curr, currentCmps)) {
  471. if (result.size() + next.size() > limit) {
  472. return false;
  473. }
  474. for (const auto& n : next) {
  475. result.push_back(Append(ctx, curr, ExpandRange(n)));
  476. }
  477. } else {
  478. if (result.size() + 1 > limit) {
  479. return false;
  480. }
  481. result.push_back(AppendInfs(ctx, curr, extraColumns));
  482. }
  483. }
  484. currentCmps.pop_back();
  485. for (const auto& comp : nextTypeInfo.ComponentsCompare) {
  486. currentCmps.push_back(comp.Get());
  487. }
  488. std::swap(current, result);
  489. return true;
  490. }
  491. static bool RangeIsPoint(const TExpandedRange& range, const std::vector<ICompare*>& cmps) {
  492. Y_ENSURE(range.Left.Components.size() == cmps.size());
  493. TUnboxedValue leftIncluded = range.Left.Components.back();
  494. TUnboxedValue rightIncluded = range.Right.Components.back();
  495. if (!leftIncluded.Get<i32>() || !rightIncluded.Get<i32>()) {
  496. return false;
  497. }
  498. bool allEqual = true;
  499. for (size_t i = 0; allEqual && i < cmps.size() - 1; ++i) {
  500. allEqual = allEqual &&
  501. cmps[i]->Compare(range.Left.Components[i], range.Right.Components[i]) == 0;
  502. }
  503. return allEqual;
  504. }
  505. static TUnboxedValuePod Append(TComputationContext& ctx, const TExpandedRange& first, const TExpandedRange& second) {
  506. auto left = Append(ctx, first.Left, second.Left);
  507. auto right = Append(ctx, first.Right, second.Right);
  508. TUnboxedValueVector range = { left, right };
  509. return ctx.HolderFactory.VectorAsArray(range);
  510. }
  511. static TUnboxedValuePod Append(TComputationContext& ctx, const TExpandedRangeBoundary& first,
  512. const TExpandedRangeBoundary& second)
  513. {
  514. TUnboxedValueVector components(first.Components.begin(), first.Components.end() - 1);
  515. components.insert(components.end(), second.Components.begin(), second.Components.end());
  516. if (second.Components.front().Get<i32>() != 0) {
  517. // preserve original include/exclude flag when appending nulls (+-inf)
  518. components.back() = first.Components.back();
  519. }
  520. return ctx.HolderFactory.VectorAsArray(components);
  521. }
  522. static TUnboxedValuePod AppendInfs(TComputationContext& ctx, const TExpandedRange& range, size_t count) {
  523. auto left = AppendInfs(ctx, true, range.Left, count);
  524. auto right = AppendInfs(ctx, false, range.Right, count);
  525. TUnboxedValueVector newRange = { left, right };
  526. return ctx.HolderFactory.VectorAsArray(newRange);
  527. }
  528. static TUnboxedValuePod AppendInfs(TComputationContext& ctx, bool isLeft, const TExpandedRangeBoundary& boundary, size_t count) {
  529. Y_ENSURE(!boundary.Components.empty());
  530. TUnboxedValueVector components(boundary.Components.begin(), boundary.Components.end() - 1);
  531. const bool hasPrefix = boundary.Components.size() > 1 && boundary.Components.front().Get<i32>() == 0;
  532. const bool isIncluded = boundary.Components.back().Get<i32>() != 0;
  533. for (size_t i = 0; i < count; ++i) {
  534. components.push_back(TUnboxedValuePod(GetInfSign(hasPrefix, isIncluded, isLeft)));
  535. components.emplace_back();
  536. }
  537. components.push_back(boundary.Components.back());
  538. return ctx.HolderFactory.VectorAsArray(components);
  539. }
  540. TUnboxedValuePod FullRange(TComputationContext& ctx) const {
  541. size_t columnCount = 0;
  542. for (const auto& ti : TypeInfos) {
  543. Y_ENSURE(ti.Components.size() % 2 == 1);
  544. columnCount += (ti.Components.size() - 1) / 2;
  545. }
  546. TExpandedRange range;
  547. range.Left.Components.push_back(TUnboxedValuePod(0));
  548. range.Right.Components.push_back(TUnboxedValuePod(0));
  549. TUnboxedValueVector result = { AppendInfs(ctx, range, columnCount) };
  550. return ctx.HolderFactory.VectorAsArray(result);
  551. }
  552. IComputationNode* const Limit;
  553. };
  554. class TRangeFinalizeWrapper : public TMutableComputationNode<TRangeFinalizeWrapper>, public TRangeComputeBase {
  555. typedef TMutableComputationNode<TRangeFinalizeWrapper> TBaseComputation;
  556. public:
  557. TRangeFinalizeWrapper(TComputationMutables& mutables, TComputationNodePtrVector&& lists, std::vector<TRangeTypeInfo>&& typeInfos)
  558. : TBaseComputation(mutables)
  559. , TRangeComputeBase(mutables, std::move(lists), std::move(typeInfos))
  560. {}
  561. NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  562. auto expandedLists = ExpandLists(ctx);
  563. Y_ENSURE(expandedLists.size() == 1);
  564. TDefaultListRepresentation res;
  565. for (auto& item : expandedLists.front()) {
  566. auto range = ExpandRange(item);
  567. if (CanConvertToPointRange(range, TypeInfos.front())) {
  568. if (range.Left.Included) {
  569. range.Right = range.Left;
  570. } else {
  571. range.Left = range.Right;
  572. }
  573. }
  574. auto left = ConvertFromInternal(range.Left.Components, ctx);
  575. auto right = ConvertFromInternal(range.Right.Components, ctx);
  576. TUnboxedValueVector rangeVector = { left, right };
  577. res = res.Append(ctx.HolderFactory.VectorAsArray(rangeVector));
  578. }
  579. return ctx.HolderFactory.CreateDirectListHolder(std::move(res));
  580. }
  581. private:
  582. void RegisterDependencies() const final {
  583. std::for_each(Lists.cbegin(), Lists.cend(), std::bind(&TRangeFinalizeWrapper::DependsOn, this, std::placeholders::_1));
  584. }
  585. TUnboxedValue ConvertFromInternal(const TUnboxedValueVector& boundaryComponents, TComputationContext& ctx) const {
  586. size_t compsSize = boundaryComponents.size();
  587. Y_ENSURE(compsSize >= 3);
  588. Y_ENSURE(compsSize % 2 == 1);
  589. TUnboxedValueVector converted;
  590. for (size_t i = 0; i < compsSize - 1; ++i) {
  591. if (i % 2 == 1) {
  592. converted.push_back(boundaryComponents[i]);
  593. }
  594. }
  595. i32 included = boundaryComponents.back().Get<i32>();
  596. if (included != 0) {
  597. included = 1;
  598. }
  599. converted.push_back(TUnboxedValuePod(included));
  600. return ctx.HolderFactory.VectorAsArray(converted);
  601. }
  602. };
  // Range operations dispatched by WrapRange.
  enum ERangeOp {
      RANGE_UNION = 0,      // union of the input range lists
      RANGE_INTERSECT = 1,  // intersection of the input range lists
      RANGE_MULTIPLY = 2,   // product of per-column-group range lists, bounded by a limit
      RANGE_FINALIZE = 3,   // conversion from the internal to the external boundary format
  };
  // Validates the callable's inputs and builds the computation node for range operation `func`:
  //  - RANGE_FINALIZE: exactly one list argument;
  //  - RANGE_MULTIPLY: a Uint64 limit followed by one or more lists (types may differ per list);
  //  - RANGE_UNION / RANGE_INTERSECT: one or more lists, all of the same type.
  // Every list must be List<R>, where R is accepted by ExtractTypes. Throws on violation.
  609. IComputationNode* WrapRange(ERangeOp func, TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  610. TComputationNodePtrVector lists;
  611. std::vector<TRangeTypeInfo> typeInfos;
  // Index of the first list argument (RANGE_MULTIPLY takes the limit at index 0).
  612. size_t listsStart = 0;
  613. if (func == RANGE_FINALIZE) {
  614. MKQL_ENSURE(callable.GetInputsCount() == 1, "Expecting single argument");
  615. } else if (func == RANGE_MULTIPLY) {
  616. MKQL_ENSURE(callable.GetInputsCount() > 1, "Expecting at least two arguments");
  617. listsStart = 1;
  618. auto limitType = callable.GetInput(0).GetStaticType();
  619. MKQL_ENSURE(limitType->IsData() && static_cast<TDataType*>(limitType)->GetSchemeType() == NUdf::TDataType<ui64>::Id,
  620. "Expecting Uint64 as first argument");
  621. } else {
  622. MKQL_ENSURE(callable.GetInputsCount() > 0, "Expecting at least one argument");
  623. }
  624. lists.reserve(callable.GetInputsCount());
  625. typeInfos.reserve(callable.GetInputsCount());
  626. for (ui32 i = listsStart; i < callable.GetInputsCount(); ++i) {
  627. auto type = callable.GetInput(i).GetStaticType();
  628. MKQL_ENSURE(type->IsList(), "Expecting list as argument");
  629. auto rangeType = static_cast<TListType*>(type)->GetItemType();
  630. if (func != RANGE_MULTIPLY) {
  631. MKQL_ENSURE(type->IsSameType(*callable.GetInput(listsStart).GetStaticType()), "All arguments must be of same type");
  632. }
  633. lists.push_back(LocateNode(ctx.NodeLocator, callable, i));
  634. typeInfos.push_back(ExtractTypes(rangeType));
  635. }
  636. switch (func) {
  637. case RANGE_UNION:
  638. return new TRangeUnionWrapper(ctx.Mutables, std::move(lists), std::move(typeInfos));
  639. case RANGE_INTERSECT:
  640. return new TRangeIntersectWrapper(ctx.Mutables, std::move(lists), std::move(typeInfos));
  641. case RANGE_MULTIPLY: {
  642. auto limit = LocateNode(ctx.NodeLocator, callable, 0);
  643. return new TRangeMultiplyWrapper(ctx.Mutables, limit, std::move(lists), std::move(typeInfos));
  644. }
  645. case RANGE_FINALIZE:
  646. return new TRangeFinalizeWrapper(ctx.Mutables, std::move(lists), std::move(typeInfos));
  647. default:
  // `!"..."` is always false, so this always throws.
  648. Y_ENSURE(!"Unknown callable");
  649. }
  650. }
  651. class TRangeCreateWrapper : public TMutableComputationNode<TRangeCreateWrapper> {
  652. typedef TMutableComputationNode<TRangeCreateWrapper> TBaseComputation;
  653. public:
  654. TRangeCreateWrapper(TComputationMutables& mutables, IComputationNode* list)
  655. : TBaseComputation(mutables)
  656. , List(list)
  657. {}
  658. TUnboxedValuePod DoCalculate(TComputationContext& ctx) const {
  659. TUnboxedValue list = List->GetValue(ctx);
  660. TDefaultListRepresentation res;
  661. TThresher<false>::DoForEachItem(list,
  662. [&] (NUdf::TUnboxedValue&& item) {
  663. auto left = ConvertToInternal(item.GetElement(0), true, ctx);
  664. auto right = ConvertToInternal(item.GetElement(1), false, ctx);
  665. TUnboxedValueVector rangeVector = { left, right };
  666. auto range = ctx.HolderFactory.VectorAsArray(rangeVector);
  667. res = res.Append(std::move(range));
  668. }
  669. );
  670. return ctx.HolderFactory.CreateDirectListHolder(std::move(res));
  671. }
  672. private:
  673. void RegisterDependencies() const final {
  674. DependsOn(List);
  675. }
  676. TUnboxedValue ConvertToInternal(TUnboxedValue boundary, bool isLeft, TComputationContext& ctx) const {
  677. auto elements = boundary.GetElements();
  678. auto elementsCount = boundary.GetListLength();
  679. Y_ENSURE(elements);
  680. Y_ENSURE(elementsCount >= 2);
  681. TUnboxedValueVector converted;
  682. i32 included = elements[elementsCount - 1].Get<i32>();
  683. const auto hasPrefix = bool(elements[0]);
  684. bool tail = false;
  685. for (size_t i = 0; i < elementsCount - 1; ++i) {
  686. i32 infValue;
  687. tail = tail || !elements[i];
  688. if (elements[i]) {
  689. MKQL_ENSURE(!tail, "Invalid boundary value - non null element follows null");
  690. infValue = 0;
  691. } else {
  692. infValue = GetInfSign(hasPrefix, included, isLeft);
  693. }
  694. converted.push_back(TUnboxedValuePod(infValue));
  695. converted.push_back(elements[i]);
  696. }
  697. included = included ? (isLeft ? -1 : 1) : 0;
  698. converted.push_back(TUnboxedValuePod(included));
  699. return ctx.HolderFactory.VectorAsArray(converted);
  700. }
  701. IComputationNode* const List;
  702. };
  703. } // namespace
  704. IComputationNode* WrapRangeCreate(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  705. MKQL_ENSURE(callable.GetInputsCount() == 1, "Expecting exactly one argument");
  706. auto list = callable.GetInput(0);
  707. auto itemType = static_cast<TListType*>(list.GetStaticType())->GetItemType();
  708. MKQL_ENSURE(itemType->IsTuple(), "Expecting list of tuples");
  709. auto tupleType = static_cast<TTupleType*>(itemType);
  710. MKQL_ENSURE(tupleType->GetElementsCount() == 2,
  711. "Expecting list ot 2-element tuples, got: " << tupleType->GetElementsCount() << " elements");
  712. MKQL_ENSURE(tupleType->GetElementType(0)->IsSameType(*tupleType->GetElementType(1)),
  713. "Expecting list ot 2-element tuples of same type");
  714. MKQL_ENSURE(tupleType->GetElementType(0)->IsTuple(),
  715. "Expecting range boundary to be tuple");
  716. auto boundaryType = static_cast<TTupleType*>(tupleType->GetElementType(0));
  717. MKQL_ENSURE(boundaryType->GetElementsCount() >= 2,
  718. "Range boundary should have at least 2 components, got: " << boundaryType->GetElementsCount());
  719. return new TRangeCreateWrapper(ctx.Mutables, LocateNode(ctx.NodeLocator, callable, 0));
  720. }
  721. IComputationNode* WrapRangeUnion(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  722. return WrapRange(RANGE_UNION, callable, ctx);
  723. }
  724. IComputationNode* WrapRangeIntersect(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  725. return WrapRange(RANGE_INTERSECT, callable, ctx);
  726. }
  727. IComputationNode* WrapRangeMultiply(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  728. return WrapRange(RANGE_MULTIPLY, callable, ctx);
  729. }
  730. IComputationNode* WrapRangeFinalize(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
  731. return WrapRange(RANGE_FINALIZE, callable, ctx);
  732. }
  733. }
  734. }