1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980 |
- from datetime import datetime, timezone
- from typing import Generator, Optional
- from dateutil.relativedelta import relativedelta
- from .range_strategy import PostgresRangePartitioningStrategy
- from .time_partition import PostgresTimePartition
- from .time_partition_size import PostgresTimePartitionSize
- class PostgresCurrentTimePartitioningStrategy(
- PostgresRangePartitioningStrategy
- ):
- """Implments a time based partitioning strategy where each partition
- contains values for a specific time period.
- All buckets will be equal in size and start at the start of the
- unit. With monthly partitioning, partitions start on the 1st and
- with weekly partitioning, partitions start on monday.
- """
- def __init__(
- self,
- size: PostgresTimePartitionSize,
- count: int,
- max_age: Optional[relativedelta] = None,
- name_format: Optional[str] = None,
- ) -> None:
- """Initializes a new instance of :see:PostgresTimePartitioningStrategy.
- Arguments:
- size:
- The size of each partition.
- count:
- The amount of partitions to create ahead
- from the current date/time.
- max_age:
- Maximum age of a partition. Partitions
- older than this are deleted during
- auto cleanup.
- """
- self.size = size
- self.count = count
- self.max_age = max_age
- self.name_format = name_format
- def to_create(self) -> Generator[PostgresTimePartition, None, None]:
- current_datetime = self.size.start(self.get_start_datetime())
- for _ in range(self.count):
- yield PostgresTimePartition(
- start_datetime=current_datetime,
- size=self.size,
- name_format=self.name_format,
- )
- current_datetime += self.size.as_delta()
- def to_delete(self) -> Generator[PostgresTimePartition, None, None]:
- if not self.max_age:
- return
- current_datetime = self.size.start(
- self.get_start_datetime() - self.max_age
- )
- while True:
- yield PostgresTimePartition(
- start_datetime=current_datetime,
- size=self.size,
- name_format=self.name_format,
- )
- current_datetime -= self.size.as_delta()
- def get_start_datetime(self) -> datetime:
- return datetime.now(timezone.utc)
|