@@ -2,14 +2,13 @@ from __future__ import absolute_import
 
 import logging
 
-from kafka import KafkaProducer
+from confluent_kafka import Producer
 from django.utils.functional import cached_property
 
 from sentry import quotas
 from sentry.models import Organization
 from sentry.eventstream.base import EventStream
 from sentry.utils import json
-from sentry.utils.pubsub import QueuedPublisher
 
 
 logger = logging.getLogger(__name__)
@@ -20,24 +19,21 @@ logger = logging.getLogger(__name__)
 EVENT_PROTOCOL_VERSION = 1
 
 
-class KafkaPublisher(object):
-    def __init__(self, connection):
-        self.connection = connection or {}
+class KafkaEventStream(EventStream):
+    def __init__(self, publish_topic='events', producer_configuration=None, **options):
+        if producer_configuration is None:
+            producer_configuration = {}
 
-    @cached_property
-    def client(self):
-        return KafkaProducer(**self.connection)
+        self.publish_topic = publish_topic
+        self.producer_configuration = producer_configuration
 
-    def publish(self, topic, value, key=None):
-        return self.client.send(topic, key=key, value=value)
+    @cached_property
+    def producer(self):
+        return Producer(self.producer_configuration)
 
-
-class KafkaEventStream(EventStream):
-    def __init__(self, publish_topic='events', sync=False, connection=None, **options):
-        self.publish_topic = publish_topic
-        self.pubsub = KafkaPublisher(connection)
-        if not sync:
-            self.pubsub = QueuedPublisher(self.pubsub)
+    def delivery_callback(self, error, message):
+        if error is not None:
+            logger.warning('Could not publish event (error: %s): %r', error, message)
 
     def publish(self, group, event, is_new, is_sample, is_regression, is_new_group_environment, primary_hash, skip_consume=False):
         project = event.project
@@ -45,6 +41,16 @@ class KafkaEventStream(EventStream):
             organization=Organization(project.organization_id)
         )
 
+        # Polling the producer is required to ensure callbacks are fired. This
+        # means that the latency between a message being delivered (or failing
+        # to be delivered) and the corresponding callback being fired is
+        # roughly the same as the duration of time that passes between publish
+        # calls. If this ends up being too high, the publisher should be moved
+        # into a background thread that can poll more frequently without
+        # interfering with request handling. (This `poll` does not act as a
+        # heartbeat for the purposes of any sort of session expiration.)
+        self.producer.poll(0.0)
+
         try:
             key = '%s:%s' % (event.project_id, event.event_id)
             value = (EVENT_PROTOCOL_VERSION, 'insert', {
@@ -64,8 +70,12 @@ class KafkaEventStream(EventStream):
                 'is_regression': is_regression,
                 'is_new_group_environment': is_new_group_environment,
             })
-
-            self.pubsub.publish(self.publish_topic, key=key.encode('utf-8'), value=json.dumps(value))
+            self.producer.produce(
+                self.publish_topic,
+                key=key.encode('utf-8'),
+                value=json.dumps(value),
+                on_delivery=self.delivery_callback,
+            )
         except Exception as error:
             logger.warning('Could not publish event: %s', error, exc_info=True)
             raise
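
A note on the new producer's delivery semantics, since they differ from kafka-python: `produce` only enqueues a message, and delivery callbacks (including the `delivery_callback` added above) are only ever invoked from inside `poll` or `flush`. A minimal standalone sketch of that behavior, using a placeholder broker address, topic, and key:

    from confluent_kafka import Producer

    # Placeholder configuration; any reachable broker works.
    producer = Producer({'bootstrap.servers': 'localhost:9092'})

    def on_delivery(error, message):
        # `error` is None on success and a KafkaError on failure.
        if error is not None:
            print('Delivery failed: %s' % (error,))

    # Asynchronous: this only appends the message to an internal queue.
    producer.produce('events', key=b'1:deadbeef', value=b'{}', on_delivery=on_delivery)

    # Callbacks fire here, which is why `publish` polls on every call.
    producer.poll(0.0)

    # On shutdown, wait for outstanding messages and fire remaining callbacks.
    producer.flush(5.0)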
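
The comment added in `publish` suggests moving polling into a background thread if the callback latency gets too high. One possible shape for that, sketched here for discussion (the `PollingProducer` wrapper and its parameters are hypothetical and not part of this change; only the confluent_kafka calls are real API):

    import threading

    from confluent_kafka import Producer

    class PollingProducer(object):
        """Hypothetical wrapper that services delivery callbacks from a
        daemon thread rather than on the request path."""

        def __init__(self, configuration, poll_interval=0.1):
            self.__producer = Producer(configuration)
            self.__poll_interval = poll_interval
            self.__shutdown = threading.Event()
            self.__thread = threading.Thread(target=self.__poll_loop)
            self.__thread.daemon = True
            self.__thread.start()

        def __poll_loop(self):
            # `poll` blocks for up to the interval, firing any pending
            # delivery callbacks as messages are acknowledged.
            while not self.__shutdown.is_set():
                self.__producer.poll(self.__poll_interval)

        def produce(self, *args, **kwargs):
            # librdkafka producers are thread safe, so producing from
            # request threads while the poll loop runs is fine.
            self.__producer.produce(*args, **kwargs)

        def close(self, timeout=5.0):
            # Stop polling, then wait for queued messages to be delivered.
            self.__shutdown.set()
            self.__thread.join()
            self.__producer.flush(timeout)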