update-migration 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. #!/usr/bin/env python
  2. import dataclasses
  3. import importlib
  4. import os
  5. import pathlib
  6. import re
  7. import click
  8. from django.apps import apps
  9. from django.conf import settings
  10. from sentry.runner import configure
  11. configure()
  12. @dataclasses.dataclass
  13. class MigrationMeta:
  14. number: int
  15. name: str
  16. path: str
  17. @property
  18. def full_name(self) -> str:
  19. return f"{self.number:04}_{self.name}"
  20. @classmethod
  21. def from_filename(cls, filepath: str) -> "MigrationMeta":
  22. filename = os.path.basename(filepath)
  23. name, _ = filename.split(".", 2)
  24. number, name = name.split("_", 1)
  25. number = int(number)
  26. return cls(number=number, name=name, path=filepath)
  27. def load_module(filepath: str):
  28. module_name = os.path.basename(filepath)
  29. spec = importlib.util.spec_from_loader(
  30. module_name, importlib.machinery.SourceFileLoader(module_name, filepath)
  31. )
  32. module = importlib.util.module_from_spec(spec)
  33. spec.loader.exec_module(module)
  34. return module
  35. def find_migration(migrations_path: str, migration: str) -> MigrationMeta:
  36. matches = list(pathlib.Path(migrations_path).glob(f"*{migration}*"))
  37. if len(matches) > 1:
  38. click.echo(f"Found multiple migrations matching {migration}")
  39. for match in matches:
  40. click.echo(f"- {match}")
  41. click.echo("Try again with a more specific pattern")
  42. raise click.Abort()
  43. if len(matches) == 0:
  44. click.echo(f"Could not find migration matching {migration}")
  45. raise click.Abort()
  46. return MigrationMeta.from_filename(str(matches[0].resolve()))
  47. def find_highest_migration(migrations_path: str, renamed_migration: MigrationMeta) -> MigrationMeta:
  48. highest = 0
  49. found = None
  50. matches = list(pathlib.Path(migrations_path).glob("[0-9]*"))
  51. for match in matches:
  52. meta = MigrationMeta.from_filename(str(match.resolve()))
  53. if meta.number > highest and meta.full_name != renamed_migration.full_name:
  54. highest = meta.number
  55. found = meta
  56. if not found:
  57. click.echo("Could not find the head migration")
  58. raise click.Abort()
  59. click.echo(f"> Current head migration is {found.full_name}")
  60. return found
  61. @click.command()
  62. @click.argument("migration", required=True)
  63. @click.argument("app_label", default="sentry")
  64. def main(migration: str, app_label: str):
  65. """
  66. Update a migration to the top of the migration history.
  67. migration - The name or number of the migration.
  68. app_label - The name of the django app the migration is in. Defaults to sentry
  69. Will do the following:
  70. - Rename the migration file.
  71. - Update `dependencies` in the migration to point at the highest migration.
  72. - Update the name of the related tests/migration if it exists.
  73. - Update migrations_lockfile.txt.
  74. """
  75. app_path = apps.get_app_config(app_label).path
  76. migrations_path = os.path.join(app_path, "migrations")
  77. migration_meta = find_migration(migrations_path, migration)
  78. current_head = find_highest_migration(migrations_path, migration_meta)
  79. # Create an instance of the migration so that we can read instance properties.
  80. module = load_module(migration_meta.path)
  81. migration_instance = module.Migration("", app_label)
  82. dependencies = migration_instance.dependencies
  83. new_name = migration_meta.full_name.replace(
  84. str(migration_meta.number), str(current_head.number + 1)
  85. )
  86. click.echo(f"> Updating migration {migration_meta.full_name} to {new_name}")
  87. with open(migration_meta.path) as f:
  88. contents = f.read()
  89. for dep in dependencies:
  90. if dep[0] == app_label:
  91. contents = contents.replace(dep[1], current_head.full_name)
  92. with open(migration_meta.path, "w") as f:
  93. f.write(contents)
  94. # Rename the file.
  95. os.rename(migration_meta.path, os.path.join(migrations_path, f"{new_name}.py"))
  96. click.echo("> Migration file rename complete")
  97. # Update lockfile
  98. lockfile = os.path.join(settings.MIGRATIONS_LOCKFILE_PATH, "migrations_lockfile.txt")
  99. with open(lockfile) as f:
  100. contents = f.read()
  101. contents = contents.replace(current_head.full_name, new_name)
  102. with open(lockfile, "w") as f:
  103. f.write(contents)
  104. click.echo("> Updated migrations_lockfile.txt")
  105. # Rename test if it exists.
  106. test_file_prefix = f"*{migration_meta.number}*"
  107. migration_test_path = os.path.join(
  108. settings.PROJECT_ROOT, os.path.pardir, os.path.pardir, "tests", "sentry", "migrations"
  109. )
  110. matches = list(pathlib.Path(migration_test_path).glob(test_file_prefix))
  111. if len(matches) == 1:
  112. click.echo("> Updating migration test file to & from attributes")
  113. test_path = str(matches[0].resolve())
  114. with open(test_path) as f:
  115. contents = f.read()
  116. contents = re.sub(
  117. r"(migrate_from\s+=\s+\")([^\"]+)(\")",
  118. lambda matches: f"{matches.group(1)}{current_head.full_name}{matches.group(3)}",
  119. contents,
  120. )
  121. contents = re.sub(
  122. r"(migrate_to\s+=\s+\")([^\"]+)(\")",
  123. lambda matches: f"{matches.group(1)}{new_name}{matches.group(3)}",
  124. contents,
  125. )
  126. with open(test_path, "w") as f:
  127. f.write(contents)
  128. click.echo("> Renaming test file")
  129. os.rename(
  130. str(matches[0].resolve()), os.path.join(migration_test_path, f"test_{new_name}.py")
  131. )
  132. else:
  133. click.echo("> Could not find a migration test file")
  134. click.echo("All done!")
  135. if __name__ == "__main__":
  136. main()