|
@@ -16,8 +16,8 @@ void TBaseChangeSender::RegisterSender(THashMap<ui64, TSender>& senders, ui64 pa
|
|
|
|
|
|
for (const auto& [order, broadcast] : Broadcasting) {
|
|
|
if (AddBroadcastPartition(order, partitionId)) {
|
|
|
- // re-schedule record to send it in the correct order
|
|
|
- PendingSent.emplace(order, broadcast.Record);
|
|
|
+ // re-enqueue record to send it in the correct order
|
|
|
+ Enqueued.insert(broadcast.Record);
|
|
|
}
|
|
|
}
|
|
|
}
|
|
@@ -122,6 +122,11 @@ void TBaseChangeSender::ProcessRecords(TVector<TChangeRecord>&& records) {
|
|
|
MemUsage += record.GetBody().size();
|
|
|
}
|
|
|
|
|
|
+ if (record.IsBroadcast()) {
|
|
|
+ // assume that broadcast records are too small to affect memory consumption
|
|
|
+ MemUsage -= record.GetBody().size();
|
|
|
+ }
|
|
|
+
|
|
|
PendingSent.emplace(record.GetOrder(), std::move(record));
|
|
|
PendingBody.erase(it);
|
|
|
}
|
|
@@ -291,7 +296,7 @@ TBaseChangeSender::TBroadcast& TBaseChangeSender::EnsureBroadcast(const TChangeR
|
|
|
}
|
|
|
|
|
|
auto res = Broadcasting.emplace(record.GetOrder(), TBroadcast{
|
|
|
- .Record = record,
|
|
|
+ .Record = {record.GetOrder(), record.GetBody().size()},
|
|
|
.Partitions = partitionIds,
|
|
|
.PendingPartitions = partitionIds,
|
|
|
});
|
|
@@ -345,9 +350,7 @@ bool TBaseChangeSender::MaybeCompleteBroadcast(ui64 order) {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- MemUsage -= broadcast.Record.GetBody().size();
|
|
|
Broadcasting.erase(it);
|
|
|
-
|
|
|
return true;
|
|
|
}
|
|
|
|