123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165 |
- #include "thread.h"
- #include "record.h"
- #include <util/thread/pool.h>
- #include <util/system/event.h>
- #include <util/memory/addstorage.h>
- #include <util/generic/ptr.h>
- #include <util/generic/yexception.h>
- class TThreadedLogBackend::TImpl {
- class TRec: public IObjectInQueue, public TAdditionalStorage<TRec>, public TLogRecord {
- public:
- inline TRec(TImpl* parent, const TLogRecord& rec)
- : TLogRecord(rec.Priority, (const char*)AdditionalData(), rec.Len)
- , Parent_(parent)
- {
- memcpy(AdditionalData(), rec.Data, rec.Len);
- }
- inline ~TRec() override {
- }
- private:
- void Process(void* /*tsr*/) override {
- THolder<TRec> This(this);
- Parent_->Slave_->WriteData(*this);
- }
- private:
- TImpl* Parent_;
- };
- class TReopener: public IObjectInQueue, public TSystemEvent, public TAtomicRefCount<TReopener> {
- public:
- inline TReopener(TImpl* parent)
- : Parent_(parent)
- {
- Ref();
- }
- inline ~TReopener() override {
- }
- private:
- void Process(void* /*tsr*/) override {
- try {
- Parent_->Slave_->ReopenLog();
- } catch (...) {
- }
- Signal();
- UnRef();
- }
- private:
- TImpl* Parent_;
- };
- public:
- inline TImpl(TLogBackend* slave, size_t queuelen, std::function<void()> queueOverflowCallback = {})
- : Slave_(slave)
- , QueueOverflowCallback_(std::move(queueOverflowCallback))
- {
- Queue_.Start(1, queuelen);
- }
- inline ~TImpl() {
- Queue_.Stop();
- }
- inline void WriteData(const TLogRecord& rec) {
- THolder<TRec> obj(new (rec.Len) TRec(this, rec));
- if (Queue_.Add(obj.Get())) {
- Y_UNUSED(obj.Release());
- return;
- }
- if (QueueOverflowCallback_) {
- QueueOverflowCallback_();
- } else {
- ythrow yexception() << "log queue exhausted";
- }
- }
- // Write an emergency message when the memory allocator is corrupted.
- // The TThreadedLogBackend object can't be used after this method is called.
- inline void WriteEmergencyData(const TLogRecord& rec) noexcept {
- Queue_.Stop();
- Slave_->WriteData(rec);
- }
- inline void ReopenLog() {
- TIntrusivePtr<TReopener> reopener(new TReopener(this));
- if (!Queue_.Add(reopener.Get())) {
- reopener->UnRef(); // Ref() was called in constructor
- ythrow yexception() << "log queue exhausted";
- }
- reopener->Wait();
- }
- inline void ReopenLogNoFlush() {
- Slave_->ReopenLogNoFlush();
- }
- inline size_t QueueSize() const {
- return Queue_.Size();
- }
- private:
- TLogBackend* Slave_;
- TThreadPool Queue_{"ThreadedLogBack"};
- const std::function<void()> QueueOverflowCallback_;
- };
- TThreadedLogBackend::TThreadedLogBackend(TLogBackend* slave)
- : Impl_(new TImpl(slave, 0))
- {
- }
- TThreadedLogBackend::TThreadedLogBackend(TLogBackend* slave, size_t queuelen, std::function<void()> queueOverflowCallback)
- : Impl_(new TImpl(slave, queuelen, std::move(queueOverflowCallback)))
- {
- }
- TThreadedLogBackend::~TThreadedLogBackend() {
- }
- void TThreadedLogBackend::WriteData(const TLogRecord& rec) {
- Impl_->WriteData(rec);
- }
- void TThreadedLogBackend::ReopenLog() {
- Impl_->ReopenLog();
- }
- void TThreadedLogBackend::ReopenLogNoFlush() {
- Impl_->ReopenLogNoFlush();
- }
- void TThreadedLogBackend::WriteEmergencyData(const TLogRecord& rec) {
- Impl_->WriteEmergencyData(rec);
- }
- size_t TThreadedLogBackend::QueueSize() const {
- return Impl_->QueueSize();
- }
- TOwningThreadedLogBackend::TOwningThreadedLogBackend(TLogBackend* slave)
- : THolder<TLogBackend>(slave)
- , TThreadedLogBackend(Get())
- {
- }
- TOwningThreadedLogBackend::TOwningThreadedLogBackend(TLogBackend* slave, size_t queuelen, std::function<void()> queueOverflowCallback)
- : THolder<TLogBackend>(slave)
- , TThreadedLogBackend(Get(), queuelen, std::move(queueOverflowCallback))
- {
- }
- TOwningThreadedLogBackend::~TOwningThreadedLogBackend() {
- }
|