123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164 |
- #!/usr/bin/env python
- import dataclasses
- import importlib
- import os
- import pathlib
- import re
- import click
- from django.apps import apps
- from sentry.runner import configure
- configure()
- @dataclasses.dataclass
- class MigrationMeta:
- number: int
- name: str
- path: str
- @property
- def full_name(self) -> str:
- return f"{self.number:04}_{self.name}"
- @classmethod
- def from_filename(cls, filepath: str) -> "MigrationMeta":
- filename = os.path.basename(filepath)
- name, _ = filename.split(".", 2)
- number, name = name.split("_", 1)
- number = int(number)
- return cls(number=number, name=name, path=filepath)
- def load_module(filepath: str):
- module_name = os.path.basename(filepath)
- spec = importlib.util.spec_from_loader(
- module_name, importlib.machinery.SourceFileLoader(module_name, filepath)
- )
- module = importlib.util.module_from_spec(spec)
- spec.loader.exec_module(module)
- return module
- def find_migration(migrations_path: str, migration: str) -> MigrationMeta:
- matches = list(pathlib.Path(migrations_path).glob(f"*{migration}*"))
- if len(matches) > 1:
- click.echo(f"Found multiple migrations matching {migration}")
- for match in matches:
- click.echo(f"- {match}")
- click.echo("Try again with a more specific pattern")
- raise click.Abort()
- if len(matches) == 0:
- click.echo(f"Could not find migration matching {migration}")
- raise click.Abort()
- return MigrationMeta.from_filename(str(matches[0].resolve()))
- def find_highest_migration(migrations_path: str, renamed_migration: MigrationMeta) -> MigrationMeta:
- highest = 0
- found = None
- matches = list(pathlib.Path(migrations_path).glob("[0-9]*"))
- for match in matches:
- meta = MigrationMeta.from_filename(str(match.resolve()))
- if meta.number > highest and meta.full_name != renamed_migration.full_name:
- highest = meta.number
- found = meta
- if not found:
- click.echo("Could not find the head migration")
- raise click.Abort()
- click.echo(f"> Current head migration is {found.full_name}")
- return found
- @click.command()
- @click.argument("migration", required=True)
- @click.argument("app_label", default="sentry")
- def main(migration: str, app_label: str):
- """
- Update a migration to the top of the migration history.
- migration - The name or number of the migration.
- app_label - The name of the django app the migration is in. Defaults to sentry
- Will do the following:
- - Rename the migration file.
- - Update `dependencies` in the migration to point at the highest migration.
- - Update the name of the related tests/migration if it exists.
- - Update migrations_lockfile.txt.
- """
- app_path = apps.get_app_config(app_label).path
- migrations_path = os.path.join(app_path, "migrations")
- migration_meta = find_migration(migrations_path, migration)
- current_head = find_highest_migration(migrations_path, migration_meta)
- # Create an instance of the migration so that we can read instance properties.
- module = load_module(migration_meta.path)
- migration_instance = module.Migration("", app_label)
- dependencies = migration_instance.dependencies
- new_name = migration_meta.full_name.replace(
- str(migration_meta.number), str(current_head.number + 1)
- )
- click.echo(f"> Updating migration {migration_meta.full_name} to {new_name}")
- with open(migration_meta.path) as f:
- contents = f.read()
- for dep in dependencies:
- if dep[0] == app_label:
- contents = contents.replace(dep[1], current_head.full_name)
- with open(migration_meta.path, "w") as f:
- f.write(contents)
- # Rename the file.
- os.rename(migration_meta.path, os.path.join(migrations_path, f"{new_name}.py"))
- click.echo("> Migration file rename complete")
- # Update lockfile
- lockfile = os.path.join(app_path, "..", "..", "migrations_lockfile.txt")
- with open(lockfile) as f:
- contents = f.read()
- contents = contents.replace(current_head.full_name, new_name)
- with open(lockfile, "w") as f:
- f.write(contents)
- click.echo("> Updated migrations_lockfile.txt")
- # Rename test if it exists.
- test_file_prefix = f"*{migration_meta.number}*"
- migration_test_path = os.path.join(app_path, "..", "..", "tests", "sentry", "migrations")
- matches = list(pathlib.Path(migration_test_path).glob(test_file_prefix))
- if len(matches) == 1:
- click.echo("> Updating migration test file to & from attributes")
- test_path = str(matches[0].resolve())
- with open(test_path) as f:
- contents = f.read()
- contents = re.sub(
- r"(migrate_from\s+=\s+\")([^\"]+)(\")",
- lambda matches: f"{matches.group(1)}{current_head.full_name}{matches.group(3)}",
- contents,
- )
- contents = re.sub(
- r"(migrate_to\s+=\s+\")([^\"]+)(\")",
- lambda matches: f"{matches.group(1)}{new_name}{matches.group(3)}",
- contents,
- )
- with open(test_path, "w") as f:
- f.write(contents)
- click.echo("> Renaming test file")
- os.rename(
- str(matches[0].resolve()), os.path.join(migration_test_path, f"test_{new_name}.py")
- )
- else:
- click.echo("> Could not find a migration test file")
- click.echo("All done!")
- if __name__ == "__main__":
- main()
|