conn_cache.h 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. #pragma once
  2. #include <string.h>
  3. #include <util/generic/ptr.h>
  4. #include <util/generic/singleton.h>
  5. #include <util/generic/string.h>
  6. #include <library/cpp/deprecated/atomic/atomic.h>
  7. #include <util/thread/lfqueue.h>
  8. #include "http_common.h"
  9. namespace NNeh {
  10. namespace NHttp2 {
  11. // TConn must be refcounted and contain methods:
  12. // void SetCached(bool) noexcept
  13. // bool IsValid() const noexcept
  14. // void Close() noexcept
  15. template <class TConn>
  16. class TConnCache {
  17. struct TCounter : TAtomicCounter {
  18. inline void IncCount(const TConn* const&) {
  19. Inc();
  20. }
  21. inline void DecCount(const TConn* const&) {
  22. Dec();
  23. }
  24. };
  25. public:
  26. typedef TIntrusivePtr<TConn> TConnRef;
  27. class TConnList: public TLockFreeQueue<TConn*, TCounter> {
  28. public:
  29. ~TConnList() {
  30. Clear();
  31. }
  32. inline void Clear() {
  33. TConn* conn;
  34. while (this->Dequeue(&conn)) {
  35. conn->Close();
  36. conn->UnRef();
  37. }
  38. }
  39. inline size_t Size() {
  40. return this->GetCounter().Val();
  41. }
  42. };
  43. inline void Put(TConnRef& conn, size_t addrId) {
  44. conn->SetCached(true);
  45. ConnList(addrId).Enqueue(conn.Get());
  46. conn->Ref();
  47. Y_UNUSED(conn.Release());
  48. CachedConn_.Inc();
  49. }
  50. bool Get(TConnRef& conn, size_t addrId) {
  51. TConnList& connList = ConnList(addrId);
  52. TConn* connTmp;
  53. while (connList.Dequeue(&connTmp)) {
  54. connTmp->SetCached(false);
  55. CachedConn_.Dec();
  56. if (connTmp->IsValid()) {
  57. TConnRef(connTmp).Swap(conn);
  58. connTmp->DecRef();
  59. return true;
  60. } else {
  61. connTmp->UnRef();
  62. }
  63. }
  64. return false;
  65. }
  66. inline size_t Size() const noexcept {
  67. return CachedConn_.Val();
  68. }
  69. inline size_t Validate(size_t addrId) {
  70. TConnList& cl = Lst_.Get(addrId);
  71. return Validate(cl);
  72. }
  73. //close/remove part of the connections from cache
  74. size_t Purge(size_t addrId, size_t frac256) {
  75. TConnList& cl = Lst_.Get(addrId);
  76. size_t qsize = cl.Size();
  77. if (!qsize) {
  78. return 0;
  79. }
  80. size_t purgeCounter = ((qsize * frac256) >> 8);
  81. if (!purgeCounter && qsize >= 2) {
  82. purgeCounter = 1;
  83. }
  84. size_t pc = 0;
  85. {
  86. TConn* conn;
  87. while (purgeCounter-- && cl.Dequeue(&conn)) {
  88. conn->SetCached(false);
  89. if (conn->IsValid()) {
  90. conn->Close();
  91. }
  92. CachedConn_.Dec();
  93. conn->UnRef();
  94. ++pc;
  95. }
  96. }
  97. pc += Validate(cl);
  98. return pc;
  99. }
  100. private:
  101. inline TConnList& ConnList(size_t addrId) {
  102. return Lst_.Get(addrId);
  103. }
  104. inline size_t Validate(TConnList& cl) {
  105. size_t pc = 0;
  106. size_t nc = cl.Size();
  107. TConn* conn;
  108. while (nc-- && cl.Dequeue(&conn)) {
  109. if (conn->IsValid()) {
  110. cl.Enqueue(conn);
  111. } else {
  112. ++pc;
  113. conn->SetCached(false);
  114. CachedConn_.Dec();
  115. conn->UnRef();
  116. }
  117. }
  118. return pc;
  119. }
  120. NNeh::NHttp::TLockFreeSequence<TConnList> Lst_;
  121. TAtomicCounter CachedConn_;
  122. };
  123. }
  124. }