# time_partition_size.py
  1. import enum
  2. from datetime import date, datetime
  3. from typing import Optional, Union
  4. from dateutil.relativedelta import relativedelta
  5. from .error import PostgresPartitioningError
  6. class PostgresTimePartitionUnit(enum.Enum):
  7. YEARS = "years"
  8. MONTHS = "months"
  9. WEEKS = "weeks"
  10. DAYS = "days"
  11. class PostgresTimePartitionSize:
  12. """Size of a time-based range partition table."""
  13. unit: PostgresTimePartitionUnit
  14. value: int
  15. def __init__(
  16. self,
  17. years: Optional[int] = None,
  18. months: Optional[int] = None,
  19. weeks: Optional[int] = None,
  20. days: Optional[int] = None,
  21. ) -> None:
  22. sizes = [years, months, weeks, days]
  23. if not any(sizes):
  24. raise PostgresPartitioningError("Partition cannot be 0 in size.")
  25. if len([size for size in sizes if size and size > 0]) > 1:
  26. raise PostgresPartitioningError(
  27. "Partition can only have on size unit."
  28. )
  29. if years:
  30. self.unit = PostgresTimePartitionUnit.YEARS
  31. self.value = years
  32. elif months:
  33. self.unit = PostgresTimePartitionUnit.MONTHS
  34. self.value = months
  35. elif weeks:
  36. self.unit = PostgresTimePartitionUnit.WEEKS
  37. self.value = weeks
  38. elif days:
  39. self.unit = PostgresTimePartitionUnit.DAYS
  40. self.value = days
  41. else:
  42. raise PostgresPartitioningError(
  43. "Unsupported time partitioning unit"
  44. )
  45. def as_delta(self) -> relativedelta:
  46. if self.unit == PostgresTimePartitionUnit.YEARS:
  47. return relativedelta(years=self.value)
  48. if self.unit == PostgresTimePartitionUnit.MONTHS:
  49. return relativedelta(months=self.value)
  50. if self.unit == PostgresTimePartitionUnit.WEEKS:
  51. return relativedelta(weeks=self.value)
  52. if self.unit == PostgresTimePartitionUnit.DAYS:
  53. return relativedelta(days=self.value)
  54. raise PostgresPartitioningError(
  55. "Unsupported time partitioning unit: %s" % self.unit
  56. )
  57. def start(self, dt: datetime) -> datetime:
  58. if self.unit == PostgresTimePartitionUnit.YEARS:
  59. return self._ensure_datetime(dt.replace(month=1, day=1))
  60. if self.unit == PostgresTimePartitionUnit.MONTHS:
  61. return self._ensure_datetime(dt.replace(day=1))
  62. if self.unit == PostgresTimePartitionUnit.WEEKS:
  63. return self._ensure_datetime(dt - relativedelta(days=dt.weekday()))
  64. return self._ensure_datetime(dt)
  65. @staticmethod
  66. def _ensure_datetime(dt: Union[date, datetime]) -> datetime:
  67. return datetime(year=dt.year, month=dt.month, day=dt.day)
  68. def __repr__(self) -> str:
  69. return "PostgresTimePartitionSize<%s, %s>" % (self.unit, self.value)
  70. __all__ = ["PostgresTimePartitionUnit", "PostgresTimePartitionSize"]