Browse Source

Disable LLVM for block-only stages

aneporada 1 year ago
parent
commit
786a2afbef

+ 12 - 0
ydb/library/yql/core/yql_expr_type_annotation.cpp

@@ -3032,6 +3032,18 @@ bool IsWideBlockType(const TTypeAnnotationNode& type) {
     return blockLenType->Cast<TDataExprType>()->GetSlot() == EDataSlot::Uint64;
 }
 
+bool IsWideSequenceBlockType(const TTypeAnnotationNode& type) {
+    const TTypeAnnotationNode* itemType = nullptr;
+    if (type.GetKind() == ETypeAnnotationKind::Stream) {
+        itemType = type.Cast<TStreamExprType>()->GetItemType();
+    } else if (type.GetKind() == ETypeAnnotationKind::Flow) {
+        itemType = type.Cast<TFlowExprType>()->GetItemType();
+    } else {
+        return false;
+    }
+    return IsWideBlockType(*itemType);
+}
+
 bool IsSupportedAsBlockType(TPositionHandle pos, const TTypeAnnotationNode& type, TExprContext& ctx, TTypeAnnotationContext& types) {
     if (!types.ArrowResolver) {
         return false;

+ 1 - 0
ydb/library/yql/core/yql_expr_type_annotation.h

@@ -125,6 +125,7 @@ bool EnsureWideFlowType(TPositionHandle position, const TTypeAnnotationNode& typ
 bool EnsureWideStreamType(const TExprNode& node, TExprContext& ctx);
 bool EnsureWideStreamType(TPositionHandle position, const TTypeAnnotationNode& type, TExprContext& ctx);
 bool IsWideBlockType(const TTypeAnnotationNode& type);
+bool IsWideSequenceBlockType(const TTypeAnnotationNode& type);
 bool IsSupportedAsBlockType(TPositionHandle pos, const TTypeAnnotationNode& type, TExprContext& ctx, TTypeAnnotationContext& types);
 bool EnsureSupportedAsBlockType(TPositionHandle pos, const TTypeAnnotationNode& type, TExprContext& ctx, TTypeAnnotationContext& types);
 bool EnsureWideBlockType(TPositionHandle position, const TTypeAnnotationNode& type, TTypeAnnotationNode::TListType& blockItemTypes, TExprContext& ctx, bool allowScalar = true);

+ 18 - 10
ydb/library/yql/dq/runtime/dq_tasks_runner.cpp

@@ -280,11 +280,12 @@ public:
         return TaskId;
     }
 
-    bool UseSeparatePatternAlloc() const {
-        return Context.PatternCache && (Settings.OptLLVM == "OFF" || Settings.UseCacheForLLVM);
+    bool UseSeparatePatternAlloc(const TDqTaskSettings& taskSettings) const {
+        return Context.PatternCache &&
+            (Settings.OptLLVM == "OFF" || taskSettings.IsLLVMDisabled() || Settings.UseCacheForLLVM);
     }
 
-    TComputationPatternOpts CreatePatternOpts(TScopedAlloc& alloc, TTypeEnvironment& typeEnv) {
+    TComputationPatternOpts CreatePatternOpts(const TDqTaskSettings& task, TScopedAlloc& alloc, TTypeEnvironment& typeEnv) {
         auto validatePolicy = Settings.TerminateOnError ? NUdf::EValidatePolicy::Fail : NUdf::EValidatePolicy::Exception;
 
         auto taskRunnerFactory = [this](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
@@ -301,8 +302,14 @@ public:
         if (Y_UNLIKELY(CollectFull() && !AllocatedHolder->ProgramParsed.StatsRegistry)) {
             AllocatedHolder->ProgramParsed.StatsRegistry = NMiniKQL::CreateDefaultStatsRegistry();
         }
+
+        TString optLLVM = Settings.OptLLVM;
+        if (task.IsLLVMDisabled()) {
+            optLLVM = "OFF";
+        }
+
         TComputationPatternOpts opts(alloc.Ref(), typeEnv, taskRunnerFactory,
-            Context.FuncRegistry, NUdf::EValidateMode::None, validatePolicy, Settings.OptLLVM, EGraphPerProcess::Multi,
+            Context.FuncRegistry, NUdf::EValidateMode::None, validatePolicy, optLLVM, EGraphPerProcess::Multi,
             AllocatedHolder->ProgramParsed.StatsRegistry.Get());
 
         if (!SecureParamsProvider) {
@@ -315,9 +322,10 @@ public:
 
     std::shared_ptr<TPatternCacheEntry> CreateComputationPattern(const TDqTaskSettings& task, const TString& rawProgram, bool forCache, bool& canBeCached) {
         canBeCached = true;
-        auto entry = TComputationPatternLRUCache::CreateCacheEntry(UseSeparatePatternAlloc());
-        auto& patternAlloc = UseSeparatePatternAlloc() ? entry->Alloc : Alloc();
-        auto& patternEnv = UseSeparatePatternAlloc() ? entry->Env : TypeEnv();
+        const bool useSeparatePattern = UseSeparatePatternAlloc(task);
+        auto entry = TComputationPatternLRUCache::CreateCacheEntry(useSeparatePattern);
+        auto& patternAlloc = useSeparatePattern ? entry->Alloc : Alloc();
+        auto& patternEnv = useSeparatePattern ? entry->Env : TypeEnv();
         patternAlloc.Ref().UseRefLocking = forCache;
 
         {
@@ -411,7 +419,7 @@ public:
         LOG(TStringBuilder() << "task: " << TaskId << ", program size: " << programSize
             << ", llvm: `" << Settings.OptLLVM << "`.");
 
-        auto opts = CreatePatternOpts(patternAlloc, patternEnv);
+        auto opts = CreatePatternOpts(task, patternAlloc, patternEnv);
         opts.SetPatternEnv(entry);
 
         {
@@ -431,7 +439,7 @@ public:
 
         std::shared_ptr<TPatternCacheEntry> entry;
         bool canBeCached;
-        if (UseSeparatePatternAlloc() && Context.PatternCache) {
+        if (UseSeparatePatternAlloc(task) && Context.PatternCache) {
             auto& cache = Context.PatternCache;
             auto ticket = cache->FindOrSubscribe(program.GetRaw());
             if (!ticket.HasFuture()) {
@@ -454,7 +462,7 @@ public:
         AllocatedHolder->ProgramParsed.PatternCacheEntry = entry;
 
         // clone pattern using TDqTaskRunner's alloc
-        auto opts = CreatePatternOpts(Alloc(), TypeEnv());
+        auto opts = CreatePatternOpts(task, Alloc(), TypeEnv());
 
         AllocatedHolder->ProgramParsed.CompGraph = AllocatedHolder->ProgramParsed.GetPattern()->Clone(
             opts.ToComputationOptions(*Context.RandomProvider, *Context.TimeProvider, &TypeEnv()));

+ 4 - 0
ydb/library/yql/dq/runtime/dq_tasks_runner.h

@@ -312,6 +312,10 @@ public:
         return Task_->HasUseLlvm();
     }
 
+    bool IsLLVMDisabled() const {
+        return HasUseLlvm() && !GetUseLlvm();
+    }
+
     const TVector<google::protobuf::Message*>& GetSourceSettings() const {
         return SourceSettings;
     }

+ 12 - 0
ydb/library/yql/dq/type_ann/CMakeLists.darwin-x86_64.txt

@@ -6,6 +6,12 @@
 # original buildsystem will not be accepted.
 
 
+get_built_tool_path(
+  TOOL_enum_parser_bin
+  TOOL_enum_parser_dependency
+  tools/enum_parser/enum_parser
+  enum_parser
+)
 
 add_library(yql-dq-type_ann)
 target_compile_options(yql-dq-type_ann PRIVATE
@@ -20,7 +26,13 @@ target_link_libraries(yql-dq-type_ann PUBLIC
   yql-dq-expr_nodes
   yql-dq-proto
   providers-common-provider
+  tools-enum_parser-enum_serialization_runtime
 )
 target_sources(yql-dq-type_ann PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
 )
+generate_enum_serilization(yql-dq-type_ann
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.h
+  INCLUDE_HEADERS
+  ydb/library/yql/dq/type_ann/dq_type_ann.h
+)

+ 12 - 0
ydb/library/yql/dq/type_ann/CMakeLists.linux-aarch64.txt

@@ -6,6 +6,12 @@
 # original buildsystem will not be accepted.
 
 
+get_built_tool_path(
+  TOOL_enum_parser_bin
+  TOOL_enum_parser_dependency
+  tools/enum_parser/enum_parser
+  enum_parser
+)
 
 add_library(yql-dq-type_ann)
 target_compile_options(yql-dq-type_ann PRIVATE
@@ -21,7 +27,13 @@ target_link_libraries(yql-dq-type_ann PUBLIC
   yql-dq-expr_nodes
   yql-dq-proto
   providers-common-provider
+  tools-enum_parser-enum_serialization_runtime
 )
 target_sources(yql-dq-type_ann PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
 )
+generate_enum_serilization(yql-dq-type_ann
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.h
+  INCLUDE_HEADERS
+  ydb/library/yql/dq/type_ann/dq_type_ann.h
+)

+ 12 - 0
ydb/library/yql/dq/type_ann/CMakeLists.linux-x86_64.txt

@@ -6,6 +6,12 @@
 # original buildsystem will not be accepted.
 
 
+get_built_tool_path(
+  TOOL_enum_parser_bin
+  TOOL_enum_parser_dependency
+  tools/enum_parser/enum_parser
+  enum_parser
+)
 
 add_library(yql-dq-type_ann)
 target_compile_options(yql-dq-type_ann PRIVATE
@@ -21,7 +27,13 @@ target_link_libraries(yql-dq-type_ann PUBLIC
   yql-dq-expr_nodes
   yql-dq-proto
   providers-common-provider
+  tools-enum_parser-enum_serialization_runtime
 )
 target_sources(yql-dq-type_ann PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
 )
+generate_enum_serilization(yql-dq-type_ann
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.h
+  INCLUDE_HEADERS
+  ydb/library/yql/dq/type_ann/dq_type_ann.h
+)

+ 12 - 0
ydb/library/yql/dq/type_ann/CMakeLists.windows-x86_64.txt

@@ -6,6 +6,12 @@
 # original buildsystem will not be accepted.
 
 
+get_built_tool_path(
+  TOOL_enum_parser_bin
+  TOOL_enum_parser_dependency
+  tools/enum_parser/enum_parser
+  enum_parser
+)
 
 add_library(yql-dq-type_ann)
 target_compile_options(yql-dq-type_ann PRIVATE
@@ -20,7 +26,13 @@ target_link_libraries(yql-dq-type_ann PUBLIC
   yql-dq-expr_nodes
   yql-dq-proto
   providers-common-provider
+  tools-enum_parser-enum_serialization_runtime
 )
 target_sources(yql-dq-type_ann PRIVATE
   ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.cpp
 )
+generate_enum_serilization(yql-dq-type_ann
+  ${CMAKE_SOURCE_DIR}/ydb/library/yql/dq/type_ann/dq_type_ann.h
+  INCLUDE_HEADERS
+  ydb/library/yql/dq/type_ann/dq_type_ann.h
+)

+ 14 - 1
ydb/library/yql/dq/type_ann/dq_type_ann.cpp

@@ -1119,6 +1119,8 @@ TDqStageSettings TDqStageSettings::Parse(const TDqStageBase& node) {
         } else if (name == WideChannelsSettingName) {
             settings.WideChannels = true;
             settings.OutputNarrowType = tuple.Value().Ref().GetTypeAnn()->Cast<TTypeExprType>()->GetType()->Cast<TStructExprType>();
+        } else if (name == BlockStatusSettingName) {
+            settings.BlockStatus = FromString<EBlockStatus>(tuple.Value().Cast<TCoAtom>().Value());
         }
     }
 
@@ -1141,7 +1143,7 @@ bool TDqStageSettings::Validate(const TExprNode& stage, TExprContext& ctx) {
         }
 
         TStringBuf name = setting->Head().Content();
-        if (name == IdSettingName || name == LogicalIdSettingName) {
+        if (name == IdSettingName || name == LogicalIdSettingName || name == BlockStatusSettingName) {
             if (setting->ChildrenSize() != 2) {
                 ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " should contain single value"));
                 return false;
@@ -1155,6 +1157,10 @@ bool TDqStageSettings::Validate(const TExprNode& stage, TExprContext& ctx) {
                 ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " should contain ui64 value, but got: " << value->Content()));
                 return false;
             }
+            if (name == BlockStatusSettingName && !TryFromString<EBlockStatus>(value->Content())) {
+                ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Unsupported " << name << " value: " << value->Content()));
+                return false;
+            }
         } else if (name == WideChannelsSettingName) {
             if (setting->ChildrenSize() != 2) {
                 ctx.AddError(TIssue(ctx.GetPosition(setting->Pos()), TStringBuilder() << "Setting " << name << " should contain single value"));
@@ -1229,6 +1235,13 @@ NNodes::TCoNameValueTupleList TDqStageSettings::BuildNode(TExprContext& ctx, TPo
             .Done());
     }
 
+    if (BlockStatus.Defined()) {
+        settings.push_back(Build<TCoNameValueTuple>(ctx, pos)
+            .Name().Build(BlockStatusSettingName)
+            .Value<TCoAtom>().Build(ToString(*BlockStatus))
+            .Done());
+    }
+
     return Build<TCoNameValueTupleList>(ctx, pos)
         .Add(settings)
         .Done();

+ 10 - 0
ydb/library/yql/dq/type_ann/dq_type_ann.h

@@ -38,6 +38,7 @@ struct TDqStageSettings {
     static constexpr TStringBuf IdSettingName = "_id";
     static constexpr TStringBuf SinglePartitionSettingName = "_single_partition";
     static constexpr TStringBuf WideChannelsSettingName = "_wide_channels";
+    static constexpr TStringBuf BlockStatusSettingName = "_block_status";
 
     ui64 LogicalId = 0;
     TString Id;
@@ -46,8 +47,17 @@ struct TDqStageSettings {
     bool WideChannels = false;
     const TStructExprType* OutputNarrowType = nullptr;
 
+    enum class EBlockStatus {
+        None,
+        Partial,
+        Full,
+    };
+
+    TMaybe<EBlockStatus> BlockStatus;
+
     TDqStageSettings& SetSinglePartition(bool value = true) { SinglePartition = value; return *this; }
     TDqStageSettings& SetWideChannels(const TStructExprType& narrowType) { WideChannels = true; OutputNarrowType = &narrowType; return *this; }
+    TDqStageSettings& SetBlockStatus(EBlockStatus status) { BlockStatus = status; return *this; }
 
     static TDqStageSettings New(const NNodes::TDqStageBase& node);
     static TDqStageSettings New();

Some files were not shown because too many files changed in this diff