|
- #pragma once
- #include "lfqueue.h"
- #include <library/cpp/coroutine/engine/impl.h>
- #include <library/cpp/coroutine/engine/network.h>
- #include <library/cpp/deprecated/atomic/atomic.h>
- #include <util/system/pipe.h>
- #ifdef _linux_
- #include <sys/eventfd.h>
- #endif
- #if defined(_bionic_) && !defined(EFD_SEMAPHORE)
- #define EFD_SEMAPHORE 1
- #endif
- namespace NNeh {
- #ifdef _linux_
- class TSemaphoreEventFd {
- public:
- inline TSemaphoreEventFd() {
- F_ = eventfd(0, EFD_NONBLOCK | EFD_SEMAPHORE);
- if (F_ < 0) {
- ythrow TFileError() << "failed to create a eventfd";
- }
- }
- inline ~TSemaphoreEventFd() {
- close(F_);
- }
- inline size_t Acquire(TCont* c) {
- ui64 ev;
- return NCoro::ReadI(c, F_, &ev, sizeof ev).Processed();
- }
- inline void Release() {
- const static ui64 ev(1);
- (void)write(F_, &ev, sizeof ev);
- }
- private:
- int F_;
- };
- #endif
- class TSemaphorePipe {
- public:
- inline TSemaphorePipe() {
- TPipeHandle::Pipe(S_[0], S_[1]);
- SetNonBlock(S_[0]);
- SetNonBlock(S_[1]);
- }
- inline size_t Acquire(TCont* c) {
- char ch;
- return NCoro::ReadI(c, S_[0], &ch, 1).Processed();
- }
- inline size_t Acquire(TCont* c, char* buff, size_t buflen) {
- return NCoro::ReadI(c, S_[0], buff, buflen).Processed();
- }
- inline void Release() {
- char ch = 13;
- S_[1].Write(&ch, 1);
- }
- private:
- TPipeHandle S_[2];
- };
- class TPipeQueueBase {
- public:
- inline void Enqueue(void* job) {
- Q_.Enqueue(job);
- S_.Release();
- }
- inline void* Dequeue(TCont* c, char* ch, size_t buflen) {
- void* ret = nullptr;
- while (!Q_.Dequeue(&ret) && S_.Acquire(c, ch, buflen)) {
- }
- return ret;
- }
- inline void* Dequeue() noexcept {
- void* ret = nullptr;
- Q_.Dequeue(&ret);
- return ret;
- }
- private:
- TLockFreeQueue<void*> Q_;
- TSemaphorePipe S_;
- };
- template <class T, size_t buflen = 1>
- class TPipeQueue {
- public:
- template <class TPtr>
- inline void EnqueueSafe(TPtr req) {
- Enqueue(req.Get());
- req.Release();
- }
- inline void Enqueue(T* req) {
- Q_.Enqueue(req);
- }
- template <class TPtr>
- inline void DequeueSafe(TCont* c, TPtr& ret) {
- ret.Reset(Dequeue(c));
- }
- inline T* Dequeue(TCont* c) {
- char ch[buflen];
- return (T*)Q_.Dequeue(c, ch, sizeof(ch));
- }
- protected:
- TPipeQueueBase Q_;
- };
- //optimized for avoiding unnecessary usage semaphore + use eventfd on linux
- template <class T>
- struct TOneConsumerPipeQueue {
- inline TOneConsumerPipeQueue()
- : Signaled_(0)
- , SkipWait_(0)
- {
- }
- inline void Enqueue(T* job) {
- Q_.Enqueue(job);
- AtomicSet(SkipWait_, 1);
- if (AtomicCas(&Signaled_, 1, 0)) {
- S_.Release();
- }
- }
- inline T* Dequeue(TCont* c) {
- T* ret = nullptr;
- while (!Q_.Dequeue(&ret)) {
- AtomicSet(Signaled_, 0);
- if (!AtomicCas(&SkipWait_, 0, 1)) {
- if (!S_.Acquire(c)) {
- break;
- }
- }
- AtomicSet(Signaled_, 1);
- }
- return ret;
- }
- template <class TPtr>
- inline void EnqueueSafe(TPtr req) {
- Enqueue(req.Get());
- Y_UNUSED(req.Release());
- }
- template <class TPtr>
- inline void DequeueSafe(TCont* c, TPtr& ret) {
- ret.Reset(Dequeue(c));
- }
- protected:
- TLockFreeQueue<T*> Q_;
- #ifdef _linux_
- TSemaphoreEventFd S_;
- #else
- TSemaphorePipe S_;
- #endif
- TAtomic Signaled_;
- TAtomic SkipWait_;
- };
- template <class T, size_t buflen = 1>
- struct TAutoPipeQueue: public TPipeQueue<T, buflen> {
- ~TAutoPipeQueue() {
- while (T* t = (T*)TPipeQueue<T, buflen>::Q_.Dequeue()) {
- delete t;
- }
- }
- };
- template <class T>
- struct TAutoOneConsumerPipeQueue: public TOneConsumerPipeQueue<T> {
- ~TAutoOneConsumerPipeQueue() {
- T* ret = nullptr;
- while (TOneConsumerPipeQueue<T>::Q_.Dequeue(&ret)) {
- delete ret;
- }
- }
- };
- }
|