watermark_tracker.cpp 947 B

123456789101112131415161718192021222324252627282930313233343536373839
  1. #include "watermark_tracker.h"
  2. #include<util/system/yassert.h>
  3. namespace NKikimr {
  4. namespace NMiniKQL {
  5. TWatermarkTracker::TWatermarkTracker(
  6. ui64 delay,
  7. ui64 granularity)
  8. : Delay(delay)
  9. , Granularity(granularity)
  10. {
  11. Y_ABORT_UNLESS(granularity > 0);
  12. }
  13. std::optional<ui64> TWatermarkTracker::HandleNextEventTime(ui64 ts) {
  14. if (Y_UNLIKELY(ts >= NextEventWithWatermark)) {
  15. NextEventWithWatermark = CalcNextEventWithWatermark(ts);
  16. return CalcLastWatermark();
  17. }
  18. return std::nullopt;
  19. }
  20. ui64 TWatermarkTracker::CalcNextEventWithWatermark(ui64 ts) {
  21. return ts + Granularity - (ts - Delay) % Granularity;
  22. }
  23. std::optional<ui64> TWatermarkTracker::CalcLastWatermark() {
  24. if (Y_UNLIKELY(Delay + Granularity > NextEventWithWatermark)) {
  25. // Protect from negative values
  26. return std::nullopt;
  27. }
  28. return NextEventWithWatermark - Delay - Granularity;
  29. }
  30. } // NMiniKQL
  31. } // NKikimr