time_partition.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. from datetime import datetime
  2. from typing import Optional
  3. from .error import PostgresPartitioningError
  4. from .range_partition import PostgresRangePartition
  5. from .time_partition_size import (
  6. PostgresTimePartitionSize,
  7. PostgresTimePartitionUnit,
  8. )
  9. class PostgresTimePartition(PostgresRangePartition):
  10. """Time-based range table partition.
  11. :see:PostgresTimePartitioningStrategy for more info.
  12. """
  13. _unit_name_format = {
  14. PostgresTimePartitionUnit.YEARS: "%Y",
  15. PostgresTimePartitionUnit.MONTHS: "%Y_%b",
  16. PostgresTimePartitionUnit.WEEKS: "%Y_week_%W",
  17. PostgresTimePartitionUnit.DAYS: "%Y_%b_%d",
  18. }
  19. def __init__(
  20. self,
  21. size: PostgresTimePartitionSize,
  22. start_datetime: datetime,
  23. name_format: Optional[str] = None,
  24. ) -> None:
  25. end_datetime = start_datetime + size.as_delta()
  26. super().__init__(
  27. from_values=start_datetime.strftime("%Y-%m-%d"),
  28. to_values=end_datetime.strftime("%Y-%m-%d"),
  29. )
  30. self.size = size
  31. self.start_datetime = start_datetime
  32. self.end_datetime = end_datetime
  33. self.name_format = name_format
  34. def name(self) -> str:
  35. name_format = self.name_format or self._unit_name_format.get(
  36. self.size.unit
  37. )
  38. if not name_format:
  39. raise PostgresPartitioningError("Unknown size/unit")
  40. return self.start_datetime.strftime(name_format).lower()
  41. def deconstruct(self) -> dict:
  42. return {
  43. **super().deconstruct(),
  44. "size_unit": self.size.unit.value,
  45. "size_value": self.size.value,
  46. }
  47. __all__ = ["PostgresTimePartition"]