indexer_consumer.py

import logging
from typing import Any, Dict, MutableMapping, Optional, Sequence

from confluent_kafka import Producer
from django.conf import settings

from sentry.sentry_metrics import indexer
from sentry.sentry_metrics.indexer.tasks import process_indexed_metrics
from sentry.utils import json, kafka_config, metrics
from sentry.utils.batching_kafka_consumer import AbstractBatchWorker, BatchingKafkaConsumer
from sentry.utils.kafka import create_batching_kafka_consumer

logger = logging.getLogger(__name__)


def get_metrics_consumer(
    topic: Optional[str] = None, **options: Dict[str, str]
) -> BatchingKafkaConsumer:
    snuba_metrics = settings.KAFKA_TOPICS[settings.KAFKA_SNUBA_METRICS]
    snuba_metrics_producer = Producer(
        kafka_config.get_kafka_producer_cluster_options(snuba_metrics["cluster"]),
    )
    return create_batching_kafka_consumer(
        {topic},
        worker=MetricsIndexerWorker(producer=snuba_metrics_producer),
        **options,
    )

class MetricsIndexerWorker(AbstractBatchWorker):  # type: ignore
    def __init__(self, producer: Producer) -> None:
        self.__producer = producer
        self.__producer_topic = settings.KAFKA_TOPICS[settings.KAFKA_SNUBA_METRICS].get(
            "topic", "snuba-metrics"
        )

    def process_message(self, message: Any) -> MutableMapping[str, Any]:
        parsed_message: MutableMapping[str, Any] = json.loads(message.value(), use_rapid_json=True)

        metric_name = parsed_message["name"]
        tags = parsed_message.get("tags", {})

        # collect every string that needs an integer id: the metric name
        # plus all tag keys and tag values
        strings = {
            metric_name,
            *tags.keys(),
            *tags.values(),
        }

        with metrics.timer("metrics_consumer.bulk_record"):
            mapping = indexer.bulk_record(list(strings))  # type: ignore

        # rewrite the payload with the resolved integer ids
        new_tags = {mapping[k]: mapping[v] for k, v in tags.items()}
        parsed_message["tags"] = new_tags
        parsed_message["metric_id"] = mapping[metric_name]
        parsed_message["retention_days"] = 90

        return parsed_message
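
    # For illustration only (the integer ids below are made up, not real indexer
    # output): an incoming payload like
    #   {"name": "session", "org_id": 1, "tags": {"environment": "prod"}}
    # would leave process_message as something like
    #   {"name": "session", "org_id": 1, "metric_id": 7, "tags": {3: 9}, "retention_days": 90}
    # with the metric name and every tag key/value resolved to ids by bulk_record.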

    def flush_batch(self, batch: Sequence[MutableMapping[str, Any]]) -> None:
        # produce the translated messages to the snuba-metrics topic
        for message in batch:
            self.__producer.produce(
                topic=self.__producer_topic,
                key=None,
                value=json.dumps(message).encode(),
                on_delivery=self.callback,
            )
            message_type = message.get("type", "unknown")
            metrics.incr(
                "metrics_consumer.producer.messages_seen", tags={"metric_type": message_type}
            )

        with metrics.timer("metrics_consumer.producer.flush"):
            messages_left = self.__producer.flush(5.0)

        if messages_left != 0:
            # TODO(meredith): We are not currently keeping track of
            # which callbacks failed. This means we could potentially
            # be duplicating messages, since we don't commit offsets
            # unless all the callbacks are successful.
            #
            # In the future, if we know which callback failed, we can
            # commit only up to that point and retry on the remaining
            # messages.
            metrics.incr("metrics_consumer.producer.messages_left")
            raise Exception(f"didn't get all the callbacks: {messages_left} left")

        # if we have successfully produced messages to the snuba-metrics topic,
        # then enqueue a task to send a slimmed-down payload to the product metrics data model.
        # TODO(meredith): once we know more about what the product data model needs,
        # adjust the payload to send the necessary data
        messages = [{"tags": m["tags"], "name": m["name"], "org_id": m["org_id"]} for m in batch]
        process_indexed_metrics.apply_async(kwargs={"messages": messages})

    def shutdown(self) -> None:
        return

    def callback(self, error: Any, message: Any) -> None:
        if error is not None:
            raise Exception(error.str())
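

# A minimal usage sketch, not part of the original module: how this consumer
# might be constructed and run. The topic name "ingest-metrics", the group_id
# option, and the run()/signal_shutdown() calls are assumptions based on the
# BatchingKafkaConsumer interface, not confirmed by this file.
#
# import signal
#
# def run_metrics_indexer_consumer() -> None:
#     consumer = get_metrics_consumer(
#         topic="ingest-metrics",          # assumed ingest topic name
#         group_id="metrics-indexer-consumer",  # assumed consumer group option
#     )
#
#     # request a clean shutdown on SIGINT/SIGTERM so the current batch is flushed
#     def handle_signal(signum: int, frame: Any) -> None:
#         consumer.signal_shutdown()
#
#     signal.signal(signal.SIGINT, handle_signal)
#     signal.signal(signal.SIGTERM, handle_signal)
#
#     consumer.run()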