test_ingest_consumer.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. from __future__ import absolute_import
  2. import datetime
  3. import time
  4. import logging
  5. import msgpack
  6. import pytest
  7. from sentry.event_manager import EventManager
  8. from sentry.ingest_consumer import ConsumerType, run_ingest_consumer
  9. from sentry.models.event import Event
  10. from sentry.testutils.factories import Factories
  11. from django.conf import settings
  12. logger = logging.getLogger(__name__)
  13. def _get_test_message(project):
  14. """
  15. creates a test message to be inserted in a kafka queue
  16. """
  17. now = datetime.datetime.now()
  18. # the event id should be 32 digits
  19. event_id = "{}".format(now.strftime("000000000000%Y%m%d%H%M%S%f"))
  20. message_text = "some message {}".format(event_id)
  21. project_id = project.id # must match the project id set up by the test fixtures
  22. event = {
  23. "message": message_text,
  24. "extra": {"the_id": event_id},
  25. "project_id": project_id,
  26. "event_id": event_id,
  27. }
  28. em = EventManager(event, project=project)
  29. em.normalize()
  30. normalized_event = dict(em.get_data())
  31. message = {
  32. "ty": (0, ()),
  33. "start_time": time.time(),
  34. "event_id": event_id,
  35. "project_id": 1,
  36. "payload": normalized_event,
  37. }
  38. val = msgpack.packb(message)
  39. return val, event_id
  40. def _shutdown_requested(max_secs, num_events):
  41. """
  42. Requests a shutdown after the specified interval has passed or the specified number
  43. of events are detected
  44. :param max_secs: number of seconds after which to request a shutdown
  45. :param num_events: number of events after which to request a shutdown
  46. :return: True if a shutdown is requested False otherwise
  47. """
  48. def inner():
  49. end_time = time.time()
  50. if end_time - start_time > max_secs:
  51. logger.debug("Shutdown requested because max secs exceeded")
  52. return True
  53. elif Event.objects.count() >= num_events:
  54. logger.debug("Shutdown requested because num events reached")
  55. return True
  56. else:
  57. return False
  58. start_time = time.time()
  59. return inner
  60. @pytest.mark.django_db
  61. def test_ingest_consumer_reads_from_topic_and_calls_celery_task(
  62. task_runner, kafka_producer, kafka_admin
  63. ):
  64. consumer_group = "test-consumer"
  65. admin = kafka_admin(settings)
  66. admin.delete_events_topic()
  67. producer = kafka_producer(settings)
  68. organization = Factories.create_organization()
  69. project = Factories.create_project(organization=organization)
  70. topic_event_name = ConsumerType.get_topic_name(ConsumerType.Events)
  71. event_ids = set()
  72. for _ in range(3):
  73. message, event_id = _get_test_message(project)
  74. event_ids.add(event_id)
  75. producer.produce(topic_event_name, message)
  76. with task_runner():
  77. run_ingest_consumer(
  78. commit_batch_size=2,
  79. consumer_group=consumer_group,
  80. consumer_type=ConsumerType.Events,
  81. max_fetch_time_seconds=0.1,
  82. initial_offset_reset="earliest",
  83. is_shutdown_requested=_shutdown_requested(max_secs=10, num_events=3),
  84. )
  85. # check that we got the messages
  86. assert Event.objects.count() == 3
  87. for event_id in event_ids:
  88. message = Event.objects.get(event_id=event_id)
  89. assert message is not None
  90. # check that the data has not been scrambled
  91. assert message.data["extra"]["the_id"] == event_id