locustfile.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import datetime
  2. import time
  3. from celery.result import AsyncResult
  4. from locust import HttpUser, between, task
  5. from events.test_data.event_generator import generate_random_event
  6. from glitchtip.celery import app
  7. class CeleryClient:
  8. """
  9. CeleryClient is a wrapper around the Celery client.
  10. It proxies any function calls and fires the *request* event when they finish,
  11. so that the calls get recorded in Locust.
  12. """
  13. def __init__(self, request_event):
  14. self.client = app
  15. self.task_timeout = 60
  16. self._request_event = request_event
  17. def send_task(self, name, args=None, kwargs=None, queue=None):
  18. options = {}
  19. if queue:
  20. options["queue"] = queue
  21. request_meta = {
  22. "request_type": "celery",
  23. "response_length": 0,
  24. "name": name,
  25. "start_time": time.time(),
  26. "response": None,
  27. "context": {},
  28. "exception": None,
  29. }
  30. t0 = datetime.datetime.now()
  31. try:
  32. async_result = self.client.send_task(
  33. name, args=args, kwargs=kwargs, **options
  34. )
  35. result = async_result.get(self.task_timeout) # blocking
  36. request_meta["response"] = result
  37. t1 = async_result.date_done
  38. except Exception as e:
  39. t1 = None
  40. request_meta["exception"] = e
  41. request_meta["response_time"] = (
  42. None if not t1 else (t1 - t0).total_seconds() * 1000
  43. )
  44. self._request_event.fire(
  45. **request_meta
  46. ) # this is what makes the request actually get logged in Locust
  47. return request_meta["response"]
  48. def monitor_task(self, task_name, task_id):
  49. """Monitor and record a known task by id"""
  50. request_meta = {
  51. "request_type": "celery",
  52. "response_length": 0,
  53. "name": task_name,
  54. "start_time": time.time(),
  55. "response": None,
  56. "context": {},
  57. "exception": None,
  58. }
  59. t0 = datetime.datetime.now()
  60. try:
  61. async_result = AsyncResult(task_id)
  62. async_result.wait(self.task_timeout)
  63. t1 = async_result.date_done
  64. except Exception as e:
  65. t1 = None
  66. request_meta["exception"] = e
  67. request_meta["response_time"] = (
  68. None if not t1 else (t1 - t0).total_seconds() * 1000
  69. )
  70. self._request_event.fire(
  71. **request_meta
  72. ) # this is what makes the request actually get logged in Locust
  73. return request_meta["response"]
  74. class WebsiteUser(HttpUser):
  75. wait_time = between(1.0, 2.0)
  76. def __init__(self, environment):
  77. super().__init__(environment)
  78. self.celery_client = CeleryClient(
  79. request_event=environment.events.request,
  80. )
  81. @task
  82. def send_event(self):
  83. project_id = "1"
  84. dsn = ""
  85. event_url = f"/api/{project_id}/store/?sentry_key={dsn}"
  86. event = generate_random_event(True)
  87. res = self.client.post(event_url, json=event)
  88. task_id = res.json().get("task_id")
  89. self.celery_client.monitor_task("ingest_event", task_id)
  90. # @task
  91. # def test_debug_task(self):
  92. # self.celery_client.send_task("glitchtip.celery.debug_task")