snuba.py

from __future__ import annotations

import logging
from datetime import datetime
from typing import (
    TYPE_CHECKING,
    Any,
    Collection,
    Mapping,
    MutableMapping,
    Optional,
    Sequence,
    Tuple,
)
from uuid import uuid4

import pytz
import sentry_kafka_schemas
import urllib3

from sentry import quotas
from sentry.eventstore.models import GroupEvent
from sentry.eventstream.base import EventStream, EventStreamEventType, GroupStates
from sentry.utils import json, snuba
from sentry.utils.safe import get_path
from sentry.utils.sdk import set_current_event_project

KW_SKIP_SEMANTIC_PARTITIONING = "skip_semantic_partitioning"

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from sentry.eventstore.models import Event

# Version 1 format: (1, TYPE, [...REST...])
#   Insert: (1, 'insert', {
#       ...event json...
#   }, {
#       ...state for post-processing...
#   })
#
#   Mutations that *should be ignored*: (1, ('delete_groups'|'unmerge'|'merge'), {...})
#
#   In short, for protocol version 1 only messages starting with (1, 'insert', ...)
#   should be processed.

# Version 2 format: (2, TYPE, [...REST...])
#   Insert: (2, 'insert', {
#       ...event json...
#   }, {
#       ...state for post-processing...
#   })
#   Delete Groups: (2, '(start_delete_groups|end_delete_groups)', {
#       'transaction_id': uuid,
#       'project_id': id,
#       'group_ids': [id1, id2, id3],
#       'datetime': timestamp,
#   })
#   Merge: (2, '(start_merge|end_merge)', {
#       'transaction_id': uuid,
#       'project_id': id,
#       'previous_group_ids': [id1, id2],
#       'new_group_id': id,
#       'datetime': timestamp,
#   })
#   Unmerge: (2, '(start_unmerge|end_unmerge)', {
#       'transaction_id': uuid,
#       'project_id': id,
#       'previous_group_id': id,
#       'new_group_id': id,
#       'hashes': [hash1, hash2],
#       'datetime': timestamp,
#   })
#   Delete Tag: (2, '(start_delete_tag|end_delete_tag)', {
#       'transaction_id': uuid,
#       'project_id': id,
#       'tag': 'foo',
#       'datetime': timestamp,
#   })
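
# For illustration only (this mirrors the protocol notes above, it is not an
# additional message type): `_send` prepends the protocol version to the payload
# tuple and `json.dumps` serializes it as a JSON array, so a version-2 insert
# message ends up on the wire roughly as:
#
#   [2, "insert",
#    {"event_id": "...", "project_id": 42, "primary_hash": "...", "data": {...}, ...},
#    {"is_new": true, "is_regression": false, "skip_consume": false, ...}]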


class SnubaProtocolEventStream(EventStream):
    # Beware! Changing this protocol (introducing a new version, or the message
    # format/fields themselves) requires consideration of all downstream
    # consumers. This includes the post-processing forwarder code!
    EVENT_PROTOCOL_VERSION = 2

    # These keys correspond to tags that are typically prefixed with `sentry:`
    # and will wreak havoc in the UI if both the `sentry:`-prefixed and
    # non-prefixed variations occur in a response.
    UNEXPECTED_TAG_KEYS = frozenset(["dist", "release", "user"])

    def _get_headers_for_insert(
        self,
        event: Event | GroupEvent,
        is_new: bool,
        is_regression: bool,
        is_new_group_environment: bool,
        primary_hash: Optional[str],
        received_timestamp: float,
        skip_consume: bool,
        group_states: Optional[GroupStates] = None,
    ) -> MutableMapping[str, str]:
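        # These headers accompany the outgoing message (they become `X-Sentry-*`
        # HTTP headers in the test implementation below), presumably so that
        # downstream consumers such as the post-processing forwarder can read
        # the target queue and receive time without parsing the payload.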
        return {
            "Received-Timestamp": str(received_timestamp),
            "queue": self._get_queue_for_post_process(event),
        }

    def insert(
        self,
        event: Event | GroupEvent,
        is_new: bool,
        is_regression: bool,
        is_new_group_environment: bool,
        primary_hash: Optional[str],
        received_timestamp: float,
        skip_consume: bool = False,
        group_states: Optional[GroupStates] = None,
        **kwargs: Any,
    ) -> None:
        if isinstance(event, GroupEvent) and not event.occurrence:
            logger.error(
                "`GroupEvent` passed to `EventStream.insert`. `GroupEvent` may only be passed when "
                "associated with an `IssueOccurrence`",
                exc_info=True,
            )
            return

        project = event.project
        set_current_event_project(project.id)

        retention_days = quotas.backend.get_event_retention(organization=project.organization)

        event_data = event.get_raw_data(for_stream=True)

        unexpected_tags = {
            k
            for (k, v) in (get_path(event_data, "tags", filter=True) or [])
            if k in self.UNEXPECTED_TAG_KEYS
        }
        if unexpected_tags:
            logger.error("%r received unexpected tags: %r", self, unexpected_tags)

        headers = self._get_headers_for_insert(
            event,
            is_new,
            is_regression,
            is_new_group_environment,
            primary_hash,
            received_timestamp,
            skip_consume,
            group_states,
        )

        skip_semantic_partitioning = (
            kwargs[KW_SKIP_SEMANTIC_PARTITIONING]
            if KW_SKIP_SEMANTIC_PARTITIONING in kwargs
            else False
        )

        event_type = self._get_event_type(event)
        occurrence_data = self._get_occurrence_data(event)

        # Instead of normalizing and doing custom 'contexts' processing in Snuba,
        # we do it here to avoid cluttering Snuba code with business logic.
        if event_type == EventStreamEventType.Generic:
            event_data = dict(event_data)
            contexts = event_data.setdefault("contexts", {})

            # Add user.geo to contexts if it exists.
            user_dict = event_data.get("user") or event_data.get("sentry.interfaces.User") or {}
            geo = user_dict.get("geo", {})
            if "geo" not in contexts and isinstance(geo, dict):
                contexts["geo"] = geo

            # Transactions processing has a configurable 'skipped contexts' list to
            # skip writing specific contexts maps to the row. For now, we're
            # ignoring that until we have a need for it.

        self._send(
            project.id,
            "insert",
            extra_data=(
                {
                    "group_id": event.group_id,
                    "group_ids": [group.id for group in getattr(event, "groups", [])],
                    "event_id": event.event_id,
                    "organization_id": project.organization_id,
                    "project_id": event.project_id,
                    # TODO(mitsuhiko): We do not want to send this incorrect
                    # message but this is what snuba needs at the moment.
                    "message": event.search_message,
                    "platform": event.platform,
                    "datetime": json.datetime_to_str(event.datetime),
                    "data": event_data,
                    "primary_hash": primary_hash,
                    "retention_days": retention_days,
                    "occurrence_id": occurrence_data.get("id"),
                    "occurrence_data": occurrence_data,
                },
                {
                    "is_new": is_new,
                    "is_regression": is_regression,
                    "is_new_group_environment": is_new_group_environment,
                    "queue": headers["queue"],
                    "skip_consume": skip_consume,
                    "group_states": group_states,
                },
            ),
            headers=headers,
            skip_semantic_partitioning=skip_semantic_partitioning,
            event_type=event_type,
        )

    def start_delete_groups(
        self, project_id: int, group_ids: Sequence[int]
    ) -> Optional[Mapping[str, Any]]:
        if not group_ids:
            return None

        state = {
            "transaction_id": uuid4().hex,
            "project_id": project_id,
            "group_ids": list(group_ids),
            "datetime": datetime.now(tz=pytz.utc),
        }

        self._send(project_id, "start_delete_groups", extra_data=(state,), asynchronous=False)

        return state

    def end_delete_groups(self, state: Mapping[str, Any]) -> None:
        state_copy: MutableMapping[str, Any] = {**state}
        state_copy["datetime"] = json.datetime_to_str(datetime.now(tz=pytz.utc))
        self._send(
            state_copy["project_id"],
            "end_delete_groups",
            extra_data=(state_copy,),
            asynchronous=False,
        )
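
    # A minimal usage sketch (for illustration only; `group_ids` and the actual
    # deletion step are assumed, not part of this module): each start_*/end_*
    # pair brackets a mutation, with the returned state passed back to the
    # matching end call.
    #
    #   state = eventstream.start_delete_groups(project_id, group_ids)
    #   # ... delete the groups from the primary datastore ...
    #   if state is not None:
    #       eventstream.end_delete_groups(state)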

    def start_merge(
        self, project_id: int, previous_group_ids: Sequence[int], new_group_id: int
    ) -> Optional[Mapping[str, Any]]:
        if not previous_group_ids:
            return None

        state = {
            "transaction_id": uuid4().hex,
            "project_id": project_id,
            "previous_group_ids": list(previous_group_ids),
            "new_group_id": new_group_id,
            "datetime": json.datetime_to_str(datetime.now(tz=pytz.utc)),
        }

        self._send(project_id, "start_merge", extra_data=(state,), asynchronous=False)

        return state

    def end_merge(self, state: Mapping[str, Any]) -> None:
        state_copy: MutableMapping[str, Any] = {**state}
        state_copy["datetime"] = datetime.now(tz=pytz.utc)
        self._send(
            state_copy["project_id"], "end_merge", extra_data=(state_copy,), asynchronous=False
        )

    def start_unmerge(
        self, project_id: int, hashes: Collection[str], previous_group_id: int, new_group_id: int
    ) -> Optional[Mapping[str, Any]]:
        if not hashes:
            return None

        state = {
            "transaction_id": uuid4().hex,
            "project_id": project_id,
            "previous_group_id": previous_group_id,
            "new_group_id": new_group_id,
            "hashes": list(hashes),
            "datetime": json.datetime_to_str(datetime.now(tz=pytz.utc)),
        }

        self._send(project_id, "start_unmerge", extra_data=(state,), asynchronous=False)

        return state

    def end_unmerge(self, state: Mapping[str, Any]) -> None:
        state_copy: MutableMapping[str, Any] = {**state}
        state_copy["datetime"] = json.datetime_to_str(datetime.now(tz=pytz.utc))
        self._send(
            state_copy["project_id"], "end_unmerge", extra_data=(state_copy,), asynchronous=False
        )

    def start_delete_tag(self, project_id: int, tag: str) -> Optional[Mapping[str, Any]]:
        if not tag:
            return None

        state = {
            "transaction_id": uuid4().hex,
            "project_id": project_id,
            "tag": tag,
            "datetime": json.datetime_to_str(datetime.now(tz=pytz.utc)),
        }

        self._send(project_id, "start_delete_tag", extra_data=(state,), asynchronous=False)

        return state

    def end_delete_tag(self, state: Mapping[str, Any]) -> None:
        state_copy: MutableMapping[str, Any] = {**state}
        state_copy["datetime"] = json.datetime_to_str(datetime.now(tz=pytz.utc))
        self._send(
            state_copy["project_id"], "end_delete_tag", extra_data=(state_copy,), asynchronous=False
        )

    def tombstone_events_unsafe(
        self,
        project_id: int,
        event_ids: Sequence[str],
        old_primary_hash: Optional[str] = None,
        from_timestamp: Optional[datetime] = None,
        to_timestamp: Optional[datetime] = None,
    ) -> None:
        """
        Tell Snuba to eventually delete these events.

        This marks events as deleted but does not immediately exclude those
        events from all queries. Because of that limitation this is not proper
        (i.e. not immediate) event deletion.

        "Proper" group deletion is essentially running this function for every
        event in the group, plus `exclude_groups` to make sure the changes are
        immediately user-visible.

        Reprocessing (v2) splits a group into events-to-be-reprocessed
        (re-insert with new group_id) and events-to-be-deleted
        (`tombstone_events`), then excludes the group from all queries
        (`exclude_groups`).

        :param old_primary_hash: If present, the event is only tombstoned
            to be reinserted over with a guaranteed-different primary hash.
            This is necessary with Snuba's errors table as the primary_hash is
            part of the PK/sortkey.
        """
        state = {
            "project_id": project_id,
            "event_ids": event_ids,
            "old_primary_hash": old_primary_hash,
            "from_timestamp": from_timestamp,
            "to_timestamp": to_timestamp,
        }
        self._send(project_id, "tombstone_events", extra_data=(state,), asynchronous=False)

    def replace_group_unsafe(
        self,
        project_id: int,
        event_ids: Sequence[str],
        new_group_id: int,
        from_timestamp: Optional[datetime] = None,
        to_timestamp: Optional[datetime] = None,
    ) -> None:
        """
        Tell Snuba to move events into a new group ID.

        Same caveats as `tombstone_events_unsafe`.
        """
        state = {
            "project_id": project_id,
            "event_ids": event_ids,
            "new_group_id": new_group_id,
            "from_timestamp": from_timestamp,
            "to_timestamp": to_timestamp,
        }
        self._send(project_id, "replace_group", extra_data=(state,), asynchronous=False)

    def exclude_groups(self, project_id: int, group_ids: Sequence[int]) -> None:
        """
        Exclude a group from queries for a while until event tombstoning takes
        effect. See the docstring of `tombstone_events_unsafe`.

        `exclude_groups` basically makes Snuba add `where group_id not in (1,
        2, ...)` to every query.
        """
        state = {"project_id": project_id, "group_ids": group_ids}
        self._send(project_id, "exclude_groups", extra_data=(state,), asynchronous=False)

    def _send(
        self,
        project_id: int,
        _type: str,
        extra_data: Tuple[Any, ...] = (),
        asynchronous: bool = True,
        headers: Optional[MutableMapping[str, str]] = None,
        skip_semantic_partitioning: bool = False,
        event_type: EventStreamEventType = EventStreamEventType.Error,
    ) -> None:
        raise NotImplementedError
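

# `SnubaEventStream` posts each protocol message synchronously to Snuba's
# `/tests/<entity>/eventstream` endpoint (validating it against the registered
# Kafka schema first, when one exists) instead of producing to a queue, which is
# why `requires_post_process_forwarder` returns False and `insert` dispatches
# the post-process task itself.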
class SnubaEventStream(SnubaProtocolEventStream):
    def _send(
        self,
        project_id: int,
        _type: str,
        extra_data: Tuple[Any, ...] = (),
        asynchronous: bool = True,
        headers: Optional[MutableMapping[str, str]] = None,
        skip_semantic_partitioning: bool = False,
        event_type: EventStreamEventType = EventStreamEventType.Error,
    ) -> None:
        if headers is None:
            headers = {}

        data = (self.EVENT_PROTOCOL_VERSION, _type) + extra_data

        entity = "events"
        if event_type == EventStreamEventType.Transaction:
            entity = "transactions"
        if event_type == EventStreamEventType.Generic:
            entity = "search_issues"

        serialized_data = json.dumps(data)

        try:
            codec = sentry_kafka_schemas.get_codec(
                topic={
                    "events": "events",
                    "transactions": "transactions",
                    "search_issues": "generic-events",
                }[entity]
            )
        except (sentry_kafka_schemas.SchemaNotFound, KeyError):
            # Needed since "generic-events" does not have a schema yet.
            codec = sentry_kafka_schemas.codecs.json.JsonCodec(None)

        codec.decode(serialized_data.encode("utf-8"), validate=True)

        try:
            resp = snuba._snuba_pool.urlopen(
                "POST",
                f"/tests/{entity}/eventstream",
                body=serialized_data,
                headers={f"X-Sentry-{k}": v for k, v in headers.items()},
            )
            if resp.status != 200:
                raise snuba.SnubaError(
                    f"HTTP {resp.status} response from Snuba! {json.loads(resp.data)}"
                )
            return None
        except urllib3.exceptions.HTTPError as err:
            raise snuba.SnubaError(err)

    def requires_post_process_forwarder(self) -> bool:
        return False

    def insert(
        self,
        event: Event | GroupEvent,
        is_new: bool,
        is_regression: bool,
        is_new_group_environment: bool,
        primary_hash: Optional[str],
        received_timestamp: float,
        skip_consume: bool = False,
        group_states: Optional[GroupStates] = None,
        **kwargs: Any,
    ) -> None:
        super().insert(
            event,
            is_new,
            is_regression,
            is_new_group_environment,
            primary_hash,
            received_timestamp,
            skip_consume,
            group_states,
            **kwargs,
        )
        self._dispatch_post_process_group_task(
            event.event_id,
            event.project_id,
            event.group_id,
            is_new,
            is_regression,
            is_new_group_environment,
            primary_hash,
            self._get_queue_for_post_process(event),
            skip_consume,
            group_states,
            occurrence_id=event.occurrence_id if isinstance(event, GroupEvent) else None,
        )