storage.cpp 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. #include "storage.h"
  2. #include <typeinfo>
  3. namespace NBus {
  4. namespace NPrivate {
  5. TTimedMessages::TTimedMessages() {
  6. }
  7. TTimedMessages::~TTimedMessages() {
  8. Y_ABORT_UNLESS(Items.empty());
  9. }
  10. void TTimedMessages::PushBack(TNonDestroyingAutoPtr<TBusMessage> m) {
  11. TItem i;
  12. i.Message.Reset(m.Release());
  13. Items.push_back(i);
  14. }
  15. TNonDestroyingAutoPtr<TBusMessage> TTimedMessages::PopFront() {
  16. TBusMessage* r = nullptr;
  17. if (!Items.empty()) {
  18. r = Items.front()->Message.Release();
  19. Items.pop_front();
  20. }
  21. return r;
  22. }
  23. bool TTimedMessages::Empty() const {
  24. return Items.empty();
  25. }
  26. size_t TTimedMessages::Size() const {
  27. return Items.size();
  28. }
  29. void TTimedMessages::Timeout(TInstant before, TMessagesPtrs* r) {
  30. // shortcut
  31. if (before == TInstant::Max()) {
  32. Clear(r);
  33. return;
  34. }
  35. while (!Items.empty()) {
  36. TItem& i = *Items.front();
  37. if (TInstant::MilliSeconds(i.Message->GetHeader()->SendTime) > before) {
  38. break;
  39. }
  40. r->push_back(i.Message.Release());
  41. Items.pop_front();
  42. }
  43. }
  44. void TTimedMessages::Clear(TMessagesPtrs* r) {
  45. while (!Items.empty()) {
  46. r->push_back(Items.front()->Message.Release());
  47. Items.pop_front();
  48. }
  49. }
  50. TSyncAckMessages::TSyncAckMessages() {
  51. KeyToMessage.set_empty_key(0);
  52. KeyToMessage.set_deleted_key(1);
  53. }
  54. TSyncAckMessages::~TSyncAckMessages() {
  55. Y_ABORT_UNLESS(KeyToMessage.empty());
  56. Y_ABORT_UNLESS(TimedItems.empty());
  57. }
  58. void TSyncAckMessages::Push(TBusMessagePtrAndHeader& m) {
  59. // Perform garbage collection if `TimedMessages` contain too many junk data
  60. if (TimedItems.size() > 1000 && TimedItems.size() > KeyToMessage.size() * 4) {
  61. Gc();
  62. }
  63. TValue value = {m.MessagePtr.Release()};
  64. std::pair<TKeyToMessage::iterator, bool> p = KeyToMessage.insert(TKeyToMessage::value_type(m.Header.Id, value));
  65. Y_ABORT_UNLESS(p.second, "non-unique id; %s", value.Message->Describe().data());
  66. TTimedItem item = {m.Header.Id, m.Header.SendTime};
  67. TimedItems.push_back(item);
  68. }
  69. TBusMessage* TSyncAckMessages::Pop(TBusKey id) {
  70. TKeyToMessage::iterator it = KeyToMessage.find(id);
  71. if (it == KeyToMessage.end()) {
  72. return nullptr;
  73. }
  74. TValue v = it->second;
  75. KeyToMessage.erase(it);
  76. // `TimedMessages` still contain record about this message
  77. return v.Message;
  78. }
  79. void TSyncAckMessages::Timeout(TInstant before, TMessagesPtrs* r) {
  80. // shortcut
  81. if (before == TInstant::Max()) {
  82. Clear(r);
  83. return;
  84. }
  85. Y_ASSERT(r->empty());
  86. while (!TimedItems.empty()) {
  87. TTimedItem i = TimedItems.front();
  88. if (TInstant::MilliSeconds(i.SendTime) > before) {
  89. break;
  90. }
  91. TKeyToMessage::iterator itMessage = KeyToMessage.find(i.Key);
  92. if (itMessage != KeyToMessage.end()) {
  93. r->push_back(itMessage->second.Message);
  94. KeyToMessage.erase(itMessage);
  95. }
  96. TimedItems.pop_front();
  97. }
  98. }
  99. void TSyncAckMessages::Clear(TMessagesPtrs* r) {
  100. for (TKeyToMessage::const_iterator i = KeyToMessage.begin(); i != KeyToMessage.end(); ++i) {
  101. r->push_back(i->second.Message);
  102. }
  103. KeyToMessage.clear();
  104. TimedItems.clear();
  105. }
  106. void TSyncAckMessages::Gc() {
  107. TDeque<TTimedItem> tmp;
  108. for (auto& timedItem : TimedItems) {
  109. if (KeyToMessage.find(timedItem.Key) == KeyToMessage.end()) {
  110. continue;
  111. }
  112. tmp.push_back(timedItem);
  113. }
  114. TimedItems.swap(tmp);
  115. }
  116. void TSyncAckMessages::RemoveAll(const TMessagesPtrs& messages) {
  117. for (auto message : messages) {
  118. TKeyToMessage::iterator it = KeyToMessage.find(message->GetHeader()->Id);
  119. Y_ABORT_UNLESS(it != KeyToMessage.end(), "delete non-existent message");
  120. KeyToMessage.erase(it);
  121. }
  122. }
  123. void TSyncAckMessages::DumpState() {
  124. Cerr << TimedItems.size() << Endl;
  125. Cerr << KeyToMessage.size() << Endl;
  126. }
  127. }
  128. }