12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879 |
- #!/usr/bin/env python
- """Insert mock replay recording messages into the INGEST_REPLAYS_RECORDINGS topic.
- Helpful commands:
- - Run the consumer.
- - `sentry run consumer ingest-replay-recordings --consumer-group 0`
- - `sentry run consumer ingest-replay-recordings-two-step --consumer-group 0`
- - Check if offsets are committed correctly.
- - `docker exec -it kafka-kafka-1 kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group 0`
- """
- import logging
- import os
- import time
- import uuid
- import click
- import django
- from arroyo import Topic as ArroyoTopic
- from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
- from sentry_kafka_schemas.codecs import Codec
- from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording
- from sentry.conf.types.kafka_definition import Topic, get_topic_codec
- from sentry.runner import configure
- from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition
- configure()
- os.environ.setdefault("DJANGO_SETTINGS_MODULE", "sentry.conf.server")
- django.setup()
- logger = logging.getLogger(__name__)
- def get_producer() -> KafkaProducer:
- cluster_name = get_topic_definition(Topic.INGEST_REPLAYS_RECORDINGS)["cluster"]
- producer_config = get_kafka_producer_cluster_options(cluster_name)
- return KafkaProducer(build_kafka_configuration(default_config=producer_config))
- RECORDING_CODEC: Codec[ReplayRecording] = get_topic_codec(Topic.INGEST_REPLAYS_RECORDINGS)
- @click.command()
- @click.option("--organization-id", type=int, required=True, help="Organization ID")
- @click.option("--project-id", type=int, required=True, help="Project ID")
- def main(organization_id: int, project_id: int) -> None:
- """Produce a mock uptime result message to the INGEST_REPLAYS_RECORDINGS topic."""
- message: ReplayRecording = {
- "key_id": None,
- "org_id": organization_id,
- "payload": b'{"segment_id":0}\n[]',
- "project_id": project_id,
- "received": int(time.time()),
- "replay_event": None,
- "replay_id": uuid.uuid4().hex,
- "replay_video": None,
- "retention_days": 30,
- "type": "replay_recording_not_chunked",
- "version": 1,
- }
- producer = get_producer()
- topic = get_topic_definition(Topic.INGEST_REPLAYS_RECORDINGS)["real_topic_name"]
- payload = KafkaPayload(None, RECORDING_CODEC.encode(message), [])
- producer.produce(ArroyoTopic(topic), payload)
- producer.close()
- logger.info("Successfully produced message to %s", topic)
- if __name__ == "__main__":
- main()
|