app.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
"""
All-in-one GlitchTip process.

Do not scale this beyond 1 instance; use the bin/run-* scripts instead.
"""
  5. import contextlib
  6. import os
  7. import signal
  8. import threading
  9. import time
  10. import django
  11. import uvicorn
  12. from django.core.management import call_command
  13. from glitchtip.celery import app
  14. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "glitchtip.settings")
  15. django.setup()
  16. class Server(uvicorn.Server):
  17. def install_signal_handlers(self):
  18. pass
  19. @contextlib.contextmanager
  20. def run_in_thread(self):
  21. thread = threading.Thread(target=self.run)
  22. thread.start()
  23. try:
  24. while not self.started:
  25. time.sleep(1e-3)
  26. yield
  27. finally:
  28. self.should_exit = True
  29. thread.join()
  30. def run_celery_worker(stop_event: threading.Event):
  31. # We restart the worker periodically to mitigate celery memory leaks
  32. while not stop_event.is_set():
  33. worker = app.Worker(pool="threads", loglevel="info")
  34. worker_thread = threading.Thread(target=worker.start)
  35. worker_thread.start()
  36. # Run for 6 hours
  37. for _ in range(60 * 60 * 6):
  38. if stop_event.is_set():
  39. break
  40. time.sleep(1)
  41. # Stop the worker
  42. if worker_thread.is_alive():
  43. app.control.broadcast("shutdown", reply=True, destination=[worker.hostname])
  44. worker_thread.join()
  45. if stop_event.is_set():
  46. break
  47. def run_celery_beat():
  48. app.Beat().run()
  49. def run_django_server(stop_event: threading.Event):
  50. config = uvicorn.Config(
  51. "glitchtip.asgi:application",
  52. workers=int(os.environ.get("WEB_CONCURRENCY", 1)),
  53. host="0.0.0.0",
  54. port=8000,
  55. log_level="info",
  56. lifespan="off",
  57. )
  58. server = Server(config=config)
  59. with server.run_in_thread():
  60. while not stop_event.is_set():
  61. time.sleep(1)
  62. def run_init():
  63. call_command("migrate", no_input=True, skip_checks=True)
  64. def run_pgpartition(stop_event: threading.Event):
  65. """Run every 12 hours. Handle sigterms cleanly"""
  66. while not stop_event.is_set():
  67. call_command("pgpartition", yes=True)
  68. for _ in range(12 * 60 * 60):
  69. if stop_event.is_set():
  70. break
  71. time.sleep(1)
  72. def handle_signal(stop_event: threading.Event):
  73. def _handler(sig, frame):
  74. stop_event.set()
  75. exit(0)
  76. return _handler
  77. def main():
  78. run_init()
  79. stop_event = threading.Event()
  80. signal.signal(signal.SIGTERM, handle_signal(stop_event))
  81. signal.signal(signal.SIGINT, handle_signal(stop_event))
  82. threads = [
  83. threading.Thread(target=run_celery_worker, args=(stop_event,)),
  84. # Force beat thread to halt
  85. threading.Thread(target=run_celery_beat, daemon=True),
  86. threading.Thread(target=run_pgpartition, args=(stop_event,)),
  87. threading.Thread(target=run_django_server, args=(stop_event,)),
  88. ]
  89. for thread in threads:
  90. thread.start()
  91. # celery worker gets to be the single process handles a simplistic sigterm
  92. threads[0].join()
  93. if __name__ == "__main__":
  94. main()