123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169 |
- from __future__ import annotations
- import argparse
- import os
- import subprocess
- import time
- from collections.abc import Sequence
- from typing import Callable
class HealthcheckError(Exception):
    """Raised when a service container is not running or fails its health probe."""
class HealthCheck:
    """Settings and container probe for a single service health check."""

    def __init__(
        self,
        service_id: str,
        container_name: str,
        check_by_default: bool,
        check: Callable[[], object] | None = None,
        deps: list[str] | None = None,
        retries: int = 3,
        timeout_secs: int = 5,
    ):
        """Record the configuration for one service.

        Args:
            service_id: Identifier of the service.
            container_name: Name of the docker container inspected by
                ``check_container``.
            check_by_default: Whether the service is checked when no explicit
                service selection is made.
            check: Optional zero-argument callable that probes the service's
                health; ``None`` means only the container state is checked.
            deps: Ids of services this one depends on; ``None`` or an empty
                list is stored as an empty list.
            retries: Attempt count used when the checks are run.
            timeout_secs: Seconds to wait between attempts.
        """
        self.service_id = service_id
        self.container_name = container_name
        self.check_by_default = check_by_default
        self.check = check
        # A falsy ``deps`` (None or empty) is normalised to a fresh list.
        self.deps = deps if deps else []
        self.retries = retries
        self.timeout_secs = timeout_secs

    def check_container(self) -> None:
        """Raise ``HealthcheckError`` unless docker reports the container as running."""
        inspect_cmd = (
            "docker",
            "container",
            "inspect",
            "-f",
            "{{.State.Status}}",
            self.container_name,
        )
        # No check=True here: a failed inspect simply yields a stdout that is
        # not "running", which is reported below.
        result = subprocess.run(inspect_cmd, capture_output=True, text=True)
        status = result.stdout.strip()
        if status == "running":
            return
        raise HealthcheckError(f"Container '{self.container_name}' is not running.")
def check_kafka() -> None:
    """Probe Kafka by listing its topics from inside the ``sentry_kafka`` container.

    Raises:
        subprocess.CalledProcessError: If the ``docker exec`` command exits
            with a non-zero status.
    """
    subprocess.run(
        (
            "docker",
            "exec",
            "sentry_kafka",
            "kafka-topics",
            "--zookeeper",
            "sentry_zookeeper:2181",
            "--list",
        ),
        # check=True turns a non-zero exit status into CalledProcessError.
        check=True,
    )
def check_postgres() -> None:
    """Probe Postgres by running ``pg_isready`` inside the ``sentry_postgres`` container.

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero status.
    """
    readiness_cmd = ("docker", "exec", "sentry_postgres", "pg_isready", "-U", "postgres")
    subprocess.run(readiness_cmd, check=True)
# Available health checks, keyed by service id.
# Kafka and zookeeper are only checked by default when the NEED_KAFKA
# environment variable equals "true".
all_service_healthchecks = {
    "postgres": HealthCheck(
        service_id="postgres",
        container_name="sentry_postgres",
        check_by_default=True,
        check=check_postgres,
    ),
    "kafka": HealthCheck(
        service_id="kafka",
        container_name="sentry_kafka",
        check_by_default=os.getenv("NEED_KAFKA") == "true",
        check=check_kafka,
        deps=["zookeeper"],
    ),
    "zookeeper": HealthCheck(
        service_id="zookeeper",
        container_name="sentry_zookeeper",
        check_by_default=os.getenv("NEED_KAFKA") == "true",
    ),
}
def run_with_retries(cmd: Callable[[], object], retries: int, timeout: int) -> None:
    """Call ``cmd`` until it succeeds or the attempts are used up.

    Args:
        cmd: Zero-argument callable to run; its return value is ignored.
        retries: Total number of attempts. Values below 1 are treated as 1 so
            that the command always runs at least once instead of being
            silently reported as successful without ever being executed.
        timeout: Seconds to sleep between a failed attempt and the next one.

    Raises:
        HealthcheckError: If the last attempt fails with ``HealthcheckError``
            or ``subprocess.CalledProcessError``; the original error is
            attached as the cause.
    """
    attempts = max(retries, 1)
    for attempt in range(1, attempts + 1):
        try:
            cmd()
        except (HealthcheckError, subprocess.CalledProcessError) as e:
            if attempt == attempts:
                print(f"Command failed, no more retries: {e}")
                raise HealthcheckError(f"Command failed: {e}") from e
            print(
                f"Command failed, retrying in {timeout}s "
                f"(attempt {attempt + 1} of {attempts})..."
            )
            time.sleep(timeout)
        else:
            return
def get_services_to_check(id: str) -> list[str]:
    """Return the dependencies of service ``id`` (recursively) followed by ``id``.

    Dependencies are listed before the service that needs them. The result may
    contain duplicates when several services share a dependency.
    """
    ordered: list[str] = []
    for dependency in all_service_healthchecks[id].deps:
        ordered.extend(get_services_to_check(dependency))
    return [*ordered, id]
def check_health(service_ids: list[str]) -> None:
    """Verify that the given services and their dependencies are running and healthy.

    Each service's container state is checked first, then its optional health
    probe. Both steps are retried according to the service's settings.

    Args:
        service_ids: Ids of the services to check; dependencies are added
            automatically and checked before their dependents.

    Raises:
        HealthcheckError: If a container is not running or a probe keeps
            failing. The message includes the commands to restart the service.
    """
    checks = [
        check_id for service_id in service_ids for check_id in get_services_to_check(service_id)
    ]
    # dict.fromkeys is used to remove duplicates while maintaining order
    unique_checks = list(dict.fromkeys(checks))
    # Loop-invariant: the full service list used in the "restart all" hints.
    ls = " ".join(unique_checks)
    for name in unique_checks:
        print(f"Checking service {name}")
        hc = all_service_healthchecks[name]
        print(f"Checking '{hc.container_name}' is running...")
        try:
            run_with_retries(hc.check_container, hc.retries, hc.timeout_secs)
        except HealthcheckError as err:
            raise HealthcheckError(
                f"Container '{hc.container_name}' is not running.\n"
                f" Start service: sentry devservices up {hc.service_id}\n"
                f" Restart all services: sentry devservices down {ls} && sentry devservices up {ls}"
            ) from err
        if hc.check is not None:
            print(f"Checking '{hc.container_name}' container health...")
            try:
                run_with_retries(hc.check, hc.retries, hc.timeout_secs)
            except HealthcheckError as err:
                raise HealthcheckError(
                    f"Container '{hc.container_name}' does not appear to be healthy.\n"
                    f" Restart service: sentry devservices down {hc.service_id} && sentry devservices up {hc.service_id}\n"
                    f" Restart all services: sentry devservices down {ls} && sentry devservices up {ls}"
                ) from err
def main(argv: Sequence[str] | None = None) -> None:
    """Parse the command line and run the selected health checks.

    Args:
        argv: Argument list to parse; ``None`` lets argparse read ``sys.argv``.

    Raises:
        SystemExit: Carrying the ``HealthcheckError`` when any check fails.
    """
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--service",
        action="append",
        choices=list(all_service_healthchecks),
        help="The services you wish to check on. Defaults to all services.",
    )
    requested = parser.parse_args(argv).service
    if requested is None:
        # No --service given: fall back to the services flagged check_by_default.
        requested = [
            service_id
            for service_id, healthcheck in all_service_healthchecks.items()
            if healthcheck.check_by_default
        ]
    try:
        check_health(requested)
    except HealthcheckError as e:
        raise SystemExit(e)


if __name__ == "__main__":
    main()
|