thread.cpp 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. #include "thread.h"
  2. #include "magic.h"
  3. #include <util/network/socket.h>
  4. #include <util/thread/factory.h>
  5. #include <util/thread/lfqueue.h>
  6. #include <util/system/event.h>
  7. #include <util/generic/vector.h>
  8. #include <util/generic/singleton.h>
  9. using namespace NDns;
  10. namespace {
  11. class TThreadedResolver: public IThreadFactory::IThreadAble, public TNonCopyable {
  12. struct TResolveRequest {
  13. inline TResolveRequest(const TString& host, ui16 port)
  14. : Host(host)
  15. , Port(port)
  16. {
  17. }
  18. inline TNetworkAddressPtr Wait() {
  19. E.Wait();
  20. if (!Error) {
  21. if (!Result) {
  22. ythrow TNetworkResolutionError(EAI_AGAIN) << TStringBuf(": resolver down");
  23. }
  24. return Result;
  25. }
  26. Error->Raise();
  27. ythrow TNetworkResolutionError(EAI_FAIL) << TStringBuf(": shit happen");
  28. }
  29. inline void Resolve() noexcept {
  30. try {
  31. Result = new TNetworkAddress(Host, Port);
  32. } catch (...) {
  33. Error = SaveError();
  34. }
  35. Wake();
  36. }
  37. inline void Wake() noexcept {
  38. E.Signal();
  39. }
  40. TString Host;
  41. ui16 Port;
  42. TManualEvent E;
  43. TNetworkAddressPtr Result;
  44. IErrorRef Error;
  45. };
  46. public:
  47. inline TThreadedResolver()
  48. : E_(TSystemEvent::rAuto)
  49. {
  50. T_.push_back(SystemThreadFactory()->Run(this));
  51. }
  52. inline ~TThreadedResolver() override {
  53. Schedule(nullptr);
  54. for (size_t i = 0; i < T_.size(); ++i) {
  55. T_[i]->Join();
  56. }
  57. {
  58. TResolveRequest* rr = nullptr;
  59. while (Q_.Dequeue(&rr)) {
  60. if (rr) {
  61. rr->Wake();
  62. }
  63. }
  64. }
  65. }
  66. static inline TThreadedResolver* Instance() {
  67. return Singleton<TThreadedResolver>();
  68. }
  69. inline TNetworkAddressPtr Resolve(const TString& host, ui16 port) {
  70. TResolveRequest rr(host, port);
  71. Schedule(&rr);
  72. return rr.Wait();
  73. }
  74. private:
  75. inline void Schedule(TResolveRequest* rr) {
  76. Q_.Enqueue(rr);
  77. E_.Signal();
  78. }
  79. void DoExecute() override {
  80. while (true) {
  81. TResolveRequest* rr = nullptr;
  82. while (!Q_.Dequeue(&rr)) {
  83. E_.Wait();
  84. }
  85. if (rr) {
  86. rr->Resolve();
  87. } else {
  88. break;
  89. }
  90. }
  91. Schedule(nullptr);
  92. }
  93. private:
  94. TLockFreeQueue<TResolveRequest*> Q_;
  95. TSystemEvent E_;
  96. typedef TAutoPtr<IThreadFactory::IThread> IThreadRef;
  97. TVector<IThreadRef> T_;
  98. };
  99. }
  100. namespace NDns {
  101. TNetworkAddressPtr ThreadedResolve(const TString& host, ui16 port) {
  102. return TThreadedResolver::Instance()->Resolve(host, port);
  103. }
  104. }