utils.cpp 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192
  1. #include "utils.h"
  2. #include <yql/essentials/public/purecalc/common/names.h>
  3. #include <yql/essentials/core/yql_expr_type_annotation.h>
  4. using namespace NYql;
  5. using namespace NYql::NPureCalc;
  6. TExprNode::TPtr NYql::NPureCalc::NodeFromBlocks(
  7. const TPositionHandle& pos,
  8. const TStructExprType* structType,
  9. TExprContext& ctx
  10. ) {
  11. const auto items = structType->GetItems();
  12. Y_ENSURE(items.size() > 0);
  13. return ctx.Builder(pos)
  14. .Lambda()
  15. .Param("stream")
  16. .Callable(0, "FromFlow")
  17. .Callable(0, "NarrowMap")
  18. .Callable(0, "ToFlow")
  19. .Callable(0, "WideFromBlocks")
  20. .Callable(0, "FromFlow")
  21. .Callable(0, "ExpandMap")
  22. .Callable(0, "ToFlow")
  23. .Arg(0, "stream")
  24. .Seal()
  25. .Lambda(1)
  26. .Param("item")
  27. .Do([&](TExprNodeBuilder& lambda) -> TExprNodeBuilder& {
  28. ui32 i = 0;
  29. for (const auto& item : items) {
  30. lambda.Callable(i++, "Member")
  31. .Arg(0, "item")
  32. .Atom(1, item->GetName())
  33. .Seal();
  34. }
  35. lambda.Callable(i, "Member")
  36. .Arg(0, "item")
  37. .Atom(1, PurecalcBlockColumnLength)
  38. .Seal();
  39. return lambda;
  40. })
  41. .Seal()
  42. .Seal()
  43. .Seal()
  44. .Seal()
  45. .Seal()
  46. .Lambda(1)
  47. .Params("fields", items.size())
  48. .Callable("AsStruct")
  49. .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
  50. ui32 i = 0;
  51. for (const auto& item : items) {
  52. parent.List(i)
  53. .Atom(0, item->GetName())
  54. .Arg(1, "fields", i++)
  55. .Seal();
  56. }
  57. return parent;
  58. })
  59. .Seal()
  60. .Seal()
  61. .Seal()
  62. .Seal()
  63. .Seal()
  64. .Build();
  65. }
  66. TExprNode::TPtr NYql::NPureCalc::NodeToBlocks(
  67. const TPositionHandle& pos,
  68. const TStructExprType* structType,
  69. TExprContext& ctx
  70. ) {
  71. const auto items = structType->GetItems();
  72. Y_ENSURE(items.size() > 0);
  73. // Static assert to ensure backward compatible change: if the
  74. // constant below is true, both input and output types of
  75. // WideToBlocks callable have to be WideStream; otherwise,
  76. // both input and output types have to be WideFlow.
  77. // FIXME: When all spots using WideToBlocks are adjusted
  78. // to work with WideStream, drop the assertion below.
  79. static_assert(!NYql::NBlockStreamIO::WideToBlocks);
  80. return ctx.Builder(pos)
  81. .Lambda()
  82. .Param("stream")
  83. .Callable("FromFlow")
  84. .Callable(0, "NarrowMap")
  85. .Callable(0, "WideToBlocks")
  86. .Callable(0, "ExpandMap")
  87. .Callable(0, "ToFlow")
  88. .Arg(0, "stream")
  89. .Seal()
  90. .Lambda(1)
  91. .Param("item")
  92. .Do([&](TExprNodeBuilder& lambda) -> TExprNodeBuilder& {
  93. ui32 i = 0;
  94. for (const auto& item : items) {
  95. lambda.Callable(i++, "Member")
  96. .Arg(0, "item")
  97. .Atom(1, item->GetName())
  98. .Seal();
  99. }
  100. return lambda;
  101. })
  102. .Seal()
  103. .Seal()
  104. .Seal()
  105. .Lambda(1)
  106. .Params("fields", items.size() + 1)
  107. .Callable("AsStruct")
  108. .Do([&](TExprNodeBuilder& parent) -> TExprNodeBuilder& {
  109. ui32 i = 0;
  110. for (const auto& item : items) {
  111. parent.List(i)
  112. .Atom(0, item->GetName())
  113. .Arg(1, "fields", i++)
  114. .Seal();
  115. }
  116. parent.List(i)
  117. .Atom(0, PurecalcBlockColumnLength)
  118. .Arg(1, "fields", i)
  119. .Seal();
  120. return parent;
  121. })
  122. .Seal()
  123. .Seal()
  124. .Seal()
  125. .Seal()
  126. .Seal()
  127. .Build();
  128. }
  129. TExprNode::TPtr NYql::NPureCalc::ApplyToIterable(
  130. const TPositionHandle& pos,
  131. const TExprNode::TPtr iterable,
  132. const TExprNode::TPtr lambda,
  133. bool wrapLMap,
  134. TExprContext& ctx
  135. ) {
  136. if (wrapLMap) {
  137. return ctx.Builder(pos)
  138. .Callable("LMap")
  139. .Add(0, iterable)
  140. .Lambda(1)
  141. .Param("stream")
  142. .Apply(lambda)
  143. .With(0, "stream")
  144. .Seal()
  145. .Seal()
  146. .Seal()
  147. .Build();
  148. } else {
  149. return ctx.Builder(pos)
  150. .Apply(lambda)
  151. .With(0, iterable)
  152. .Seal()
  153. .Build();
  154. }
  155. }
  156. const TStructExprType* NYql::NPureCalc::WrapBlockStruct(
  157. const TStructExprType* structType,
  158. TExprContext& ctx
  159. ) {
  160. TVector<const TItemExprType*> members;
  161. for (const auto& item : structType->GetItems()) {
  162. const auto blockItemType = ctx.MakeType<TBlockExprType>(item->GetItemType());
  163. members.push_back(ctx.MakeType<TItemExprType>(item->GetName(), blockItemType));
  164. }
  165. const auto scalarItemType = ctx.MakeType<TScalarExprType>(ctx.MakeType<TDataExprType>(EDataSlot::Uint64));
  166. members.push_back(ctx.MakeType<TItemExprType>(PurecalcBlockColumnLength, scalarItemType));
  167. return ctx.MakeType<TStructExprType>(members);
  168. }
  169. const TStructExprType* NYql::NPureCalc::UnwrapBlockStruct(
  170. const TStructExprType* structType,
  171. TExprContext& ctx
  172. ) {
  173. TVector<const TItemExprType*> members;
  174. for (const auto& item : structType->GetItems()) {
  175. if (item->GetName() == PurecalcBlockColumnLength) {
  176. continue;
  177. }
  178. bool isScalarUnused;
  179. const auto blockItemType = GetBlockItemType(*item->GetItemType(), isScalarUnused);
  180. members.push_back(ctx.MakeType<TItemExprType>(item->GetName(), blockItemType));
  181. }
  182. return ctx.MakeType<TStructExprType>(members);
  183. }