#include "thread.h"
#include "record.h"

// NOTE(review): the util header paths below were chosen from the names this
// file uses (TThreadPool/IObjectInQueue, TSystemEvent, TAdditionalStorage,
// THolder/TIntrusivePtr/TAtomicRefCount, yexception) - confirm against the build.
#include <util/thread/pool.h>
#include <util/system/event.h>
#include <util/memory/addstorage.h>
#include <util/generic/ptr.h>
#include <util/generic/yexception.h>

#include <cstring>
#include <functional>
#include <utility>

// Implementation of TThreadedLogBackend: log records are copied, pushed into a
// thread pool queue and written to the wrapped ("slave") backend from the
// pool's worker instead of the calling thread.
class TThreadedLogBackend::TImpl {
    // A queued copy of a log record. The payload lives in the additional
    // storage allocated together with the object (see `new (rec.Len)` in
    // TImpl::WriteData), and the TLogRecord base points at that copy.
    class TRec: public IObjectInQueue, public TAdditionalStorage<TRec>, public TLogRecord {
    public:
        inline TRec(TImpl* parent, const TLogRecord& rec)
            : TLogRecord(rec.Priority, static_cast<const char*>(AdditionalData()), rec.Len, rec.MetaFlags)
            , Parent_(parent)
        {
            // The caller's buffer is not retained; copy the payload into our own storage.
            memcpy(AdditionalData(), rec.Data, rec.Len);
        }

        inline ~TRec() override = default;

    private:
        // Writes the record to the slave backend. The object owns itself once
        // queued, so it is wrapped in a THolder first and destroyed on exit,
        // including when WriteData throws.
        void Process(void* /*tsr*/) override {
            THolder<TRec> This(this);

            Parent_->Slave_->WriteData(*this);
        }

    private:
        TImpl* Parent_;
    };

    // A queued "reopen the log" request that the submitting thread can wait on.
    // Two references exist while it is queued: the one taken in the constructor
    // (dropped at the end of Process) and the one held by the submitter's
    // TIntrusivePtr.
    class TReopener: public IObjectInQueue, public TSystemEvent, public TAtomicRefCount<TReopener> {
    public:
        inline explicit TReopener(TImpl* parent)
            : Parent_(parent)
        {
            Ref();
        }

        inline ~TReopener() override = default;

    private:
        void Process(void* /*tsr*/) override {
            try {
                Parent_->Slave_->ReopenLog();
            } catch (...) {
                // Deliberately swallowed: the waiter must be signalled below
                // regardless of whether the reopen succeeded.
            }

            Signal();
            UnRef();
        }

    private:
        TImpl* Parent_;
    };

public:
    // slave                 - backend that receives the records; not owned here.
    // queuelen              - passed to the pool as the second argument of Start().
    // queueOverflowCallback - optional; invoked instead of throwing when a
    //                         record cannot be added to the queue.
    inline TImpl(TLogBackend* slave, size_t queuelen, std::function<void()> queueOverflowCallback = {})
        : Slave_(slave)
        , QueueOverflowCallback_(std::move(queueOverflowCallback))
    {
        Queue_.Start(1, queuelen);
    }

    inline ~TImpl() {
        Queue_.Stop();
    }

    // Copies `rec` and enqueues the copy for writing.
    // If the queue refuses the record, the copy is discarded and either the
    // overflow callback is called or, when none was supplied, yexception
    // ("log queue exhausted") is thrown.
    inline void WriteData(const TLogRecord& rec) {
        THolder<TRec> obj(new (rec.Len) TRec(this, rec));

        if (Queue_.Add(obj.Get())) {
            // Ownership has passed to the queue; TRec::Process deletes the object.
            Y_UNUSED(obj.Release());
            return;
        }

        if (QueueOverflowCallback_) {
            QueueOverflowCallback_();
        } else {
            ythrow yexception() << "log queue exhausted";
        }
    }

    // Write an emergency message when the memory allocator is corrupted.
    // The TThreadedLogBackend object can't be used after this method is called.
    inline void WriteEmergencyData(const TLogRecord& rec) noexcept {
        // Stop the queue and write directly to the slave, without making a
        // TRec copy of the record.
        Queue_.Stop();
        Slave_->WriteData(rec);
    }

    // Enqueues a reopen request and blocks until the worker has processed it.
    // Throws yexception ("log queue exhausted") if the request cannot be queued.
    inline void ReopenLog() {
        TIntrusivePtr<TReopener> reopener(new TReopener(this));

        if (!Queue_.Add(reopener.Get())) {
            reopener->UnRef(); // Ref() was called in constructor
            ythrow yexception() << "log queue exhausted";
        }

        reopener->Wait();
    }

    // Forwards straight to the slave on the calling thread, bypassing the queue.
    inline void ReopenLogNoFlush() {
        Slave_->ReopenLogNoFlush();
    }

    // Current value of Queue_.Size().
    inline size_t QueueSize() const {
        return Queue_.Size();
    }

private:
    TLogBackend* Slave_;
    TThreadPool Queue_{"ThreadedLogBack"};
    const std::function<void()> QueueOverflowCallback_;
};

// Wraps `slave` with a queue length of 0 and no overflow callback.
TThreadedLogBackend::TThreadedLogBackend(TLogBackend* slave)
    : Impl_(new TImpl(slave, 0))
{
}

// Wraps `slave` with the given queue length and overflow callback.
TThreadedLogBackend::TThreadedLogBackend(TLogBackend* slave, size_t queuelen, std::function<void()> queueOverflowCallback)
    : Impl_(new TImpl(slave, queuelen, std::move(queueOverflowCallback)))
{
}

// Defined out of line, in the file where TImpl is a complete type.
TThreadedLogBackend::~TThreadedLogBackend() = default;

// Asynchronous write; see TImpl::WriteData for the overflow behaviour.
void TThreadedLogBackend::WriteData(const TLogRecord& rec) {
    Impl_->WriteData(rec);
}

// Reopens the slave's log through the queue and waits for completion.
void TThreadedLogBackend::ReopenLog() {
    Impl_->ReopenLog();
}

// Reopens the slave's log immediately, without going through the queue.
void TThreadedLogBackend::ReopenLogNoFlush() {
    Impl_->ReopenLogNoFlush();
}

// Stops the queue and writes `rec` directly; the object must not be used afterwards.
void TThreadedLogBackend::WriteEmergencyData(const TLogRecord& rec) {
    Impl_->WriteEmergencyData(rec);
}

// Size of the underlying queue as reported by the pool.
size_t TThreadedLogBackend::QueueSize() const {
    return Impl_->QueueSize();
}

// Owning variant: the THolder base takes ownership of `slave`, and the same
// pointer (via Get()) is handed to the TThreadedLogBackend base.
// NOTE(review): this relies on the THolder base being declared before
// TThreadedLogBackend in the class definition - confirm in thread.h.
TOwningThreadedLogBackend::TOwningThreadedLogBackend(TLogBackend* slave)
    : THolder<TLogBackend>(slave)
    , TThreadedLogBackend(Get())
{
}

TOwningThreadedLogBackend::TOwningThreadedLogBackend(TLogBackend* slave, size_t queuelen, std::function<void()> queueOverflowCallback)
    : THolder<TLogBackend>(slave)
    , TThreadedLogBackend(Get(), queuelen, std::move(queueOverflowCallback))
{
}

TOwningThreadedLogBackend::~TOwningThreadedLogBackend() = default;