import logging
import typing
import uuid
from datetime import datetime
from typing import Annotated, Any, Literal, Union
from urllib.parse import parse_qs, urlparse

from django.utils.timezone import now
from ninja import Field
from pydantic import (
    AliasChoices,
    BeforeValidator,
    JsonValue,
    RootModel,
    ValidationError,
    WrapValidator,
    field_validator,
    model_validator,
)

from apps.issue_events.constants import IssueEventType

from ..shared.schema.base import LaxIngestSchema
from ..shared.schema.contexts import ContextsSchema
from ..shared.schema.event import (
    BaseIssueEvent,
    BaseRequest,
    EventBreadcrumb,
    ListKeyValue,
)
from ..shared.schema.user import EventUser
from ..shared.schema.utils import invalid_to_none

logger = logging.getLogger(__name__)


CoercedStr = Annotated[
    str, BeforeValidator(lambda v: str(v) if isinstance(v, bool) else v)
]
"""
Coerced Str that will coerce bool to str when found
"""


def coerce_list(v: Any) -> Any:
    """Wrap non-list dict into list: {"a": 1} to [{"a": 1}]"""
    return v if not isinstance(v, dict) else [v]


class Signal(LaxIngestSchema):
    number: int
    code: int | None
    name: str | None
    code_name: str | None


class MachException(LaxIngestSchema):
    number: int
    code: int
    subcode: int
    name: str | None


class NSError(LaxIngestSchema):
    code: int
    domain: str


class Errno(LaxIngestSchema):
    number: int
    name: str | None


class MechanismMeta(LaxIngestSchema):
    signal: Signal | None = None
    match_exception: MachException | None = None
    ns_error: NSError | None = None
    errno: Errno | None = None


class ExceptionMechanism(LaxIngestSchema):
    type: str
    description: str | None = None
    help_link: str | None = None
    handled: bool | None = None
    synthetic: bool | None = None
    meta: dict | None = None
    data: dict | None = None


class StackTraceFrame(LaxIngestSchema):
    filename: str | None = None
    function: str | None = None
    raw_function: str | None = None
    module: str | None = None
    lineno: int | None = None
    colno: int | None = None
    abs_path: str | None = None
    context_line: str | None = None
    pre_context: list[str | None] | None = None
    post_context: list[str | None] | None = None
    source_link: str | None = None
    in_app: bool | None = None
    stack_start: bool | None = None
    vars: dict[str, Union[str, dict, list]] | None = None
    instruction_addr: str | None = None
    addr_mode: str | None = None
    symbol_addr: str | None = None
    image_addr: str | None = None
    package: str | None = None
    platform: str | None = None

    def is_url(self, filename: str) -> bool:
        return filename.startswith(("file:", "http:", "https:", "applewebdata:"))

    @model_validator(mode="after")
    def normalize_files(self):
        if not self.abs_path and self.filename:
            self.abs_path = self.filename
        if self.filename and self.is_url(self.filename):
            self.filename = urlparse(self.filename).path
        return self

    @field_validator("pre_context", "post_context")
    @classmethod
    def replace_null(cls, context: list[str | None]) -> list[str | None] | None:
        if context:
            return [line if line else "" for line in context]
        return None


class StackTrace(LaxIngestSchema):
    frames: list[StackTraceFrame]
    registers: dict[str, str] | None = None


class EventException(LaxIngestSchema):
    type: str | None = None
    value: Annotated[str | None, WrapValidator(invalid_to_none)] = None
    module: str | None = None
    thread_id: str | None = None
    mechanism: ExceptionMechanism | None = None
    stacktrace: Annotated[StackTrace | None, WrapValidator(invalid_to_none)] = None

    @model_validator(mode="after")
    def check_type_value(self):
        if self.type is None and self.value is None:
            return None
        return self


class ValueEventException(LaxIngestSchema):
    values: list[EventException]

    @field_validator("values")
    @classmethod
    def strip_null(cls, v: list[EventException]) -> list[EventException]:
        return [e for e in v if e is not None]


class EventMessage(LaxIngestSchema):
    formatted: str = Field(max_length=8192, default="")
    message: str | None = None
    params: Union[list[str], dict[str, str]] | None = None

    @model_validator(mode="after")
    def set_formatted(self) -> "EventMessage":
        """
        When the EventMessage formatted string is not set,
        attempt to set it based on message and params interpolation
        """
        if not self.formatted and self.message:
            params = self.params
            if isinstance(params, list) and params:
                self.formatted = self.message % tuple(params)
            elif isinstance(params, dict):
                self.formatted = self.message.format(**params)
        return self


class EventTemplate(LaxIngestSchema):
    lineno: int
    abs_path: str | None = None
    filename: str
    context_line: str
    pre_context: list[str] | None = None
    post_context: list[str] | None = None


class ValueEventBreadcrumb(LaxIngestSchema):
    values: list[EventBreadcrumb]


class ClientSDKPackage(LaxIngestSchema):
    name: str | None = None
    version: str | None = None


class ClientSDKInfo(LaxIngestSchema):
    integrations: list[str | None] | None = None
    name: str | None
    packages: list[ClientSDKPackage] | None = None
    version: str | None

    @field_validator("packages", mode="before")
    def name_must_contain_space(cls, v: Any) -> Any:
        return coerce_list(v)


class RequestHeaders(LaxIngestSchema):
    content_type: str | None


class RequestEnv(LaxIngestSchema):
    remote_addr: str | None


QueryString = Union[str, ListKeyValue, dict[str, str | None]]
"""Raw URL querystring, list, or dict"""
KeyValueFormat = Union[list[list[str | None]], dict[str, CoercedStr | None]]
"""
key-values in list or dict format. Example {browser: firefox} or [[browser, firefox]]
"""


class IngestRequest(BaseRequest):
    headers: KeyValueFormat | None = None
    query_string: QueryString | None = None

    @field_validator("headers", mode="before")
    @classmethod
    def fix_non_standard_headers(cls, v):
        """
        Fix non-documented format used by PHP Sentry Client
        Convert {"Foo": ["bar"]} into {"Foo: "bar"}
        """
        if isinstance(v, dict):
            return {
                key: value[0] if isinstance(value, list) else value
                for key, value in v.items()
            }
        return v

    @field_validator("query_string", "headers")
    @classmethod
    def prefer_list_key_value(
        cls, v: Union[QueryString, KeyValueFormat] | None
    ) -> ListKeyValue | None:
        """Store all querystring, header formats in a list format"""
        result: ListKeyValue | None = None
        if isinstance(v, str) and v:  # It must be a raw querystring, parse it
            qs = parse_qs(v)
            result = [[key, value] for key, values in qs.items() for value in values]
        elif isinstance(v, dict):  # Convert dict to list
            result = [[key, value] for key, value in v.items()]
        elif isinstance(v, list):  # Normalize list (throw out any weird data)
            result = [item[:2] for item in v if len(item) >= 2]

        if result:
            # Remove empty and any key called "Cookie" which could be sensitive data
            entry_to_remove = ["Cookie", ""]
            return sorted(
                [entry for entry in result if entry != entry_to_remove],
                key=lambda x: (x[0], x[1]),
            )
        return result


class IngestIssueEvent(BaseIssueEvent):
    timestamp: datetime = Field(default_factory=now)
    level: str | None = "error"
    logentry: EventMessage | None = None
    logger: str | None = None
    transaction: str | None = Field(
        validation_alias=AliasChoices("transaction", "culprit"), default=None
    )
    server_name: str | None = None
    release: str | None = None
    dist: str | None = None
    tags: KeyValueFormat | None = None
    environment: str | None = None
    modules: dict[str, str | None] | None = None
    extra: dict[str, Any] | None = None
    fingerprint: list[str] | None = None
    errors: list[Any] | None = None

    exception: Union[list[EventException], ValueEventException] | None = None
    message: Union[str, EventMessage] | None = None
    template: EventTemplate | None = None

    breadcrumbs: Union[list[EventBreadcrumb], ValueEventBreadcrumb] | None = None
    sdk: ClientSDKInfo | None = None
    request: IngestRequest | None = None
    contexts: ContextsSchema | None = None
    user: EventUser | None = None

    @field_validator("tags")
    @classmethod
    def prefer_dict(cls, v: KeyValueFormat | None) -> dict[str, str | None] | None:
        if isinstance(v, list):
            return {key: value for key, value in v if key is not None}
        return v


class EventIngestSchema(IngestIssueEvent):
    event_id: uuid.UUID


class TransactionEventSchema(LaxIngestSchema):
    type: Literal["transaction"]
    contexts: JsonValue
    measurements: JsonValue | None = None
    start_timestamp: datetime
    timestamp: datetime
    transaction: str

    # # SentrySDKEventSerializer
    breadcrumbs: JsonValue | None = None
    fingerprint: list[str] | None = None
    tags: KeyValueFormat | None = None
    event_id: uuid.UUID = Field(default_factory=uuid.uuid4)
    extra: JsonValue | None
    request: IngestRequest | None = None
    server_name: str | None
    sdk: ClientSDKInfo | None = None
    platform: str | None
    release: str | None = None
    environment: str | None = None
    _meta: JsonValue | None


class EnvelopeHeaderSchema(LaxIngestSchema):
    event_id: uuid.UUID | None = None
    dsn: str | None = None
    sdk: ClientSDKInfo | None = None
    sent_at: datetime = Field(default_factory=now)


SupportedItemType = Literal["transaction", "event"]
IgnoredItemType = Literal[
    "session", "sessions", "client_report", "attachment", "user_report", "check_in"
]
SUPPORTED_ITEMS = typing.get_args(SupportedItemType)


class ItemHeaderSchema(LaxIngestSchema):
    content_type: str | None = None
    type: Union[SupportedItemType, IgnoredItemType]
    length: int | None = None


class EnvelopeSchema(RootModel[list[dict[str, Any]]]):
    root: list[dict[str, Any]]
    _header: EnvelopeHeaderSchema
    _items: list[
        tuple[ItemHeaderSchema, IngestIssueEvent | TransactionEventSchema]
    ] = []

    @model_validator(mode="after")
    def validate_envelope(self) -> "EnvelopeSchema":
        data = self.root
        try:
            header = data.pop(0)
        except IndexError:
            raise ValidationError([{"message": "Envelope is empty"}])
        self._header = EnvelopeHeaderSchema(**header)

        while len(data) >= 2:
            item_header_data = data.pop(0)
            if item_header_data.get("type", None) not in SUPPORTED_ITEMS:
                continue
            item_header = ItemHeaderSchema(**item_header_data)
            if item_header.type == "event":
                try:
                    item = IngestIssueEvent(**data.pop(0))
                except ValidationError as err:
                    logger.warning("Envelope Event item invalid", exc_info=True)
                    raise err
                self._items.append((item_header, item))
            elif item_header.type == "transaction":
                item = TransactionEventSchema(**data.pop(0))
                self._items.append((item_header, item))

        return self


class CSPReportSchema(LaxIngestSchema):
    """
    https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Security-Policy-Report-Only#violation_report_syntax
    """

    blocked_uri: str = Field(alias="blocked-uri")
    disposition: Literal["enforce", "report"] = Field(alias="disposition")
    document_uri: str = Field(alias="document-uri")
    effective_directive: str = Field(alias="effective-directive")
    original_policy: str | None = Field(alias="original-policy")
    script_sample: str | None = Field(alias="script-sample", default=None)
    status_code: int | None = Field(alias="status-code")
    line_number: int | None = None
    column_number: int | None = None


class SecuritySchema(LaxIngestSchema):
    csp_report: CSPReportSchema = Field(alias="csp-report")


## Normalized Interchange Issue Events


class IssueEventSchema(IngestIssueEvent):
    """
    Event storage and interchange format
    Used in json view and celery interchange
    Don't use this for api intake
    """

    type: Literal[IssueEventType.DEFAULT] = IssueEventType.DEFAULT


class ErrorIssueEventSchema(IngestIssueEvent):
    type: Literal[IssueEventType.ERROR] = IssueEventType.ERROR


class CSPIssueEventSchema(IngestIssueEvent):
    type: Literal[IssueEventType.CSP] = IssueEventType.CSP
    csp: CSPReportSchema


class InterchangeEvent(LaxIngestSchema):
    """Normalized wrapper around issue event. Event should not contain repeat information."""

    event_id: uuid.UUID = Field(default_factory=uuid.uuid4)
    project_id: int
    organization_id: int
    received: datetime = Field(default_factory=now)
    payload: (
        IssueEventSchema
        | ErrorIssueEventSchema
        | CSPIssueEventSchema
        | TransactionEventSchema
    ) = Field(discriminator="type")


class InterchangeIssueEvent(InterchangeEvent):
    payload: (
        IssueEventSchema
        | ErrorIssueEventSchema
        | CSPIssueEventSchema
        | TransactionEventSchema
    ) = Field(discriminator="type")


class InterchangeTransactionEvent(InterchangeEvent):
    payload: TransactionEventSchema