Browse Source

BlockExpandChunked: add forgotten file

aneporada 2 years ago
parent
commit
99c0ebbafd

+ 1 - 1
library/cpp/neh/multiclient.cpp

@@ -280,7 +280,7 @@ namespace {
         void ScheduleRequest(TIntrusivePtr<TRequestSupervisor>& rs, const THandleRef& h, const TInstant& deadline) {
             TJobPtr j(new TNewRequest(rs));
             JQ_.Enqueue(j);
-            if (!h->Signalled) {
+            if (!h->Signalled()) {
                 if (deadline.GetValue() < GetNearDeadline_()) {
                     Signal();
                 }

+ 23 - 18
library/cpp/neh/wfmo.h

@@ -28,37 +28,42 @@ namespace NNeh {
 
     class TWaitQueue {
     public:
-        struct TWaitHandle {
-            inline TWaitHandle() noexcept
-                : Signalled(false)
-                , Parent(nullptr)
-            {
-            }
-
-            inline void Signal() noexcept {
+        class TWaitHandle {
+        public:
+            void Signal() noexcept {
                 TGuard<TSpinLock> lock(M_);
 
-                Signalled = true;
+                Signalled_ = true;
 
-                if (Parent) {
-                    Parent->Notify(this);
+                if (Parent_) {
+                    Parent_->Notify(this);
                 }
             }
 
-            inline void Register(TWaitQueue* parent) noexcept {
+            void Register(TWaitQueue* parent) noexcept {
                 TGuard<TSpinLock> lock(M_);
 
-                Parent = parent;
+                Parent_ = parent;
 
-                if (Signalled) {
-                    if (Parent) {
-                        Parent->Notify(this);
+                if (Signalled_) {
+                    if (Parent_) {
+                        Parent_->Notify(this);
                     }
                 }
             }
 
-            NAtomic::TBool Signalled;
-            TWaitQueue* Parent;
+            bool Signalled() const {
+                return Signalled_;
+            }
+
+            void ResetState() {
+                TGuard<TSpinLock> lock(M_);
+
+                Signalled_ = false;
+            }
+        private:
+            NAtomic::TBool Signalled_ = false;
+            TWaitQueue* Parent_ = nullptr;
             TSpinLock M_;
         };
 

+ 5 - 0
ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp

@@ -2481,6 +2481,11 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
         return ctx.ProgramBuilder.BlockCompress(flow, index);
     });
 
+    AddCallable("BlockExpandChunked", [](const TExprNode& node, TMkqlBuildContext& ctx) {
+        const auto flow = MkqlBuildExpr(node.Head(), ctx);
+        return ctx.ProgramBuilder.BlockExpandChunked(flow);
+    });
+
     AddCallable("PgArray", [](const TExprNode& node, TMkqlBuildContext& ctx) {
         std::vector<TRuntimeNode> args;
         args.reserve(node.ChildrenSize());