yql_pure_provider.cpp 11 KB


  1. #include "yql_pure_provider.h"
  2. #include <yql/essentials/core/yql_type_annotation.h>
  3. #include <yql/essentials/core/yql_graph_transformer.h>
  4. #include <yql/essentials/core/yql_expr_type_annotation.h>
  5. #include <yql/essentials/core/peephole_opt/yql_opt_peephole_physical.h>
  6. #include <yql/essentials/utils/log/log.h>
  7. #include <yql/essentials/providers/common/provider/yql_provider_names.h>
  8. #include <yql/essentials/providers/common/provider/yql_data_provider_impl.h>
  9. #include <yql/essentials/providers/common/provider/yql_provider.h>
  10. #include <yql/essentials/providers/common/codec/yql_codec.h>
  11. #include <yql/essentials/providers/common/schema/expr/yql_expr_schema.h>
  12. #include <yql/essentials/providers/common/transform/yql_exec.h>
  13. #include <yql/essentials/providers/common/transform/yql_lazy_init.h>
  14. #include <yql/essentials/providers/common/mkql/yql_provider_mkql.h>
  15. #include <yql/essentials/providers/common/mkql_simple_file/mkql_simple_file.h>
  16. #include <yql/essentials/providers/result/expr_nodes/yql_res_expr_nodes.h>
  17. #include <yql/essentials/minikql/computation/mkql_computation_node.h>
  18. #include <yql/essentials/minikql/mkql_program_builder.h>
  19. #include <yql/essentials/minikql/mkql_node_cast.h>
  20. #include <yql/essentials/minikql/mkql_opt_literal.h>
  21. #include <yql/essentials/minikql/comp_nodes/mkql_factories.h>
  22. #include <yql/essentials/parser/pg_wrapper/interface/comp_factory.h>
  23. #include <yql/essentials/providers/common/comp_nodes/yql_factory.h>
  24. #include <util/stream/length.h>
  25. namespace NYql {
  26. namespace {
  27. using namespace NKikimr;
  28. using namespace NKikimr::NMiniKQL;
  29. class TPureDataSinkExecTransformer : public TExecTransformerBase {
  30. public:
  31. TPureDataSinkExecTransformer(const TPureState::TPtr state)
  32. : State_(state)
  33. {
  34. AddHandler({TStringBuf("Result")}, RequireNone(), Hndl(&TPureDataSinkExecTransformer::HandleRes));
  35. }
  36. void Rewind() override {
  37. TExecTransformerBase::Rewind();
  38. }
  39. TStatusCallbackPair HandleRes(const TExprNode::TPtr& input, TExprContext& ctx) {
  40. YQL_CLOG(DEBUG, ProviderPure) << "Executing " << input->Content() << " (UniqueId=" << input->UniqueId() << ")";
  41. if (TStringBuf("Result") != input->Content()) {
  42. ythrow yexception() << "Don't know how to execute " << input->Content();
  43. }
  44. NNodes::TResOrPullBase resOrPull(input);
  45. IDataProvider::TFillSettings fillSettings = NCommon::GetFillSettings(resOrPull.Ref());
  46. YQL_ENSURE(fillSettings.Format == IDataProvider::EResultFormat::Yson);
  47. auto lambda = resOrPull.Input();
  48. if (!IsPureIsolatedLambda(lambda.Ref())) {
  49. ctx.AddError(TIssue(ctx.GetPosition(lambda.Pos()), TStringBuilder() << "Failed to execute node due to bad graph: " << input->Content()));
  50. return SyncError();
  51. }
  52. const bool isList = lambda.Ref().GetTypeAnn()->GetKind() == ETypeAnnotationKind::List;
  53. auto optimized = lambda.Ptr();
  54. auto source1 = ctx.Builder(lambda.Pos())
  55. .Callable("Take")
  56. .Callable(0, "SourceOf")
  57. .Callable(0, "StreamType")
  58. .Callable(0, "NullType")
  59. .Seal()
  60. .Seal()
  61. .Seal()
  62. .Callable(1, "Uint64")
  63. .Atom(0, "1")
  64. .Seal()
  65. .Seal()
  66. .Build();
  67. optimized = ctx.Builder(lambda.Pos())
  68. .Callable(isList ? "FlatMap" : "Map")
  69. .Add(0, source1)
  70. .Lambda(1)
  71. .Param("x")
  72. .Set(optimized)
  73. .Seal()
  74. .Seal()
  75. .Build();
  76. bool hasNonDeterministicFunctions;
  77. auto status = PeepHoleOptimizeNode(optimized, optimized, ctx, *State_->Types, nullptr, hasNonDeterministicFunctions);
  78. if (status.Level == IGraphTransformer::TStatus::Error) {
  79. return SyncStatus(status);
  80. }
  81. TUserDataTable crutches = State_->Types->UserDataStorageCrutches;
  82. TUserDataTable files;
  83. auto filesRes = NCommon::FreezeUsedFiles(*optimized, files, *State_->Types, ctx, [](const TString&) { return true; }, crutches);
  84. if (filesRes.first.Level != TStatus::Ok) {
  85. return filesRes;
  86. }
  87. TVector<TString> columns(NCommon::GetResOrPullColumnHints(*input));
  88. if (columns.empty()) {
  89. columns = NCommon::GetStructFields(lambda.Ref().GetTypeAnn());
  90. }
  91. TStringStream out;
  92. NYson::TYsonWriter writer(&out, NCommon::GetYsonFormat(fillSettings), ::NYson::EYsonType::Node, false);
  93. writer.OnBeginMap();
  94. if (NCommon::HasResOrPullOption(*input, "type")) {
  95. writer.OnKeyedItem("Type");
  96. NCommon::WriteResOrPullType(writer, lambda.Ref().GetTypeAnn(), TColumnOrder(columns));
  97. }
  98. TScopedAlloc alloc(__LOCATION__, TAlignedPagePoolCounters(), State_->FunctionRegistry->SupportsSizedAllocators());
  99. TTypeEnvironment env(alloc);
  100. TProgramBuilder pgmBuilder(env, *State_->FunctionRegistry);
  101. NCommon::TMkqlCommonCallableCompiler compiler;
  102. NCommon::TMkqlBuildContext mkqlCtx(compiler, pgmBuilder, ctx);
  103. auto root = NCommon::MkqlBuildExpr(*optimized, mkqlCtx);
  104. root = TransformProgram(root, files, env);
  105. TExploringNodeVisitor explorer;
  106. explorer.Walk(root.GetNode(), env);
  107. auto compFactory = GetCompositeWithBuiltinFactory({
  108. GetYqlFactory(),
  109. GetPgFactory()
  110. });
  111. TComputationPatternOpts patternOpts(alloc.Ref(), env, compFactory, State_->FunctionRegistry,
  112. State_->Types->ValidateMode, NUdf::EValidatePolicy::Exception, State_->Types->OptLLVM.GetOrElse(TString()),
  113. EGraphPerProcess::Multi);
  114. auto pattern = MakeComputationPattern(explorer, root, {}, patternOpts);
  115. const TComputationOptsFull computeOpts(nullptr, alloc.Ref(), env,
  116. *State_->Types->RandomProvider, *State_->Types->TimeProvider,
  117. NUdf::EValidatePolicy::Exception, nullptr, nullptr);
  118. auto graph = pattern->Clone(computeOpts);
  119. const TBindTerminator bind(graph->GetTerminator());
  120. graph->Prepare();
  121. auto value = graph->GetValue();
  122. bool truncated = false;
  123. auto type = root.GetStaticType();
  124. TString data;
  125. TStringOutput dataOut(data);
  126. TCountingOutput dataCountingOut(&dataOut);
  127. NYson::TYsonWriter dataWriter(&dataCountingOut, NCommon::GetYsonFormat(fillSettings), ::NYson::EYsonType::Node, false);
  128. YQL_ENSURE(type->IsStream());
  129. auto itemType = AS_TYPE(TStreamType, type)->GetItemType();
  130. if (isList) {
  131. TMaybe<ui64> rowsLimit = fillSettings.RowsLimitPerWrite;
  132. TMaybe<ui64> bytesLimit = fillSettings.AllResultsBytesLimit;
  133. TMaybe<TVector<ui32>> structPositions = NCommon::CreateStructPositions(itemType, &columns);
  134. dataWriter.OnBeginList();
  135. ui64 rows = 0;
  136. for (;;) {
  137. NUdf::TUnboxedValue item;
  138. auto status = value.Fetch(item);
  139. if (status == NUdf::EFetchStatus::Finish) {
  140. break;
  141. }
  142. YQL_ENSURE(status == NUdf::EFetchStatus::Ok);
  143. if ((rowsLimit && rows >= *rowsLimit) || (bytesLimit && dataCountingOut.Counter() >= *bytesLimit)) {
  144. truncated = true;
  145. break;
  146. }
  147. dataWriter.OnListItem();
  148. NCommon::WriteYsonValue(dataWriter, item, itemType, structPositions.Get());
  149. ++rows;
  150. }
  151. dataWriter.OnEndList();
  152. } else {
  153. NUdf::TUnboxedValue item;
  154. YQL_ENSURE(value.Fetch(item) == NUdf::EFetchStatus::Ok);
  155. NCommon::WriteYsonValue(dataWriter, item, itemType, nullptr);
  156. YQL_ENSURE(value.Fetch(item) == NUdf::EFetchStatus::Finish);
  157. }
  158. writer.OnKeyedItem("Data");
  159. writer.OnRaw(fillSettings.Discard ? "#" : data);
  160. if (truncated) {
  161. writer.OnKeyedItem("Truncated");
  162. writer.OnBooleanScalar(true);
  163. }
  164. writer.OnEndMap();
  165. input->SetState(TExprNode::EState::ExecutionComplete);
  166. input->SetResult(ctx.NewAtom(input->Pos(), out.Str()));
  167. return SyncOk();
  168. }
  169. private:
  170. TRuntimeNode TransformProgram(TRuntimeNode root, const TUserDataTable& files, TTypeEnvironment& env) {
  171. TExploringNodeVisitor explorer;
  172. explorer.Walk(root.GetNode(), env);
  173. bool wereChanges = false;
  174. TRuntimeNode program = SinglePassVisitCallables(root, explorer,
  175. TSimpleFileTransformProvider(State_->FunctionRegistry, files), env, true, wereChanges);
  176. program = LiteralPropagationOptimization(program, env, true);
  177. return program;
  178. }
  179. private:
  180. const TPureState::TPtr State_;
  181. };
  182. THolder<TExecTransformerBase> CreatePureDataSourceExecTransformer(const TPureState::TPtr& state) {
  183. return THolder(new TPureDataSinkExecTransformer(state));
  184. }
  185. class TPureProvider : public TDataProviderBase {
  186. public:
  187. TPureProvider(const TPureState::TPtr& state)
  188. : State_(state)
  189. , ExecTransformer_([this]() { return CreatePureDataSourceExecTransformer(State_); })
  190. {}
  191. TStringBuf GetName() const final {
  192. return PureProviderName;
  193. }
  194. IGraphTransformer& GetCallableExecutionTransformer() override {
  195. return *ExecTransformer_;
  196. }
  197. private:
  198. const TPureState::TPtr State_;
  199. TLazyInitHolder<TExecTransformerBase> ExecTransformer_;
  200. };
  201. }
  202. TIntrusivePtr<IDataProvider> CreatePureProvider(const TPureState::TPtr& state) {
  203. return MakeIntrusive<TPureProvider>(state);
  204. }
  205. TDataProviderInitializer GetPureDataProviderInitializer() {
  206. return [] (
  207. const TString& userName,
  208. const TString& sessionId,
  209. const TGatewaysConfig* gatewaysConfig,
  210. const IFunctionRegistry* functionRegistry,
  211. TIntrusivePtr<IRandomProvider> randomProvider,
  212. TIntrusivePtr<TTypeAnnotationContext> typeCtx,
  213. const TOperationProgressWriter& progressWriter,
  214. const TYqlOperationOptions& operationOptions,
  215. THiddenQueryAborter hiddenAborter,
  216. const TQContext& qContext
  217. ) {
  218. Y_UNUSED(userName);
  219. Y_UNUSED(sessionId);
  220. Y_UNUSED(gatewaysConfig);
  221. Y_UNUSED(randomProvider);
  222. Y_UNUSED(typeCtx);
  223. Y_UNUSED(progressWriter);
  224. Y_UNUSED(operationOptions);
  225. Y_UNUSED(hiddenAborter);
  226. Y_UNUSED(qContext);
  227. TDataProviderInfo info;
  228. info.Names.insert(TString{PureProviderName});
  229. auto state = MakeIntrusive<TPureState>();
  230. state->Types = typeCtx.Get();
  231. state->FunctionRegistry = functionRegistry;
  232. info.Source = CreatePureProvider(state);
  233. info.OpenSession = [state](
  234. const TString& sessionId,
  235. const TString& username,
  236. const TOperationProgressWriter& progressWriter,
  237. const TYqlOperationOptions& operationOptions,
  238. TIntrusivePtr<IRandomProvider> randomProvider,
  239. TIntrusivePtr<ITimeProvider> timeProvider) {
  240. Y_UNUSED(sessionId);
  241. Y_UNUSED(username);
  242. Y_UNUSED(progressWriter);
  243. Y_UNUSED(operationOptions);
  244. Y_UNUSED(randomProvider);
  245. Y_UNUSED(timeProvider);
  246. return NThreading::MakeFuture();
  247. };
  248. info.CloseSessionAsync = [](const TString& sessionId) {
  249. Y_UNUSED(sessionId);
  250. return NThreading::MakeFuture();
  251. };
  252. return info;
  253. };
  254. }
  255. }