#!/usr/bin/env python
"""Insert mock replay recording messages into the INGEST_REPLAYS_RECORDINGS topic.

Helpful commands:

- Run the consumer.
    - `sentry run consumer ingest-replay-recordings --consumer-group 0`
    - `sentry run consumer ingest-replay-recordings-two-step --consumer-group 0`
- Check if offsets are committed correctly.
    - `docker exec -it kafka-kafka-1 kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group 0`
"""

import logging
import os
import time
import uuid

import click
import django
from arroyo import Topic as ArroyoTopic
from arroyo.backends.kafka import KafkaPayload, KafkaProducer, build_kafka_configuration
from sentry_kafka_schemas.codecs import Codec
from sentry_kafka_schemas.schema_types.ingest_replay_recordings_v1 import ReplayRecording

from sentry.conf.types.kafka_definition import Topic, get_topic_codec
from sentry.runner import configure
from sentry.utils.kafka_config import get_kafka_producer_cluster_options, get_topic_definition

# Initialise Sentry and Django at import time; the module-level codec lookup
# further down runs only after these calls have completed.
configure()

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "sentry.conf.server")

django.setup()

logger = logging.getLogger(__name__)


def get_producer() -> KafkaProducer:
    """Return a Kafka producer configured for the replay recordings topic's cluster."""
    cluster_name = get_topic_definition(Topic.INGEST_REPLAYS_RECORDINGS)["cluster"]
    producer_config = get_kafka_producer_cluster_options(cluster_name)
    return KafkaProducer(build_kafka_configuration(default_config=producer_config))


# Codec used to encode the message dict into the topic's wire format.
RECORDING_CODEC: Codec[ReplayRecording] = get_topic_codec(Topic.INGEST_REPLAYS_RECORDINGS)


@click.command()
@click.option("--organization-id", type=int, required=True, help="Organization ID")
@click.option("--project-id", type=int, required=True, help="Project ID")
def main(organization_id: int, project_id: int) -> None:
    """Produce a mock replay recording message to the INGEST_REPLAYS_RECORDINGS topic."""
    message: ReplayRecording = {
        "key_id": None,
        "org_id": organization_id,
        # NOTE(review): presumably a JSON header line followed by an empty
        # list of recording events -- confirm against the consumer.
        "payload": b'{"segment_id":0}\n[]',
        "project_id": project_id,
        "received": int(time.time()),
        "replay_event": None,
        # A fresh replay ID per invocation so repeated runs do not collide.
        "replay_id": uuid.uuid4().hex,
        "replay_video": None,
        "retention_days": 30,
        "type": "replay_recording_not_chunked",
        "version": 1,
    }

    topic = get_topic_definition(Topic.INGEST_REPLAYS_RECORDINGS)["real_topic_name"]
    payload = KafkaPayload(None, RECORDING_CODEC.encode(message), [])

    producer = get_producer()
    try:
        # produce() is asynchronous and returns a future. Block on it so a
        # delivery failure raises here instead of being silently dropped, and
        # so the success log below is only emitted after the broker has
        # acknowledged the message.
        producer.produce(ArroyoTopic(topic), payload).result()
    finally:
        # close() also returns a future; wait for it so the script does not
        # exit while the producer is still shutting down.
        producer.close().result()

    logger.info("Successfully produced message to %s", topic)


if __name__ == "__main__":
    main()