thread.cpp 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165
  1. #include "thread.h"
  2. #include "record.h"
  3. #include <util/thread/pool.h>
  4. #include <util/system/event.h>
  5. #include <util/memory/addstorage.h>
  6. #include <util/generic/ptr.h>
  7. #include <util/generic/yexception.h>
  8. class TThreadedLogBackend::TImpl {
  9. class TRec: public IObjectInQueue, public TAdditionalStorage<TRec>, public TLogRecord {
  10. public:
  11. inline TRec(TImpl* parent, const TLogRecord& rec)
  12. : TLogRecord(rec.Priority, (const char*)AdditionalData(), rec.Len)
  13. , Parent_(parent)
  14. {
  15. memcpy(AdditionalData(), rec.Data, rec.Len);
  16. }
  17. inline ~TRec() override {
  18. }
  19. private:
  20. void Process(void* /*tsr*/) override {
  21. THolder<TRec> This(this);
  22. Parent_->Slave_->WriteData(*this);
  23. }
  24. private:
  25. TImpl* Parent_;
  26. };
  27. class TReopener: public IObjectInQueue, public TSystemEvent, public TAtomicRefCount<TReopener> {
  28. public:
  29. inline TReopener(TImpl* parent)
  30. : Parent_(parent)
  31. {
  32. Ref();
  33. }
  34. inline ~TReopener() override {
  35. }
  36. private:
  37. void Process(void* /*tsr*/) override {
  38. try {
  39. Parent_->Slave_->ReopenLog();
  40. } catch (...) {
  41. }
  42. Signal();
  43. UnRef();
  44. }
  45. private:
  46. TImpl* Parent_;
  47. };
  48. public:
  49. inline TImpl(TLogBackend* slave, size_t queuelen, std::function<void()> queueOverflowCallback = {})
  50. : Slave_(slave)
  51. , QueueOverflowCallback_(std::move(queueOverflowCallback))
  52. {
  53. Queue_.Start(1, queuelen);
  54. }
  55. inline ~TImpl() {
  56. Queue_.Stop();
  57. }
  58. inline void WriteData(const TLogRecord& rec) {
  59. THolder<TRec> obj(new (rec.Len) TRec(this, rec));
  60. if (Queue_.Add(obj.Get())) {
  61. Y_UNUSED(obj.Release());
  62. return;
  63. }
  64. if (QueueOverflowCallback_) {
  65. QueueOverflowCallback_();
  66. } else {
  67. ythrow yexception() << "log queue exhausted";
  68. }
  69. }
  70. // Write an emergency message when the memory allocator is corrupted.
  71. // The TThreadedLogBackend object can't be used after this method is called.
  72. inline void WriteEmergencyData(const TLogRecord& rec) noexcept {
  73. Queue_.Stop();
  74. Slave_->WriteData(rec);
  75. }
  76. inline void ReopenLog() {
  77. TIntrusivePtr<TReopener> reopener(new TReopener(this));
  78. if (!Queue_.Add(reopener.Get())) {
  79. reopener->UnRef(); // Ref() was called in constructor
  80. ythrow yexception() << "log queue exhausted";
  81. }
  82. reopener->Wait();
  83. }
  84. inline void ReopenLogNoFlush() {
  85. Slave_->ReopenLogNoFlush();
  86. }
  87. inline size_t QueueSize() const {
  88. return Queue_.Size();
  89. }
  90. private:
  91. TLogBackend* Slave_;
  92. TThreadPool Queue_{"ThreadedLogBack"};
  93. const std::function<void()> QueueOverflowCallback_;
  94. };
  95. TThreadedLogBackend::TThreadedLogBackend(TLogBackend* slave)
  96. : Impl_(new TImpl(slave, 0))
  97. {
  98. }
  99. TThreadedLogBackend::TThreadedLogBackend(TLogBackend* slave, size_t queuelen, std::function<void()> queueOverflowCallback)
  100. : Impl_(new TImpl(slave, queuelen, std::move(queueOverflowCallback)))
  101. {
  102. }
  103. TThreadedLogBackend::~TThreadedLogBackend() {
  104. }
  105. void TThreadedLogBackend::WriteData(const TLogRecord& rec) {
  106. Impl_->WriteData(rec);
  107. }
  108. void TThreadedLogBackend::ReopenLog() {
  109. Impl_->ReopenLog();
  110. }
  111. void TThreadedLogBackend::ReopenLogNoFlush() {
  112. Impl_->ReopenLogNoFlush();
  113. }
  114. void TThreadedLogBackend::WriteEmergencyData(const TLogRecord& rec) {
  115. Impl_->WriteEmergencyData(rec);
  116. }
  117. size_t TThreadedLogBackend::QueueSize() const {
  118. return Impl_->QueueSize();
  119. }
  120. TOwningThreadedLogBackend::TOwningThreadedLogBackend(TLogBackend* slave)
  121. : THolder<TLogBackend>(slave)
  122. , TThreadedLogBackend(Get())
  123. {
  124. }
  125. TOwningThreadedLogBackend::TOwningThreadedLogBackend(TLogBackend* slave, size_t queuelen, std::function<void()> queueOverflowCallback)
  126. : THolder<TLogBackend>(slave)
  127. , TThreadedLogBackend(Get(), queuelen, std::move(queueOverflowCallback))
  128. {
  129. }
  130. TOwningThreadedLogBackend::~TOwningThreadedLogBackend() {
  131. }