123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161 |
- #include "storage.h"
- #include <typeinfo>
- namespace NBus {
- namespace NPrivate {
- TTimedMessages::TTimedMessages() {
- }
- TTimedMessages::~TTimedMessages() {
- Y_ABORT_UNLESS(Items.empty());
- }
- void TTimedMessages::PushBack(TNonDestroyingAutoPtr<TBusMessage> m) {
- TItem i;
- i.Message.Reset(m.Release());
- Items.push_back(i);
- }
- TNonDestroyingAutoPtr<TBusMessage> TTimedMessages::PopFront() {
- TBusMessage* r = nullptr;
- if (!Items.empty()) {
- r = Items.front()->Message.Release();
- Items.pop_front();
- }
- return r;
- }
- bool TTimedMessages::Empty() const {
- return Items.empty();
- }
- size_t TTimedMessages::Size() const {
- return Items.size();
- }
- void TTimedMessages::Timeout(TInstant before, TMessagesPtrs* r) {
- // shortcut
- if (before == TInstant::Max()) {
- Clear(r);
- return;
- }
- while (!Items.empty()) {
- TItem& i = *Items.front();
- if (TInstant::MilliSeconds(i.Message->GetHeader()->SendTime) > before) {
- break;
- }
- r->push_back(i.Message.Release());
- Items.pop_front();
- }
- }
- void TTimedMessages::Clear(TMessagesPtrs* r) {
- while (!Items.empty()) {
- r->push_back(Items.front()->Message.Release());
- Items.pop_front();
- }
- }
- TSyncAckMessages::TSyncAckMessages() {
- KeyToMessage.set_empty_key(0);
- KeyToMessage.set_deleted_key(1);
- }
- TSyncAckMessages::~TSyncAckMessages() {
- Y_ABORT_UNLESS(KeyToMessage.empty());
- Y_ABORT_UNLESS(TimedItems.empty());
- }
- void TSyncAckMessages::Push(TBusMessagePtrAndHeader& m) {
- // Perform garbage collection if `TimedMessages` contain too many junk data
- if (TimedItems.size() > 1000 && TimedItems.size() > KeyToMessage.size() * 4) {
- Gc();
- }
- TValue value = {m.MessagePtr.Release()};
- std::pair<TKeyToMessage::iterator, bool> p = KeyToMessage.insert(TKeyToMessage::value_type(m.Header.Id, value));
- Y_ABORT_UNLESS(p.second, "non-unique id; %s", value.Message->Describe().data());
- TTimedItem item = {m.Header.Id, m.Header.SendTime};
- TimedItems.push_back(item);
- }
- TBusMessage* TSyncAckMessages::Pop(TBusKey id) {
- TKeyToMessage::iterator it = KeyToMessage.find(id);
- if (it == KeyToMessage.end()) {
- return nullptr;
- }
- TValue v = it->second;
- KeyToMessage.erase(it);
- // `TimedMessages` still contain record about this message
- return v.Message;
- }
- void TSyncAckMessages::Timeout(TInstant before, TMessagesPtrs* r) {
- // shortcut
- if (before == TInstant::Max()) {
- Clear(r);
- return;
- }
- Y_ASSERT(r->empty());
- while (!TimedItems.empty()) {
- TTimedItem i = TimedItems.front();
- if (TInstant::MilliSeconds(i.SendTime) > before) {
- break;
- }
- TKeyToMessage::iterator itMessage = KeyToMessage.find(i.Key);
- if (itMessage != KeyToMessage.end()) {
- r->push_back(itMessage->second.Message);
- KeyToMessage.erase(itMessage);
- }
- TimedItems.pop_front();
- }
- }
- void TSyncAckMessages::Clear(TMessagesPtrs* r) {
- for (TKeyToMessage::const_iterator i = KeyToMessage.begin(); i != KeyToMessage.end(); ++i) {
- r->push_back(i->second.Message);
- }
- KeyToMessage.clear();
- TimedItems.clear();
- }
- void TSyncAckMessages::Gc() {
- TDeque<TTimedItem> tmp;
- for (auto& timedItem : TimedItems) {
- if (KeyToMessage.find(timedItem.Key) == KeyToMessage.end()) {
- continue;
- }
- tmp.push_back(timedItem);
- }
- TimedItems.swap(tmp);
- }
- void TSyncAckMessages::RemoveAll(const TMessagesPtrs& messages) {
- for (auto message : messages) {
- TKeyToMessage::iterator it = KeyToMessage.find(message->GetHeader()->Id);
- Y_ABORT_UNLESS(it != KeyToMessage.end(), "delete non-existent message");
- KeyToMessage.erase(it);
- }
- }
- void TSyncAckMessages::DumpState() {
- Cerr << TimedItems.size() << Endl;
- Cerr << KeyToMessage.size() << Endl;
- }
- }
- }
|