manager.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. from typing import List, Optional, Tuple, Type
  2. from django.db import connections
  3. from psqlextra.models import PostgresPartitionedModel
  4. from .config import PostgresPartitioningConfig
  5. from .constants import AUTO_PARTITIONED_COMMENT
  6. from .error import PostgresPartitioningError
  7. from .partition import PostgresPartition
  8. from .plan import PostgresModelPartitioningPlan, PostgresPartitioningPlan
# Type alias: a list of (partitioned model, partitions of that model) pairs.
# NOTE(review): not referenced by the code visible in this module; presumably
# used by callers elsewhere in the package -- confirm before changing it.
PartitionList = List[Tuple[PostgresPartitionedModel, List[PostgresPartition]]]
  10. class PostgresPartitioningManager:
  11. """Helps managing partitions by automatically creating new partitions and
  12. deleting old ones according to the configuration."""
  13. def __init__(self, configs: List[PostgresPartitioningConfig]) -> None:
  14. self.configs = configs
  15. self._validate_configs(self.configs)
  16. def plan(
  17. self,
  18. skip_create: bool = False,
  19. skip_delete: bool = False,
  20. using: Optional[str] = None,
  21. ) -> PostgresPartitioningPlan:
  22. """Plans which partitions should be deleted/created.
  23. Arguments:
  24. skip_create:
  25. If set to True, no partitions will be marked
  26. for creation, regardless of the configuration.
  27. skip_delete:
  28. If set to True, no partitions will be marked
  29. for deletion, regardless of the configuration.
  30. using:
  31. Optional name of the database connection to use.
  32. Returns:
  33. A plan describing what partitions would be created
  34. and deleted if the plan is applied.
  35. """
  36. model_plans = []
  37. for config in self.configs:
  38. model_plan = self._plan_for_config(
  39. config,
  40. skip_create=skip_create,
  41. skip_delete=skip_delete,
  42. using=using,
  43. )
  44. if not model_plan:
  45. continue
  46. model_plans.append(model_plan)
  47. return PostgresPartitioningPlan(model_plans)
  48. def find_config_for_model(
  49. self, model: PostgresPartitionedModel
  50. ) -> Optional[PostgresPartitioningConfig]:
  51. """Finds the partitioning config for the specified model."""
  52. return next(
  53. (config for config in self.configs if config.model == model), None
  54. )
  55. def _plan_for_config(
  56. self,
  57. config: PostgresPartitioningConfig,
  58. skip_create: bool = False,
  59. skip_delete: bool = False,
  60. using: Optional[str] = None,
  61. ) -> Optional[PostgresModelPartitioningPlan]:
  62. """Creates a partitioning plan for one partitioning config."""
  63. connection = connections[using or "default"]
  64. table = self._get_partitioned_table(connection, config.model)
  65. model_plan = PostgresModelPartitioningPlan(config)
  66. if not skip_create:
  67. for partition in config.strategy.to_create():
  68. if table.partition_by_name(name=partition.name()):
  69. continue
  70. model_plan.creations.append(partition)
  71. if not skip_delete:
  72. for partition in config.strategy.to_delete():
  73. introspected_partition = table.partition_by_name(
  74. name=partition.name()
  75. )
  76. if not introspected_partition:
  77. break
  78. if introspected_partition.comment != AUTO_PARTITIONED_COMMENT:
  79. continue
  80. model_plan.deletions.append(partition)
  81. if len(model_plan.creations) == 0 and len(model_plan.deletions) == 0:
  82. return None
  83. return model_plan
  84. @staticmethod
  85. def _get_partitioned_table(
  86. connection, model: Type[PostgresPartitionedModel]
  87. ):
  88. with connection.cursor() as cursor:
  89. table = connection.introspection.get_partitioned_table(
  90. cursor, model._meta.db_table
  91. )
  92. if not table:
  93. raise PostgresPartitioningError(
  94. f"Model {model.__name__}, with table "
  95. f"{model._meta.db_table} does not exists in the "
  96. "database. Did you run `python manage.py migrate`?"
  97. )
  98. return table
  99. @staticmethod
  100. def _validate_configs(configs: List[PostgresPartitioningConfig]):
  101. """Ensures there is only one config per model."""
  102. models = set([config.model.__name__ for config in configs])
  103. if len(models) != len(configs):
  104. raise PostgresPartitioningError(
  105. "Only one partitioning config per model is allowed"
  106. )