// request.cpp
  1. #include "request.h"
  2. #include <yql/essentials/providers/common/mkql/yql_provider_mkql.h>
  3. #include <yql/essentials/minikql/mkql_node_cast.h>
  4. #include <yql/essentials/minikql/mkql_node_serialization.h>
  5. #include <yql/essentials/core/yql_opt_utils.h>
  6. #include <yql/essentials/core/yql_expr_type_annotation.h>
  7. namespace NYql {
  8. using namespace NKikimr::NMiniKQL;
  9. TKernelRequestBuilder::TKernelRequestBuilder(const IFunctionRegistry& functionRegistry)
  10. : Alloc_(__LOCATION__)
  11. , Env_(Alloc_)
  12. , Pb_(Env_, functionRegistry)
  13. {
  14. Alloc_.Release();
  15. }
  16. TKernelRequestBuilder::~TKernelRequestBuilder() {
  17. Alloc_.Acquire();
  18. }
  19. ui32 TKernelRequestBuilder::AddUnaryOp(EUnaryOp op, const TTypeAnnotationNode* arg1Type, const TTypeAnnotationNode* retType) {
  20. const TGuard<TScopedAlloc> allocGuard(Alloc_);
  21. const auto returnType = MakeType(retType);
  22. Y_UNUSED(returnType);
  23. const auto arg = MakeArg(arg1Type);
  24. switch (op) {
  25. case EUnaryOp::Not:
  26. Items_.emplace_back(Pb_.BlockNot(arg));
  27. break;
  28. case EUnaryOp::Just:
  29. Items_.emplace_back(Pb_.BlockJust(arg));
  30. break;
  31. case EUnaryOp::Size:
  32. case EUnaryOp::Minus:
  33. case EUnaryOp::Abs:
  34. Items_.emplace_back(Pb_.BlockFunc(ToString(op), returnType, { arg }));
  35. break;
  36. }
  37. return Items_.size() - 1;
  38. }
  39. ui32 TKernelRequestBuilder::AddBinaryOp(EBinaryOp op, const TTypeAnnotationNode* arg1Type, const TTypeAnnotationNode* arg2Type, const TTypeAnnotationNode* retType) {
  40. const TGuard<TScopedAlloc> allocGuard(Alloc_);
  41. const auto returnType = MakeType(retType);
  42. const auto arg1 = MakeArg(arg1Type);
  43. const auto arg2 = MakeArg(arg2Type);
  44. switch (op) {
  45. case EBinaryOp::And:
  46. Items_.emplace_back(Pb_.BlockAnd(arg1, arg2));
  47. break;
  48. case EBinaryOp::Or:
  49. Items_.emplace_back(Pb_.BlockOr(arg1, arg2));
  50. break;
  51. case EBinaryOp::Xor:
  52. Items_.emplace_back(Pb_.BlockXor(arg1, arg2));
  53. break;
  54. case EBinaryOp::Coalesce:
  55. Items_.emplace_back(Pb_.BlockCoalesce(arg1, arg2));
  56. break;
  57. case EBinaryOp::Add:
  58. case EBinaryOp::Sub:
  59. case EBinaryOp::Mul:
  60. case EBinaryOp::Div:
  61. case EBinaryOp::Mod:
  62. case EBinaryOp::StartsWith:
  63. case EBinaryOp::EndsWith:
  64. case EBinaryOp::StringContains:
  65. case EBinaryOp::Equals:
  66. case EBinaryOp::NotEquals:
  67. case EBinaryOp::Less:
  68. case EBinaryOp::LessOrEqual:
  69. case EBinaryOp::Greater:
  70. case EBinaryOp::GreaterOrEqual:
  71. Items_.emplace_back(Pb_.BlockFunc(ToString(op), returnType, { arg1, arg2 }));
  72. break;
  73. }
  74. return Items_.size() - 1;
  75. }
  76. ui32 TKernelRequestBuilder::AddIf(const TTypeAnnotationNode* conditionType, const TTypeAnnotationNode* thenType, const TTypeAnnotationNode* elseType) {
  77. const TGuard<TScopedAlloc> allocGuard(Alloc_);
  78. const auto arg1 = MakeArg(conditionType);
  79. const auto arg2 = MakeArg(thenType);
  80. const auto arg3 = MakeArg(elseType);
  81. Items_.emplace_back(Pb_.BlockIf(arg1, arg2, arg3));
  82. return Items_.size() - 1;
  83. }
  84. ui32 TKernelRequestBuilder::Udf(const TString& name, bool isPolymorphic, const TTypeAnnotationNode::TListType& argTypes,
  85. const TTypeAnnotationNode* retType) {
  86. const TGuard<TScopedAlloc> allocGuard(Alloc_);
  87. std::vector<TType*> inputTypes;
  88. for (const auto& type : argTypes) {
  89. inputTypes.emplace_back(MakeType(type));
  90. }
  91. const auto userType = Pb_.NewTupleType({
  92. Pb_.NewTupleType(inputTypes),
  93. Pb_.NewEmptyStructType(),
  94. Pb_.NewEmptyTupleType()});
  95. auto udf = Pb_.Udf(isPolymorphic ? name : (name + "_BlocksImpl"), Pb_.NewVoid(), userType);
  96. TRuntimeNode::TList args;
  97. for (const auto& type : argTypes) {
  98. args.emplace_back(MakeArg(type));
  99. }
  100. auto apply = Pb_.Apply(udf, args);
  101. auto outType = MakeType(retType);
  102. Y_ENSURE(outType->IsSameType(*apply.GetStaticType()));
  103. Items_.emplace_back(apply);
  104. return Items_.size() - 1;
  105. }
  106. ui32 TKernelRequestBuilder::AddScalarApply(const TExprNode& lambda, const TTypeAnnotationNode::TListType& argTypes, TExprContext& ctx) {
  107. const TGuard<TScopedAlloc> allocGuard(Alloc_);
  108. TRuntimeNode::TList args(argTypes.size());
  109. std::transform(argTypes.cbegin(), argTypes.cend(), args.begin(), std::bind(&TKernelRequestBuilder::MakeArg, this, std::placeholders::_1));
  110. NCommon::TMkqlCommonCallableCompiler compiler;
  111. NCommon::TMkqlBuildContext compileCtx(compiler, Pb_, ctx);
  112. const auto apply = Pb_.ScalarApply(args, [&lambda, &compileCtx] (const TArrayRef<const TRuntimeNode>& args) {
  113. return MkqlBuildLambda(lambda, compileCtx, TRuntimeNode::TList(args.cbegin(), args.cend()));
  114. });
  115. Items_.emplace_back(apply);
  116. return Items_.size() - 1U;
  117. }
  118. ui32 TKernelRequestBuilder::JsonExists(const TTypeAnnotationNode* arg1Type, const TTypeAnnotationNode* arg2Type, const TTypeAnnotationNode* retType) {
  119. const TGuard<TScopedAlloc> allocGuard(Alloc_);
  120. bool isScalar = false;
  121. bool isBinaryJson = (RemoveOptionalType(NYql::GetBlockItemType(*arg1Type, isScalar))->Cast<TDataExprType>()->GetSlot() == EDataSlot::JsonDocument);
  122. auto udfName = TStringBuilder() << "Json2." << (isBinaryJson ? "JsonDocument" : "" ) << "SqlExists";
  123. auto exists = Pb_.Udf(udfName);
  124. auto parse = Pb_.Udf("Json2.Parse");
  125. auto compilePath = Pb_.Udf("Json2.CompilePath");
  126. auto outType = MakeType(retType);
  127. auto arg1 = MakeArg(arg1Type);
  128. auto arg2 = MakeArg(arg2Type);
  129. auto scalarApply = Pb_.ScalarApply({arg1, arg2}, [&](const auto& args) {
  130. auto json = args[0];
  131. auto processJson = [&](auto unpacked) {
  132. auto input = Pb_.NewOptional(isBinaryJson ? unpacked : Pb_.Apply(parse, { unpacked }));
  133. auto path = Pb_.Apply(compilePath, { args[1] });
  134. auto dictType = Pb_.NewDictType(Pb_.NewDataType(NUdf::EDataSlot::Utf8), Pb_.NewResourceType("JsonNode"), false);
  135. return Pb_.Apply(exists, { input, path, Pb_.NewDict(dictType, {}), Pb_.NewOptional(Pb_.NewDataLiteral(false)) });
  136. };
  137. if (json.GetStaticType()->IsOptional()) {
  138. return Pb_.FlatMap(json, processJson);
  139. } else {
  140. return processJson(json);
  141. }
  142. });
  143. Y_ENSURE(outType->IsSameType(*scalarApply.GetStaticType()));
  144. Items_.emplace_back(scalarApply);
  145. return Items_.size() - 1;
  146. }
  147. ui32 TKernelRequestBuilder::JsonValue(const TTypeAnnotationNode* arg1Type, const TTypeAnnotationNode* arg2Type, const TTypeAnnotationNode* retType) {
  148. const TGuard<TScopedAlloc> allocGuard(Alloc_);
  149. bool isScalar = false;
  150. bool isBinaryJson = (RemoveOptionalType(NYql::GetBlockItemType(*arg1Type, isScalar))->Cast<TDataExprType>()->GetSlot() == EDataSlot::JsonDocument);
  151. auto resultSlot = RemoveOptionalType(NYql::GetBlockItemType(*retType, isScalar))->Cast<TDataExprType>()->GetSlot();
  152. auto udfName = TStringBuilder() << "Json2." << (isBinaryJson ? "JsonDocument" : "" );
  153. if (NYql::IsDataTypeFloat(resultSlot)) {
  154. udfName << "SqlValueNumber";
  155. } else if (NYql::IsDataTypeNumeric(resultSlot)) {
  156. udfName << "SqlValueInt64";
  157. } else if (NYql::IsDataTypeString(resultSlot)) {
  158. udfName << "SqlValueConvertToUtf8";
  159. } else if (resultSlot == EDataSlot::Bool) {
  160. udfName << "SqlValueBool";
  161. } else {
  162. Y_ENSURE(false, "Invalid return type");
  163. }
  164. auto jsonValue = Pb_.Udf(udfName);
  165. auto parse = Pb_.Udf("Json2.Parse");
  166. auto compilePath = Pb_.Udf("Json2.CompilePath");
  167. auto outType = MakeType(retType);
  168. auto arg1 = MakeArg(arg1Type);
  169. auto arg2 = MakeArg(arg2Type);
  170. auto scalarApply = Pb_.ScalarApply({arg1, arg2}, [&](const auto& args) {
  171. auto json = args[0];
  172. auto processJson = [&](auto unpacked) {
  173. auto input = Pb_.NewOptional( isBinaryJson ? unpacked : Pb_.Apply(parse, { unpacked }));
  174. auto path = Pb_.Apply(compilePath, { args[1] });
  175. auto dictType = Pb_.NewDictType(Pb_.NewDataType(NUdf::EDataSlot::Utf8), Pb_.NewResourceType("JsonNode"), false);
  176. auto resultTuple = Pb_.Apply(jsonValue, { input, path, Pb_.NewDict(dictType, {})});
  177. return Pb_.VisitAll(resultTuple, [&](ui32 index, TRuntimeNode item) {
  178. if (index == 0) {
  179. return Pb_.NewEmptyOptional(outType->GetItemType());
  180. }
  181. Y_ENSURE(index == 1);
  182. return Pb_.Cast(item, outType->GetItemType());
  183. });
  184. };
  185. if (json.GetStaticType()->IsOptional()) {
  186. return Pb_.FlatMap(json, processJson);
  187. } else {
  188. return processJson(json);
  189. }
  190. });
  191. Y_ENSURE(outType->IsSameType(*scalarApply.GetStaticType()));
  192. Items_.emplace_back(scalarApply);
  193. return Items_.size() - 1;
  194. }
  195. TString TKernelRequestBuilder::Serialize() {
  196. const TGuard<TScopedAlloc> allocGuard(Alloc_);
  197. const auto kernelTuple = Items_.empty() ? Pb_.AsScalar(Pb_.NewEmptyTuple()) : Pb_.BlockAsTuple(Items_);
  198. const auto argsTuple = ArgsItems_.empty() ? Pb_.AsScalar(Pb_.NewEmptyTuple()) : Pb_.BlockAsTuple(ArgsItems_);
  199. const auto tuple = Pb_.BlockAsTuple( { argsTuple, kernelTuple });
  200. return SerializeRuntimeNode(tuple, Env_);
  201. }
  202. TRuntimeNode TKernelRequestBuilder::MakeArg(const TTypeAnnotationNode* type) {
  203. auto [it, inserted] = CachedArgs_.emplace(type, TRuntimeNode());
  204. if (!inserted) {
  205. return it->second;
  206. }
  207. auto arg = Pb_.Arg(MakeType(type));
  208. it->second = arg;
  209. ArgsItems_.emplace_back(arg);
  210. return arg;
  211. }
  212. TBlockType* TKernelRequestBuilder::MakeType(const TTypeAnnotationNode* type) {
  213. TStringStream err;
  214. const auto ret = NCommon::BuildType(*type, Pb_, TypesMemoization_, err);
  215. if (!ret) {
  216. ythrow yexception() << err.Str();
  217. }
  218. return AS_TYPE(TBlockType, ret);
  219. }
  220. }