# test_kafka.py
  1. from datetime import datetime
  2. from unittest import TestCase
  3. from arroyo.backends.kafka import KafkaPayload
  4. from arroyo.backends.local.backend import LocalBroker, LocalProducer
  5. from arroyo.backends.local.storages.memory import MemoryMessageStorage
  6. from arroyo.types import Partition, Topic
  7. from arroyo.utils.clock import TestingClock as Clock
  8. from sentry.sentry_metrics.client.kafka import KafkaMetricsBackend
  9. from sentry.testutils.metrics_backend import GenericMetricsTestMixIn
  10. from sentry.utils import json
  11. class KafkaMetricsInterfaceTest(GenericMetricsTestMixIn, TestCase):
  12. def test_produce_metrics(self) -> None:
  13. generic_metrics_backend = KafkaMetricsBackend()
  14. # For testing, we are calling close() here because we
  15. # are swapping out the KafkaProducer
  16. # with a LocalProducer, but regardless,
  17. # close() must always be called in order to close
  18. # the backend's KafkaProducer
  19. generic_metrics_backend.close()
  20. my_topic = Topic("my-topic")
  21. clock = Clock()
  22. broker_storage: MemoryMessageStorage[KafkaPayload] = MemoryMessageStorage()
  23. broker: LocalBroker[KafkaPayload] = LocalBroker(broker_storage, clock)
  24. broker.create_topic(my_topic, partitions=1)
  25. generic_metrics_backend.producer = LocalProducer(broker)
  26. generic_metrics_backend.kafka_topic = my_topic
  27. # produce a set metric onto the first offset
  28. generic_metrics_backend.set(
  29. self.use_case_id,
  30. self.org_id,
  31. self.project_id,
  32. self.metric_name,
  33. self.set_values,
  34. self.tags,
  35. self.unit,
  36. )
  37. set_metric = {
  38. "org_id": self.org_id,
  39. "project_id": self.project_id,
  40. "name": self.get_mri(self.metric_name, "s", self.use_case_id, self.unit),
  41. "value": self.set_values,
  42. "timestamp": int(datetime.now().timestamp()),
  43. "tags": self.tags,
  44. "retention_days": self.retention_days,
  45. "type": "s",
  46. }
  47. set_value = json.dumps(set_metric).encode("utf-8")
  48. produced_message = broker_storage.consume(Partition(my_topic, 0), 0)
  49. assert produced_message is not None
  50. assert produced_message.payload.value == set_value
  51. # check that there's no other remaining message in the topic
  52. assert broker_storage.consume(Partition(my_topic, 0), 1) is None
  53. # produce a counter metric onto the second offset
  54. generic_metrics_backend.counter(
  55. self.use_case_id,
  56. self.org_id,
  57. self.project_id,
  58. self.metric_name,
  59. self.counter_value,
  60. self.tags,
  61. self.unit,
  62. )
  63. counter_metric = {
  64. "org_id": self.org_id,
  65. "project_id": self.project_id,
  66. "name": self.get_mri(self.metric_name, "c", self.use_case_id, self.unit),
  67. "value": self.counter_value,
  68. "timestamp": int(datetime.now().timestamp()),
  69. "tags": self.tags,
  70. "retention_days": self.retention_days,
  71. "type": "c",
  72. }
  73. counter_value = json.dumps(counter_metric).encode("utf-8")
  74. produced_message = broker_storage.consume(Partition(my_topic, 0), 1)
  75. assert produced_message is not None
  76. assert produced_message.payload.value == counter_value
  77. # check that there's no other remaining message in the topic
  78. assert broker_storage.consume(Partition(my_topic, 0), 2) is None
  79. # produce a distribution metric onto the third offset
  80. generic_metrics_backend.distribution(
  81. self.use_case_id,
  82. self.org_id,
  83. self.project_id,
  84. self.metric_name,
  85. self.dist_values,
  86. self.tags,
  87. self.unit,
  88. )
  89. distribution_metric = {
  90. "org_id": self.org_id,
  91. "project_id": self.project_id,
  92. "name": self.get_mri(self.metric_name, "d", self.use_case_id, self.unit),
  93. "value": self.dist_values,
  94. "timestamp": int(datetime.now().timestamp()),
  95. "tags": self.tags,
  96. "retention_days": self.retention_days,
  97. "type": "d",
  98. }
  99. distribution_value = json.dumps(distribution_metric).encode("utf-8")
  100. produced_message = broker_storage.consume(Partition(my_topic, 0), 2)
  101. assert produced_message is not None
  102. assert produced_message.payload.value == distribution_value
  103. # check that there's no other remaining message in the topic
  104. assert broker_storage.consume(Partition(my_topic, 0), 3) is None