mock-replay-recording 2.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879
  1. #!/usr/bin/env python
"""Insert mock replay recording messages into the INGEST_REPLAYS_RECORDINGS topic.

Helpful commands:

    - Run the consumer.
        - `sentry run consumer ingest-replay-recordings --consumer-group 0`
        - `sentry run consumer ingest-replay-recordings-two-step --consumer-group 0`
    - Check if offsets are committed correctly.
        - `docker exec -it kafka-kafka-1 kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group 0`
"""
  10. import logging
  11. import os
  12. import time
  13. import uuid
  14. import click
  15. import django
  16. from arroyo import Topic as ArroyoTopic
  17. from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
  18. from sentry_kafka_schemas.codecs import Codec
  19. from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording
  20. from sentry.conf.types.kafka_definition import Topic, get_topic_codec
  21. from sentry.runner import configure
  22. from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
# Bootstrap the runtime at import time, before any producer or codec is built.
# NOTE(review): `configure` comes from `sentry.runner`; presumably it
# initializes the Sentry/Django settings -- confirm against its definition.
configure()

# `setdefault` only applies when DJANGO_SETTINGS_MODULE is not already set in
# the environment, so an explicit override by the caller wins.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "sentry.conf.server")
django.setup()

# Module-level logger, used by `main` to report the produced message.
logger = logging.getLogger(__name__)
  27. def get_producer() -> KafkaProducer:
  28. cluster_name = get_topic_definition(Topic.INGEST_REPLAYS_RECORDINGS)["cluster"]
  29. producer_config = get_kafka_producer_cluster_options(cluster_name)
  30. return KafkaProducer(build_kafka_configuration(default_config=producer_config))
# Codec for the INGEST_REPLAYS_RECORDINGS topic, looked up once at import time;
# `main` uses it to encode the ReplayRecording message into the Kafka payload.
RECORDING_CODEC: Codec[ReplayRecording] = get_topic_codec(Topic.INGEST_REPLAYS_RECORDINGS)
  32. @click.command()
  33. @click.option("--organization-id", type=int, required=True, help="Organization ID")
  34. @click.option("--project-id", type=int, required=True, help="Project ID")
  35. def main(organization_id: int, project_id: int) -> None:
  36. """Produce a mock uptime result message to the INGEST_REPLAYS_RECORDINGS topic."""
  37. message: ReplayRecording = {
  38. "key_id": None,
  39. "org_id": organization_id,
  40. "payload": b'{"segment_id":0}\n[]',
  41. "project_id": project_id,
  42. "received": int(time.time()),
  43. "replay_event": None,
  44. "replay_id": uuid.uuid4().hex,
  45. "replay_video": None,
  46. "retention_days": 30,
  47. "type": "replay_recording_not_chunked",
  48. "version": 1,
  49. }
  50. producer = get_producer()
  51. topic = get_topic_definition(Topic.INGEST_REPLAYS_RECORDINGS)["real_topic_name"]
  52. payload = KafkaPayload(None, RECORDING_CODEC.encode(message), [])
  53. producer.produce(ArroyoTopic(topic), payload)
  54. producer.close()
  55. logger.info("Successfully produced message to %s", topic)
# Run the click command only when executed as a script, not when imported.
if __name__ == "__main__":
    main()