update-migration 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. #!/usr/bin/env python
  import dataclasses
  import importlib
  import importlib.machinery
  import importlib.util
  import os
  import pathlib
  import re

  import click
  from django.apps import apps

  from sentry.runner import configure
  10. configure()
  @dataclasses.dataclass
  class MigrationMeta:
      """Identity of a migration file, parsed from its ``<number>_<name>`` filename."""

      # Numeric prefix of the filename, e.g. 123 for ``0123_add_thing.py``.
      number: int
      # Everything after the first underscore, without the file extension.
      name: str
      # The path that was passed to ``from_filename``, stored unchanged.
      path: str

      @property
      def full_name(self) -> str:
          """Return ``<number>_<name>`` with the number zero-padded to four digits."""
          return f"{self.number:04}_{self.name}"

      @classmethod
      def from_filename(cls, filepath: str) -> "MigrationMeta":
          """Build a ``MigrationMeta`` from the path of a migration file.

          Only the basename of ``filepath`` is parsed; ``filepath`` itself is
          stored as ``path``.

          Raises:
              ValueError: if the basename has no ``_`` separator or the part
                  before it is not an integer.
          """
          # Strip only the final extension. Unpacking ``split(".", 2)`` into two
          # names would fail for basenames with no dot or with several dots.
          stem, _ext = os.path.splitext(os.path.basename(filepath))
          number_part, separator, name = stem.partition("_")
          if not separator:
              raise ValueError(
                  f"Migration filename {stem!r} is not of the form <number>_<name>"
              )
          return cls(number=int(number_part), name=name, path=filepath)
  def load_module(filepath: str):
      """Execute the Python source file at ``filepath`` and return the module.

      The module is named after the file's basename (extension included) and is
      not registered in ``sys.modules``.

      Raises:
          ImportError: if no import spec or loader can be built for the file.
      """
      module_name = os.path.basename(filepath)
      # ``importlib.util`` and ``importlib.machinery`` are submodules; they must
      # be imported explicitly rather than relying on another package having
      # loaded them as a side effect.
      loader = importlib.machinery.SourceFileLoader(module_name, filepath)
      spec = importlib.util.spec_from_loader(module_name, loader)
      if spec is None or spec.loader is None:
          raise ImportError(f"Could not build an import spec for {filepath}")
      module = importlib.util.module_from_spec(spec)
      spec.loader.exec_module(module)
      return module
  def find_migration(migrations_path: str, migration: str) -> MigrationMeta:
      """Locate the single file in ``migrations_path`` whose name contains ``migration``.

      Prints a message and aborts the command when zero or several files match.
      """
      candidates = [*pathlib.Path(migrations_path).glob(f"*{migration}*")]

      if not candidates:
          click.echo(f"Could not find migration matching {migration}")
          raise click.Abort()

      if len(candidates) > 1:
          click.echo(f"Found multiple migrations matching {migration}")
          for candidate in candidates:
              click.echo(f"- {candidate}")
          click.echo("Try again with a more specific pattern")
          raise click.Abort()

      (only_match,) = candidates
      return MigrationMeta.from_filename(str(only_match.resolve()))
  def find_highest_migration(migrations_path: str, renamed_migration: MigrationMeta) -> MigrationMeta:
      """Return the highest-numbered migration other than ``renamed_migration``.

      Every entry in ``migrations_path`` whose name starts with a digit is
      parsed. Only migrations with a number greater than zero are considered.
      Prints a message and aborts the command when no such migration exists.
      """
      parsed = [
          MigrationMeta.from_filename(str(entry.resolve()))
          for entry in pathlib.Path(migrations_path).glob("[0-9]*")
      ]
      eligible = [
          meta
          for meta in parsed
          if meta.number > 0 and meta.full_name != renamed_migration.full_name
      ]
      # ``max`` keeps the first of equally numbered entries, as a strict ``>``
      # comparison in a running-maximum loop would.
      head = max(eligible, key=lambda meta: meta.number, default=None)

      if head is None:
          click.echo("Could not find the head migration")
          raise click.Abort()

      click.echo(f"> Current head migration is {head.full_name}")
      return head
  @click.command()
  @click.argument("migration", required=True)
  @click.argument("app_label", default="sentry")
  def main(migration: str, app_label: str):
      """
      Update a migration to the top of the migration history.

      migration - The name or number of the migration.
      app_label - The name of the django app the migration is in. Defaults to sentry

      Will do the following:

      - Rename the migration file.
      - Update `dependencies` in the migration to point at the highest migration.
      - Update the name of the related tests/migration if it exists.
      - Update migrations_lockfile.txt.
      """
      app_path = apps.get_app_config(app_label).path
      migrations_path = os.path.join(app_path, "migrations")

      migration_meta = find_migration(migrations_path, migration)
      current_head = find_highest_migration(migrations_path, migration_meta)

      # Create an instance of the migration so that we can read instance properties.
      module = load_module(migration_meta.path)
      migration_instance = module.Migration("", app_label)
      dependencies = migration_instance.dependencies

      # Build the new name from its parts. Substituting the digits inside
      # ``full_name`` would also rewrite any occurrence of the number in the
      # descriptive part and would break the four-digit zero padding
      # (e.g. ``0005_add_5_things`` -> ``000124_add_124_things``).
      new_name = f"{current_head.number + 1:04}_{migration_meta.name}"
      click.echo(f"> Updating migration {migration_meta.full_name} to {new_name}")

      # Point every same-app dependency at the current head migration.
      with open(migration_meta.path) as f:
          contents = f.read()

      for dep in dependencies:
          if dep[0] == app_label:
              contents = contents.replace(dep[1], current_head.full_name)

      with open(migration_meta.path, "w") as f:
          f.write(contents)

      # Rename the file.
      os.rename(migration_meta.path, os.path.join(migrations_path, f"{new_name}.py"))
      click.echo("> Migration file rename complete")

      # Update lockfile
      lockfile = os.path.join(app_path, "..", "..", "migrations_lockfile.txt")
      with open(lockfile) as f:
          contents = f.read()
      contents = contents.replace(current_head.full_name, new_name)
      with open(lockfile, "w") as f:
          f.write(contents)
      click.echo("> Updated migrations_lockfile.txt")

      # Rename test if it exists. Match on the zero-padded number, the form this
      # script itself writes into test file names, so that a small number such
      # as 5 does not match every test whose name merely contains that digit.
      test_file_prefix = f"*{migration_meta.number:04}*"
      migration_test_path = os.path.join(app_path, "..", "..", "tests", "sentry", "migrations")
      matches = list(pathlib.Path(migration_test_path).glob(test_file_prefix))
      if len(matches) == 1:
          click.echo("> Updating migration test file to & from attributes")
          test_path = str(matches[0].resolve())
          with open(test_path) as f:
              contents = f.read()

          # Keep the surrounding `migrate_from = "` / `"` text and swap only the
          # quoted migration name.
          contents = re.sub(
              r"(migrate_from\s+=\s+\")([^\"]+)(\")",
              lambda m: f"{m.group(1)}{current_head.full_name}{m.group(3)}",
              contents,
          )
          contents = re.sub(
              r"(migrate_to\s+=\s+\")([^\"]+)(\")",
              lambda m: f"{m.group(1)}{new_name}{m.group(3)}",
              contents,
          )
          with open(test_path, "w") as f:
              f.write(contents)

          click.echo("> Renaming test file")
          os.rename(test_path, os.path.join(migration_test_path, f"test_{new_name}.py"))
      else:
          click.echo("> Could not find a migration test file")

      click.echo("All done!")


  if __name__ == "__main__":
      main()