from __future__ import annotations

import logging
from datetime import datetime
from typing import (
    TYPE_CHECKING,
    Any,
    Collection,
    Mapping,
    MutableMapping,
    Optional,
    Sequence,
    Tuple,
)
from uuid import uuid4

import pytz
import sentry_kafka_schemas
import urllib3

from sentry import quotas
from sentry.eventstore.models import GroupEvent
from sentry.eventstream.base import EventStream, EventStreamEventType, GroupStates
from sentry.utils import json, snuba
from sentry.utils.safe import get_path
from sentry.utils.sdk import set_current_event_project

KW_SKIP_SEMANTIC_PARTITIONING = "skip_semantic_partitioning"

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from sentry.eventstore.models import Event

# Version 1 format: (1, TYPE, [...REST...])
#   Insert: (1, 'insert', {
#       ...event json...
#   }, {
#       ...state for post-processing...
#   })
#
#   Mutations that *should be ignored*: (1, ('delete_groups'|'unmerge'|'merge'), {...})
#
#   In short, for protocol version 1 only messages starting with (1, 'insert', ...)
#   should be processed.

# Version 2 format: (2, TYPE, [...REST...])
#   Insert: (2, 'insert', {
#       ...event json...
#   }, {
#       ...state for post-processing...
#   })
#   Delete Groups: (2, '(start_delete_groups|end_delete_groups)', {
#       'transaction_id': uuid,
#       'project_id': id,
#       'group_ids': [id1, id2, id3],
#       'datetime': timestamp,
#   })
#   Merge: (2, '(start_merge|end_merge)', {
#       'transaction_id': uuid,
#       'project_id': id,
#       'previous_group_ids': [id1, id2],
#       'new_group_id': id,
#       'datetime': timestamp,
#   })
#   Unmerge: (2, '(start_unmerge|end_unmerge)', {
#       'transaction_id': uuid,
#       'project_id': id,
#       'previous_group_id': id,
#       'new_group_id': id,
#       'hashes': [hash1, hash2],
#       'datetime': timestamp,
#   })
#   Delete Tag: (2, '(start_delete_tag|end_delete_tag)', {
#       'transaction_id': uuid,
#       'project_id': id,
#       'tag': 'foo',
#       'datetime': timestamp,
#   })
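
# A minimal sketch of how a downstream consumer might dispatch on these payloads
# (`raw_message` and `handle_insert` below are hypothetical names, not part of this
# module):
#
#     version, msg_type, *rest = json.loads(raw_message)
#     if version == 2 and msg_type == "insert":
#         event_data, post_process_state = rest
#         handle_insert(event_data, post_process_state)
#     elif version == 2 and msg_type.startswith(("start_", "end_")):
#         (state,) = rest
#         ...  # apply or finalize the mutation described by `state`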


class SnubaProtocolEventStream(EventStream):
    # Beware! Changing this protocol (introducing a new version, or the message
    # format/fields themselves) requires consideration of all downstream
    # consumers. This includes the post-processing forwarder code!
    EVENT_PROTOCOL_VERSION = 2

    # These keys correspond to tags that are typically prefixed with `sentry:`
    # and will wreak havoc in the UI if both the `sentry:`-prefixed and
    # non-prefixed variations occur in a response.
    UNEXPECTED_TAG_KEYS = frozenset(["dist", "release", "user"])

    def _get_headers_for_insert(
        self,
        event: Event | GroupEvent,
        is_new: bool,
        is_regression: bool,
        is_new_group_environment: bool,
        primary_hash: Optional[str],
        received_timestamp: float,
        skip_consume: bool,
        group_states: Optional[GroupStates] = None,
    ) -> MutableMapping[str, str]:
        return {
            "Received-Timestamp": str(received_timestamp),
            "queue": self._get_queue_for_post_process(event),
        }

    def insert(
        self,
        event: Event | GroupEvent,
        is_new: bool,
        is_regression: bool,
        is_new_group_environment: bool,
        primary_hash: Optional[str],
        received_timestamp: float,
        skip_consume: bool = False,
        group_states: Optional[GroupStates] = None,
        **kwargs: Any,
    ) -> None:
        if isinstance(event, GroupEvent) and not event.occurrence:
            logger.error(
                "`GroupEvent` passed to `EventStream.insert`. `GroupEvent` may only be passed when "
                "associated with an `IssueOccurrence`",
                exc_info=True,
            )
            return

        project = event.project
        set_current_event_project(project.id)

        retention_days = quotas.backend.get_event_retention(organization=project.organization)

        event_data = event.get_raw_data(for_stream=True)

        unexpected_tags = {
            k
            for (k, v) in (get_path(event_data, "tags", filter=True) or [])
            if k in self.UNEXPECTED_TAG_KEYS
        }
        if unexpected_tags:
            logger.error("%r received unexpected tags: %r", self, unexpected_tags)

        headers = self._get_headers_for_insert(
            event,
            is_new,
            is_regression,
            is_new_group_environment,
            primary_hash,
            received_timestamp,
            skip_consume,
            group_states,
        )

        skip_semantic_partitioning = (
            kwargs[KW_SKIP_SEMANTIC_PARTITIONING]
            if KW_SKIP_SEMANTIC_PARTITIONING in kwargs
            else False
        )

        event_type = self._get_event_type(event)
        occurrence_data = self._get_occurrence_data(event)

        # Instead of normalizing and doing custom 'contexts' processing in Snuba,
        # we do it here to avoid cluttering Snuba code with business logic.
        if event_type == EventStreamEventType.Generic:
            event_data = dict(event_data)
            contexts = event_data.setdefault("contexts", {})

            # add user.geo to contexts if it exists
            user_dict = event_data.get("user") or event_data.get("sentry.interfaces.User") or {}
            geo = user_dict.get("geo", {})
            if "geo" not in contexts and isinstance(geo, dict):
                contexts["geo"] = geo

            # Transaction processing has a configurable 'skipped contexts' list for
            # omitting specific context maps from the row; for now we ignore that
            # until we have a need for it.

        self._send(
            project.id,
            "insert",
            extra_data=(
                {
                    "group_id": event.group_id,
                    "group_ids": [group.id for group in getattr(event, "groups", [])],
                    "event_id": event.event_id,
                    "organization_id": project.organization_id,
                    "project_id": event.project_id,
                    # TODO(mitsuhiko): We do not want to send this incorrect
                    # message but this is what snuba needs at the moment.
                    "message": event.search_message,
                    "platform": event.platform,
                    "datetime": json.datetime_to_str(event.datetime),
                    "data": event_data,
                    "primary_hash": primary_hash,
                    "retention_days": retention_days,
                    "occurrence_id": occurrence_data.get("id"),
                    "occurrence_data": occurrence_data,
                },
                {
                    "is_new": is_new,
                    "is_regression": is_regression,
                    "is_new_group_environment": is_new_group_environment,
                    "queue": headers["queue"],
                    "skip_consume": skip_consume,
                    "group_states": group_states,
                },
            ),
            headers=headers,
            skip_semantic_partitioning=skip_semantic_partitioning,
            event_type=event_type,
        )

    def start_delete_groups(
        self, project_id: int, group_ids: Sequence[int]
    ) -> Optional[Mapping[str, Any]]:
        if not group_ids:
            return None

        state = {
            "transaction_id": uuid4().hex,
            "project_id": project_id,
            "group_ids": list(group_ids),
            "datetime": json.datetime_to_str(datetime.now(tz=pytz.utc)),
        }
        self._send(project_id, "start_delete_groups", extra_data=(state,), asynchronous=False)
        return state

    def end_delete_groups(self, state: Mapping[str, Any]) -> None:
        state_copy: MutableMapping[str, Any] = {**state}
        state_copy["datetime"] = json.datetime_to_str(datetime.now(tz=pytz.utc))
        self._send(
            state_copy["project_id"],
            "end_delete_groups",
            extra_data=(state_copy,),
            asynchronous=False,
        )

    def start_merge(
        self, project_id: int, previous_group_ids: Sequence[int], new_group_id: int
    ) -> Optional[Mapping[str, Any]]:
        if not previous_group_ids:
            return None

        state = {
            "transaction_id": uuid4().hex,
            "project_id": project_id,
            "previous_group_ids": list(previous_group_ids),
            "new_group_id": new_group_id,
            "datetime": json.datetime_to_str(datetime.now(tz=pytz.utc)),
        }
        self._send(project_id, "start_merge", extra_data=(state,), asynchronous=False)
        return state

    def end_merge(self, state: Mapping[str, Any]) -> None:
        state_copy: MutableMapping[str, Any] = {**state}
        state_copy["datetime"] = json.datetime_to_str(datetime.now(tz=pytz.utc))
        self._send(
            state_copy["project_id"], "end_merge", extra_data=(state_copy,), asynchronous=False
        )

    def start_unmerge(
        self, project_id: int, hashes: Collection[str], previous_group_id: int, new_group_id: int
    ) -> Optional[Mapping[str, Any]]:
        if not hashes:
            return None

        state = {
            "transaction_id": uuid4().hex,
            "project_id": project_id,
            "previous_group_id": previous_group_id,
            "new_group_id": new_group_id,
            "hashes": list(hashes),
            "datetime": json.datetime_to_str(datetime.now(tz=pytz.utc)),
        }
        self._send(project_id, "start_unmerge", extra_data=(state,), asynchronous=False)
        return state

    def end_unmerge(self, state: Mapping[str, Any]) -> None:
        state_copy: MutableMapping[str, Any] = {**state}
        state_copy["datetime"] = json.datetime_to_str(datetime.now(tz=pytz.utc))
        self._send(
            state_copy["project_id"], "end_unmerge", extra_data=(state_copy,), asynchronous=False
        )

    def start_delete_tag(self, project_id: int, tag: str) -> Optional[Mapping[str, Any]]:
        if not tag:
            return None

        state = {
            "transaction_id": uuid4().hex,
            "project_id": project_id,
            "tag": tag,
            "datetime": json.datetime_to_str(datetime.now(tz=pytz.utc)),
        }
        self._send(project_id, "start_delete_tag", extra_data=(state,), asynchronous=False)
        return state

    def end_delete_tag(self, state: Mapping[str, Any]) -> None:
        state_copy: MutableMapping[str, Any] = {**state}
        state_copy["datetime"] = json.datetime_to_str(datetime.now(tz=pytz.utc))
        self._send(
            state_copy["project_id"], "end_delete_tag", extra_data=(state_copy,), asynchronous=False
        )
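
    # The start_*/end_* pairs above follow a simple two-phase pattern: emit the "start"
    # message, perform the actual mutation in the primary datastore, then emit the
    # matching "end" message with the state returned by the start call. A rough usage
    # sketch (`delete_groups_from_database` is a hypothetical helper, not part of this
    # module):
    #
    #     state = eventstream.start_delete_groups(project.id, group_ids)
    #     if state is not None:
    #         delete_groups_from_database(project, group_ids)
    #         eventstream.end_delete_groups(state)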

    def tombstone_events_unsafe(
        self,
        project_id: int,
        event_ids: Sequence[str],
        old_primary_hash: Optional[str] = None,
        from_timestamp: Optional[datetime] = None,
        to_timestamp: Optional[datetime] = None,
    ) -> None:
        """
        Tell Snuba to eventually delete these events.

        This marks events as deleted but does not immediately exclude those
        events from all queries. Because the exclusion is not immediate, this
        is not, on its own, proper event deletion.

        "Proper" group deletion is essentially running this function for every
        event in the group, plus `exclude_groups` to make sure the changes are
        immediately user-visible.

        Reprocessing (v2) splits a group into events-to-be-reprocessed
        (re-inserted with a new group_id) and events-to-be-deleted
        (`tombstone_events`), then excludes the group from all queries
        (`exclude_groups`).

        :param old_primary_hash: If present, the events are only tombstoned so
            they can be reinserted with a guaranteed-different primary hash.
            This is necessary with Snuba's errors table, as primary_hash is
            part of the PK/sortkey.
        """
        state = {
            "project_id": project_id,
            "event_ids": event_ids,
            "old_primary_hash": old_primary_hash,
            "from_timestamp": from_timestamp,
            "to_timestamp": to_timestamp,
        }
        self._send(project_id, "tombstone_events", extra_data=(state,), asynchronous=False)

    def replace_group_unsafe(
        self,
        project_id: int,
        event_ids: Sequence[str],
        new_group_id: int,
        from_timestamp: Optional[datetime] = None,
        to_timestamp: Optional[datetime] = None,
    ) -> None:
        """
        Tell Snuba to move events into a new group ID.

        Same caveats as `tombstone_events_unsafe`.
        """
        state = {
            "project_id": project_id,
            "event_ids": event_ids,
            "new_group_id": new_group_id,
            "from_timestamp": from_timestamp,
            "to_timestamp": to_timestamp,
        }
        self._send(project_id, "replace_group", extra_data=(state,), asynchronous=False)

    def exclude_groups(self, project_id: int, group_ids: Sequence[int]) -> None:
        """
        Exclude a group from queries for a while until event tombstoning takes
        effect. See the docstring of `tombstone_events_unsafe`.

        `exclude_groups` basically makes Snuba add `where group_id not in (1,
        2, ...)` to every query.
        """
        state = {"project_id": project_id, "group_ids": group_ids}
        self._send(project_id, "exclude_groups", extra_data=(state,), asynchronous=False)

    def _send(
        self,
        project_id: int,
        _type: str,
        extra_data: Tuple[Any, ...] = (),
        asynchronous: bool = True,
        headers: Optional[MutableMapping[str, str]] = None,
        skip_semantic_partitioning: bool = False,
        event_type: EventStreamEventType = EventStreamEventType.Error,
    ) -> None:
        raise NotImplementedError


class SnubaEventStream(SnubaProtocolEventStream):
    def _send(
        self,
        project_id: int,
        _type: str,
        extra_data: Tuple[Any, ...] = (),
        asynchronous: bool = True,
        headers: Optional[MutableMapping[str, str]] = None,
        skip_semantic_partitioning: bool = False,
        event_type: EventStreamEventType = EventStreamEventType.Error,
    ) -> None:
        if headers is None:
            headers = {}

        data = (self.EVENT_PROTOCOL_VERSION, _type) + extra_data

        entity = "events"
        if event_type == EventStreamEventType.Transaction:
            entity = "transactions"
        if event_type == EventStreamEventType.Generic:
            entity = "search_issues"

        serialized_data = json.dumps(data)

        try:
            codec = sentry_kafka_schemas.get_codec(
                topic={
                    "events": "events",
                    "transactions": "transactions",
                    "search_issues": "generic-events",
                }[entity]
            )
        except (sentry_kafka_schemas.SchemaNotFound, KeyError):
            # Needed since "generic-events" does not have a schema yet
            codec = sentry_kafka_schemas.codecs.json.JsonCodec(None)

        codec.decode(serialized_data.encode("utf-8"), validate=True)

        try:
            resp = snuba._snuba_pool.urlopen(
                "POST",
                f"/tests/{entity}/eventstream",
                body=serialized_data,
                headers={f"X-Sentry-{k}": v for k, v in headers.items()},
            )
            if resp.status != 200:
                raise snuba.SnubaError(
                    f"HTTP {resp.status} response from Snuba! {json.loads(resp.data)}"
                )
            return None
        except urllib3.exceptions.HTTPError as err:
            raise snuba.SnubaError(err)

    def requires_post_process_forwarder(self) -> bool:
        return False

    def insert(
        self,
        event: Event | GroupEvent,
        is_new: bool,
        is_regression: bool,
        is_new_group_environment: bool,
        primary_hash: Optional[str],
        received_timestamp: float,
        skip_consume: bool = False,
        group_states: Optional[GroupStates] = None,
        **kwargs: Any,
    ) -> None:
        super().insert(
            event,
            is_new,
            is_regression,
            is_new_group_environment,
            primary_hash,
            received_timestamp,
            skip_consume,
            group_states,
            **kwargs,
        )
        self._dispatch_post_process_group_task(
            event.event_id,
            event.project_id,
            event.group_id,
            is_new,
            is_regression,
            is_new_group_environment,
            primary_hash,
            self._get_queue_for_post_process(event),
            skip_consume,
            group_states,
            occurrence_id=event.occurrence_id if isinstance(event, GroupEvent) else None,
        )
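

# A rough usage sketch for development or test setups: as implemented above, `_send`
# posts synchronously to Snuba's `/tests/{entity}/eventstream` endpoint, so no Kafka
# broker is involved. The `event` object and the flag/hash/timestamp values below are
# placeholders, not taken from this module:
#
#     eventstream = SnubaEventStream()
#     eventstream.insert(
#         event,
#         is_new=True,
#         is_regression=False,
#         is_new_group_environment=True,
#         primary_hash="0" * 32,
#         received_timestamp=1700000000.0,
#         skip_consume=False,
#     )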