buffer_data.h 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. #include "ydb/core/scheme/scheme_tablecell.h"
  2. #include "ydb/core/tx/datashard/upload_stats.h"
  3. #include "ydb/core/tx/tx_proxy/upload_rows.h"
  4. namespace NKikimr::NDataShard {
  5. class TBufferData: public IStatHolder, public TNonCopyable {
  6. public:
  7. TBufferData()
  8. : Rows{std::make_shared<NTxProxy::TUploadRows>()} {
  9. }
  10. ui64 GetRows() const override final {
  11. return Rows->size();
  12. }
  13. auto GetRowsData() const {
  14. return Rows;
  15. }
  16. ui64 GetBytes() const override final {
  17. return ByteSize;
  18. }
  19. void FlushTo(TBufferData& other) {
  20. Y_ABORT_UNLESS(this != &other);
  21. Y_ABORT_UNLESS(other.IsEmpty());
  22. other.Rows.swap(Rows);
  23. other.ByteSize = std::exchange(ByteSize, 0);
  24. other.LastKey = std::exchange(LastKey, {});
  25. }
  26. void Clear() {
  27. Rows->clear();
  28. ByteSize = 0;
  29. LastKey = {};
  30. }
  31. void AddRow(TSerializedCellVec&& key, TSerializedCellVec&& targetPk, TString&& targetValue) {
  32. Rows->emplace_back(std::move(targetPk), std::move(targetValue));
  33. ByteSize += Rows->back().first.GetBuffer().size() + Rows->back().second.size();
  34. LastKey = std::move(key);
  35. }
  36. bool IsEmpty() const {
  37. return Rows->empty();
  38. }
  39. size_t Size() const {
  40. return Rows->size();
  41. }
  42. bool IsReachLimits(const TUploadLimits& Limits) {
  43. // TODO(mbkkt) why [0..BatchRowsLimit) but [0..BatchBytesLimit]
  44. return Rows->size() >= Limits.BatchRowsLimit || ByteSize > Limits.BatchBytesLimit;
  45. }
  46. auto&& ExtractLastKey() {
  47. return std::move(LastKey);
  48. }
  49. const auto& GetLastKey() const {
  50. return LastKey;
  51. }
  52. private:
  53. std::shared_ptr<NTxProxy::TUploadRows> Rows;
  54. ui64 ByteSize = 0;
  55. TSerializedCellVec LastKey;
  56. };
  57. }