current_time_strategy.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. from datetime import datetime, timezone
  2. from typing import Generator, Optional
  3. from dateutil.relativedelta import relativedelta
  4. from .range_strategy import PostgresRangePartitioningStrategy
  5. from .time_partition import PostgresTimePartition
  6. from .time_partition_size import PostgresTimePartitionSize
  7. class PostgresCurrentTimePartitioningStrategy(
  8. PostgresRangePartitioningStrategy
  9. ):
  10. """Implments a time based partitioning strategy where each partition
  11. contains values for a specific time period.
  12. All buckets will be equal in size and start at the start of the
  13. unit. With monthly partitioning, partitions start on the 1st and
  14. with weekly partitioning, partitions start on monday.
  15. """
  16. def __init__(
  17. self,
  18. size: PostgresTimePartitionSize,
  19. count: int,
  20. max_age: Optional[relativedelta] = None,
  21. name_format: Optional[str] = None,
  22. ) -> None:
  23. """Initializes a new instance of :see:PostgresTimePartitioningStrategy.
  24. Arguments:
  25. size:
  26. The size of each partition.
  27. count:
  28. The amount of partitions to create ahead
  29. from the current date/time.
  30. max_age:
  31. Maximum age of a partition. Partitions
  32. older than this are deleted during
  33. auto cleanup.
  34. """
  35. self.size = size
  36. self.count = count
  37. self.max_age = max_age
  38. self.name_format = name_format
  39. def to_create(self) -> Generator[PostgresTimePartition, None, None]:
  40. current_datetime = self.size.start(self.get_start_datetime())
  41. for _ in range(self.count):
  42. yield PostgresTimePartition(
  43. start_datetime=current_datetime,
  44. size=self.size,
  45. name_format=self.name_format,
  46. )
  47. current_datetime += self.size.as_delta()
  48. def to_delete(self) -> Generator[PostgresTimePartition, None, None]:
  49. if not self.max_age:
  50. return
  51. current_datetime = self.size.start(
  52. self.get_start_datetime() - self.max_age
  53. )
  54. while True:
  55. yield PostgresTimePartition(
  56. start_datetime=current_datetime,
  57. size=self.size,
  58. name_format=self.name_format,
  59. )
  60. current_datetime -= self.size.as_delta()
  61. def get_start_datetime(self) -> datetime:
  62. return datetime.now(timezone.utc)