123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357 |
- import logging
- import typing
- import uuid
- from datetime import datetime
- from typing import Annotated, Any, Literal, Optional, Union
- from urllib.parse import parse_qs
- from django.utils.timezone import now
- from ninja import Field
- from ninja import Schema as BaseSchema
- from pydantic import (
- AliasChoices,
- RootModel,
- ValidationError,
- WrapValidator,
- field_validator,
- model_validator,
- )
- from apps.issue_events.constants import IssueEventType
- from ..common_event_schema import (
- BaseIssueEvent,
- BaseRequest,
- EventBreadcrumb,
- ListKeyValue,
- )
- from ..common_event_utils import invalid_to_none
- logger = logging.getLogger(__name__)
- class Schema(BaseSchema):
- """Schema configuration for all event ingest schemas"""
- class Config(BaseSchema.Config):
- coerce_numbers_to_str = True # Lax is best for ingest
- class TagKeyValue(Schema):
- key: str
- value: str
- class Signal(Schema):
- number: int
- code: Optional[int]
- name: Optional[str]
- code_name: Optional[str]
- class MachException(Schema):
- number: int
- code: int
- subcode: int
- name: Optional[str]
- class NSError(Schema):
- code: int
- domain: str
- class Errno(Schema):
- number: int
- name: Optional[str]
- class MechanismMeta(Schema):
- signal: Optional[Signal] = None
- match_exception: Optional[MachException] = None
- ns_error: Optional[NSError] = None
- errno: Optional[Errno] = None
- class ExceptionMechanism(Schema):
- type: str
- description: Optional[str] = None
- help_link: Optional[str] = None
- handled: Optional[bool] = None
- synthetic: Optional[bool] = None
- meta: Optional[dict] = None
- data: Optional[dict] = None
- class StackTraceFrame(Schema):
- filename: Optional[str] = None
- function: Optional[str] = None
- raw_function: Optional[str] = None
- module: Optional[str] = None
- lineno: Optional[int] = None
- colno: Optional[int] = None
- abs_path: Optional[str] = None
- context_line: Optional[str] = None
- pre_context: Optional[list[str]] = None
- post_context: Optional[list[str]] = None
- source_link: Optional[str] = None
- in_app: Optional[bool] = None
- stack_start: Optional[bool] = None
- vars: Optional[dict[str, Union[str, dict, list]]] = None
- instruction_addr: Optional[str] = None
- addr_mode: Optional[str] = None
- symbol_addr: Optional[str] = None
- image_addr: Optional[str] = None
- package: Optional[str] = None
- platform: Optional[str] = None
- class StackTrace(Schema):
- frames: list[StackTraceFrame]
- registers: Optional[dict[str, str]] = None
- class EventException(Schema):
- type: str
- value: Annotated[Optional[str], WrapValidator(invalid_to_none)]
- module: Optional[str] = None
- thread_id: Optional[str] = None
- mechanism: Optional[ExceptionMechanism] = None
- stacktrace: Optional[StackTrace] = None
- class ValueEventException(Schema):
- values: list[EventException]
- class EventMessage(Schema):
- formatted: str = Field(max_length=8192, default="")
- message: Optional[str] = None
- params: Optional[Union[list[str], dict[str, str]]] = None
- @model_validator(mode="after")
- def set_formatted(self) -> "EventMessage":
- """
- When the EventMessage formatted string is not set,
- attempt to set it based on message and params interpolation
- """
- if not self.formatted and self.message:
- params = self.params
- if isinstance(params, list) and params is not None:
- self.formatted = self.message % tuple(params)
- elif isinstance(params, dict):
- self.formatted = self.message.format(**params)
- return self
- class EventTemplate(Schema):
- lineno: int
- abs_path: Optional[str] = None
- filename: str
- context_line: str
- pre_context: Optional[list[str]] = None
- post_context: Optional[list[str]] = None
- class ValueEventBreadcrumb(Schema):
- values: list[EventBreadcrumb]
- class ClientSDKPackage(Schema):
- name: Optional[str] = None
- version: Optional[str] = None
- class ClientSDKInfo(Schema):
- integrations: Optional[list[Optional[str]]] = None
- name: Optional[str]
- packages: Optional[list[ClientSDKPackage]] = None
- version: Optional[str]
- class RequestHeaders(Schema):
- content_type: Optional[str]
- class RequestEnv(Schema):
- remote_addr: Optional[str]
- QueryString = Union[str, ListKeyValue, dict[str, Optional[str]]]
- """Raw URL querystring, list, or dict"""
- Headers = Union[list[list[Optional[str]]], dict[str, Optional[str]]]
- """Header in list or dict format, expected to normalize to list"""
- class IngestRequest(BaseRequest):
- headers: Optional[Headers] = None
- query_string: Optional[QueryString] = None
- @field_validator("headers", mode="before")
- @classmethod
- def fix_non_standard_headers(cls, v):
- """
- Fix non-documented format used by PHP Sentry Client
- Convert {"Foo": ["bar"]} into {"Foo: "bar"}
- """
- if isinstance(v, dict):
- return {
- key: value[0] if isinstance(value, list) else value
- for key, value in v.items()
- }
- return v
- @field_validator("query_string", "headers")
- @classmethod
- def prefer_list_key_value(
- cls, v: Optional[Union[QueryString, Headers]]
- ) -> Optional[ListKeyValue]:
- """Store all querystring, header formats in a list format"""
- result: Optional[ListKeyValue] = None
- if isinstance(v, str) and v: # It must be a raw querystring, parse it
- qs = parse_qs(v)
- result = [[key, value] for key, values in qs.items() for value in values]
- elif isinstance(v, dict): # Convert dict to list
- result = [[key, value] for key, value in v.items()]
- elif isinstance(v, list): # Normalize list (throw out any weird data)
- result = [item[:2] for item in v if len(item) >= 2]
- if result:
- # Remove empty and any key called "Cookie" which could be sensitive data
- entry_to_remove = ["Cookie", ""]
- return sorted(
- [entry for entry in result if entry != entry_to_remove],
- key=lambda x: (x[0], x[1]),
- )
- return result
- class IngestIssueEvent(BaseIssueEvent):
- timestamp: datetime = Field(default_factory=now)
- level: Optional[str] = "error"
- logentry: Optional[EventMessage] = None
- logger: Optional[str] = None
- transaction: Optional[str] = Field(
- validation_alias=AliasChoices("transaction", "culprit"), default=None
- )
- server_name: Optional[str] = None
- release: Optional[str] = None
- dist: Optional[str] = None
- tags: Optional[Union[dict[str, str], list[TagKeyValue]]] = None
- environment: Optional[str] = None
- modules: Optional[dict[str, Optional[str]]] = None
- extra: Optional[Any] = None
- fingerprint: Optional[list[str]] = None
- errors: Optional[list[Any]] = None
- exception: Optional[Union[list[EventException], ValueEventException]] = None
- message: Optional[Union[str, EventMessage]] = None
- template: Optional[EventTemplate] = None
- breadcrumbs: Optional[Union[list[EventBreadcrumb], ValueEventBreadcrumb]] = None
- sdk: Optional[ClientSDKInfo] = None
- request: Optional[IngestRequest] = None
- class EventIngestSchema(IngestIssueEvent):
- event_id: uuid.UUID
- class EnvelopeHeaderSchema(Schema):
- event_id: uuid.UUID
- dsn: Optional[str] = None
- sdk: Optional[ClientSDKInfo] = None
- sent_at: datetime = Field(default_factory=now)
- SupportedItemType = Literal["transaction", "event"]
- SUPPORTED_ITEMS = typing.get_args(SupportedItemType)
- class ItemHeaderSchema(Schema):
- content_type: Optional[str]
- type: SupportedItemType
- length: Optional[int]
- class EnvelopeSchema(RootModel[list[dict[str, Any]]]):
- root: list[dict[str, Any]]
- _header: EnvelopeHeaderSchema
- _items: list[tuple[ItemHeaderSchema, IngestIssueEvent]] = []
- @model_validator(mode="after")
- def validate_envelope(self) -> "EnvelopeSchema":
- data = self.root
- try:
- header = data.pop(0)
- except IndexError:
- raise ValidationError([{"message": "Envelope is empty"}])
- self._header = EnvelopeHeaderSchema(**header)
- while len(data) >= 2:
- item_header_data = data.pop(0)
- if item_header_data.get("type", None) not in SUPPORTED_ITEMS:
- continue
- item_header = ItemHeaderSchema(**item_header_data)
- if item_header.type == "event":
- try:
- item = IngestIssueEvent(**data.pop(0))
- except ValidationError as err:
- logger.warning("Envelope Event item invalid", exc_info=True)
- raise err
- self._items.append((item_header, item))
- return self
- class CSPReportSchema(Schema):
- """
- https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Security-Policy-Report-Only#violation_report_syntax
- """
- blocked_uri: str = Field(alias="blocked-uri")
- disposition: Literal["enforce", "report"] = Field(alias="disposition")
- document_uri: str = Field(alias="document-uri")
- effective_directive: str = Field(alias="effective-directive")
- original_policy: Optional[str] = Field(alias="original-policy")
- script_sample: Optional[str] = Field(alias="script-sample", default=None)
- status_code: Optional[int] = Field(alias="status-code")
- line_number: Optional[int] = None
- column_number: Optional[int] = None
- class SecuritySchema(Schema):
- csp_report: CSPReportSchema = Field(alias="csp-report")
- ## Normalized Interchange Issue Events
- class IssueEventSchema(IngestIssueEvent):
- """
- Event storage and interchange format
- Used in json view and celery interchange
- Don't use this for api intake
- """
- type: Literal[IssueEventType.DEFAULT] = IssueEventType.DEFAULT
- class ErrorIssueEventSchema(IngestIssueEvent):
- type: Literal[IssueEventType.ERROR] = IssueEventType.ERROR
- class CSPIssueEventSchema(IngestIssueEvent):
- type: Literal[IssueEventType.CSP] = IssueEventType.CSP
- csp: CSPReportSchema
- class InterchangeIssueEvent(Schema):
- """Normalized wrapper around issue event. Event should not contain repeat information."""
- event_id: uuid.UUID = Field(default_factory=uuid.uuid4)
- project_id: int
- received: datetime = Field(default_factory=now)
- payload: Union[
- IssueEventSchema, ErrorIssueEventSchema, CSPIssueEventSchema
- ] = Field(discriminator="type")
|