Browse Source

revert rXXXXXX (see discusstion in pr), will commit again more pci-dss friendly way

kulikov 1 year ago
parent
commit
4a7691c519

+ 10 - 20
library/cpp/http/server/http.cpp

@@ -2,7 +2,6 @@
 #include "http_ex.h"
 
 #include <library/cpp/threading/equeue/equeue.h>
-#include <library/cpp/threading/equeue/fast.h>
 
 #include <util/generic/buffer.h>
 #include <util/generic/intrlist.h>
@@ -406,8 +405,8 @@ public:
         : TImpl(
               parent,
               cb,
-              MakeThreadPool<TSimpleThreadPool>(factory, options, cb, options.RequestsThreadName),
-              MakeThreadPool<TThreadPool>(factory, options, nullptr, options.FailRequestsThreadName),
+              MakeThreadPool<TSimpleThreadPool>(factory, options.UseElasticQueues, cb, options.RequestsThreadName),
+              MakeThreadPool<TThreadPool>(factory, options.UseElasticQueues, nullptr, options.FailRequestsThreadName),
               options) {
     }
 
@@ -457,30 +456,21 @@ public:
 
 private:
     template <class TThreadPool_>
-    static THolder<IThreadPool> MakeThreadPool(ICallBack* callback, const IThreadPool::TParams& params) {
-        if (callback) {
-            return MakeHolder<TThreadPoolBinder<TThreadPool_, THttpServer::ICallBack>>(callback, params);
-        } else {
-            return  MakeHolder<TThreadPool_>(params);
-        }
-    }
-
-    template <class TThreadPool_>
-    static THolder<IThreadPool> MakeThreadPool(IThreadFactory* factory, const TOptions& options, ICallBack* callback = nullptr, const TString& threadName = {}) {
+    static THolder<IThreadPool> MakeThreadPool(IThreadFactory* factory, bool elastic, ICallBack* callback = nullptr, const TString& threadName = {}) {
         if (!factory) {
             factory = SystemThreadFactory();
         }
 
         THolder<IThreadPool> pool;
         const auto params = IThreadPool::TParams().SetFactory(factory).SetThreadName(threadName);
-
-        if (options.UseFastElasticQueues) {
-            pool = MakeThreadPool<TFastElasticQueue>(callback, params);
+        if (callback) {
+            pool = MakeHolder<TThreadPoolBinder<TThreadPool_, THttpServer::ICallBack>>(callback, params);
         } else {
-            pool = MakeThreadPool<TThreadPool_>(callback, params);
-            if (options.UseElasticQueues) {
-                pool = MakeHolder<TElasticQueue>(std::move(pool));
-            }
+            pool = MakeHolder<TThreadPool_>(params);
+        }
+
+        if (elastic) {
+            pool = MakeHolder<TElasticQueue>(std::move(pool));
         }
 
         return pool;

+ 0 - 6
library/cpp/http/server/options.h

@@ -131,11 +131,6 @@ public:
         return *this;
     }
 
-    inline THttpServerOptions& EnableFastElasticQueues(bool enable) noexcept {
-        UseFastElasticQueues = enable;
-
-        return *this;
-    }
     inline THttpServerOptions& SetThreadsName(const TString& listenThreadName, const TString& requestsThreadName, const TString& failRequestsThreadName) noexcept {
         ListenThreadName = listenThreadName;
         RequestsThreadName = requestsThreadName;
@@ -172,7 +167,6 @@ public:
     ui64 MaxInputContentLength = sizeof(size_t) <= 4 ? 2_GB : 64_GB;
     size_t MaxRequestsPerConnection = 0;  // If keep-alive is enabled, request limit before connection is closed
     bool UseElasticQueues = false;
-    bool UseFastElasticQueues = false;
 
     TDuration PollTimeout; // timeout of TSocketPoller::WaitT call
     TDuration ExpirationTimeout; // drop inactive connections after ExpirationTimeout (should be > 0)

+ 0 - 1
library/cpp/threading/CMakeLists.txt

@@ -7,7 +7,6 @@
 
 
 add_subdirectory(atomic)
-add_subdirectory(bounded_queue)
 add_subdirectory(chunk_queue)
 add_subdirectory(equeue)
 add_subdirectory(future)

+ 0 - 14
library/cpp/threading/bounded_queue/CMakeLists.darwin-x86_64.txt

@@ -1,14 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(cpp-threading-bounded_queue INTERFACE)
-target_link_libraries(cpp-threading-bounded_queue INTERFACE
-  contrib-libs-cxxsupp
-  yutil
-)

+ 0 - 15
library/cpp/threading/bounded_queue/CMakeLists.linux-aarch64.txt

@@ -1,15 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(cpp-threading-bounded_queue INTERFACE)
-target_link_libraries(cpp-threading-bounded_queue INTERFACE
-  contrib-libs-linux-headers
-  contrib-libs-cxxsupp
-  yutil
-)

+ 0 - 15
library/cpp/threading/bounded_queue/CMakeLists.linux-x86_64.txt

@@ -1,15 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(cpp-threading-bounded_queue INTERFACE)
-target_link_libraries(cpp-threading-bounded_queue INTERFACE
-  contrib-libs-linux-headers
-  contrib-libs-cxxsupp
-  yutil
-)

+ 0 - 17
library/cpp/threading/bounded_queue/CMakeLists.txt

@@ -1,17 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
-  include(CMakeLists.linux-aarch64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
-  include(CMakeLists.darwin-x86_64.txt)
-elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
-  include(CMakeLists.windows-x86_64.txt)
-elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
-  include(CMakeLists.linux-x86_64.txt)
-endif()

+ 0 - 14
library/cpp/threading/bounded_queue/CMakeLists.windows-x86_64.txt

@@ -1,14 +0,0 @@
-
-# This file was generated by the build system used internally in the Yandex monorepo.
-# Only simple modifications are allowed (adding source-files to targets, adding simple properties
-# like target_include_directories). These modifications will be ported to original
-# ya.make files by maintainers. Any complex modifications which can't be ported back to the
-# original buildsystem will not be accepted.
-
-
-
-add_library(cpp-threading-bounded_queue INTERFACE)
-target_link_libraries(cpp-threading-bounded_queue INTERFACE
-  contrib-libs-cxxsupp
-  yutil
-)

+ 0 - 89
library/cpp/threading/bounded_queue/bounded_queue.h

@@ -1,89 +0,0 @@
-#pragma once
-
-#include <util/generic/yexception.h>
-
-//https://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
-
-namespace NThreading {
-    template<typename T>
-    class TBoundedQueue {
-    public:
-        explicit TBoundedQueue(size_t size)
-            : Buffer_(new TCell[size])
-            , Mask_(size - 1)
-        {
-            Y_ENSURE(size >= 2 && (size & (size - 1)) == 0);
-
-            for (size_t i = 0; i < size; ++i) {
-                Buffer_[i].Sequence.store(i, std::memory_order_relaxed);
-            }
-        }
-
-        template <typename T_>
-        [[nodiscard]] bool Enqueue(T_&& data) noexcept {
-            TCell* cell;
-            size_t pos = EnqueuePos_.load(std::memory_order_relaxed);
-
-            for (;;) {
-                cell = &Buffer_[pos & Mask_];
-                size_t seq = cell->Sequence.load(std::memory_order_acquire);
-                intptr_t dif = (intptr_t)seq - (intptr_t)pos;
-
-                if (dif == 0) {
-                    if (EnqueuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
-                        break;
-                    }
-                } else if (dif < 0) {
-                    return false;
-                } else {
-                    pos = EnqueuePos_.load(std::memory_order_relaxed);
-                }
-            }
-
-            static_assert(noexcept(cell->Data = std::forward<T_>(data)));
-            cell->Data = std::forward<T_>(data);
-            cell->Sequence.store(pos + 1, std::memory_order_release);
-
-            return true;
-        }
-
-        [[nodiscard]] bool Dequeue(T& data) noexcept {
-            TCell* cell;
-            size_t pos = DequeuePos_.load(std::memory_order_relaxed);
-
-            for (;;) {
-                cell = &Buffer_[pos & Mask_];
-                size_t seq = cell->Sequence.load(std::memory_order_acquire);
-                intptr_t dif = (intptr_t)seq - (intptr_t)(pos + 1);
-
-                if (dif == 0) {
-                    if (DequeuePos_.compare_exchange_weak(pos, pos + 1, std::memory_order_relaxed)) {
-                        break;
-                    }
-                } else if (dif < 0) {
-                    return false;
-                } else {
-                    pos = DequeuePos_.load(std::memory_order_relaxed);
-                }
-            }
-
-            static_assert(noexcept(data = std::move(cell->Data)));
-            data = std::move(cell->Data);
-
-            cell->Sequence.store(pos + Mask_ + 1, std::memory_order_release);
-            return true;
-        }
-    private:
-        struct TCell {
-            std::atomic<size_t> Sequence;
-            T Data;
-        };
-
-        std::unique_ptr<TCell[]> Buffer_;
-        const size_t Mask_;
-
-        alignas(64) std::atomic<size_t> EnqueuePos_ = 0;
-        alignas(64) std::atomic<size_t> DequeuePos_ = 0;
-    };
-}
-

+ 0 - 106
library/cpp/threading/bounded_queue/bounded_queue_ut.cpp

@@ -1,106 +0,0 @@
-#include "bounded_queue.h"
-
-#include <library/cpp/testing/unittest/registar.h>
-#include <util/thread/factory.h>
-
-using namespace NThreading;
-
-Y_UNIT_TEST_SUITE(TBoundedQueueTest) {
-    Y_UNIT_TEST(QueueSize) {
-        const size_t queueSize = 16;
-        TBoundedQueue<size_t> boundedQueue(queueSize);
-
-        for (size_t i = 0; i < queueSize; ++i) {
-            UNIT_ASSERT(boundedQueue.Enqueue(i));
-        }
-        UNIT_ASSERT(!boundedQueue.Enqueue(0));
-        size_t tmp = 0;
-        UNIT_ASSERT(boundedQueue.Dequeue(tmp));
-        UNIT_ASSERT(boundedQueue.Enqueue(0));
-        UNIT_ASSERT(!boundedQueue.Enqueue(0));
-    }
-
-    Y_UNIT_TEST(Move) {
-        const size_t queueSize = 16;
-        TBoundedQueue<TString> boundedQueue(queueSize);
-
-        for (size_t i = 0; i < queueSize; ++i) {
-            TString v = "xxx";
-            UNIT_ASSERT(boundedQueue.Enqueue(std::move(v)));
-            UNIT_ASSERT(v.empty());
-        }
-
-        {
-            TString v = "xxx";
-            UNIT_ASSERT(!boundedQueue.Enqueue(std::move(v)));
-            UNIT_ASSERT(v == "xxx");
-        }
-
-        TString v;
-        UNIT_ASSERT(boundedQueue.Dequeue(v));
-        UNIT_ASSERT(v == "xxx");
-    }
-
-    Y_UNIT_TEST(MPMC) {
-        size_t queueSize = 16;
-        size_t producers = 10;
-        size_t consumers = 10;
-        size_t itemsCount = 10000;
-
-        TVector<THolder<IThreadFactory::IThread>> threads;
-        TBoundedQueue<std::pair<size_t, size_t>> boundedQueue(queueSize);
-
-        std::atomic<size_t> itemCounter = 0;
-        std::atomic<size_t> consumed = 0;
-
-        for (size_t i = 0; i < consumers; ++i) {
-            threads.push_back(SystemThreadFactory()->Run(
-                [&]() {
-                    TVector<size_t> prevItems(producers);
-                    for (;;) {
-                        std::pair<size_t, size_t> item;
-                        while (!boundedQueue.Dequeue(item)) {
-                            ;
-                        }
-
-                        if (item.first >= producers) {
-                            break;
-                        }
-
-                        UNIT_ASSERT(item.second > prevItems[item.first]);
-                        prevItems[item.first] = item.second;
-                        ++consumed;
-                    }
-                })
-            );
-        }
-
-        for (size_t i = 0; i < producers ; ++i) {
-            threads.push_back(SystemThreadFactory()->Run(
-                [&, producerNum = i]() {
-                    for (;;) {
-                        size_t item = ++itemCounter;
-                        if (item > itemsCount) {
-                            break;
-                        }
-
-                        while (!boundedQueue.Enqueue(std::make_pair(producerNum, item))) {
-                            ;
-                        }
-                    }
-
-                    while (!boundedQueue.Enqueue(std::make_pair(producers, size_t(0)))) {
-                        ;
-                    }
-                })
-            );
-        }
-
-
-        for (auto& t : threads) {
-            t->Join();
-        }
-
-        UNIT_ASSERT_VALUES_EQUAL(consumed.load(), itemsCount);
-    }
-}

Some files were not shown because too many files changed in this diff