|
@@ -7,7 +7,6 @@ import itertools
|
|
|
import logging
|
|
|
import threading
|
|
|
from collections.abc import Callable, Iterable, Mapping, MutableMapping, Sequence
|
|
|
-from concurrent import futures
|
|
|
from typing import TYPE_CHECKING, Any, TypeVar
|
|
|
|
|
|
from django.utils.functional import LazyObject, empty
|
|
@@ -22,8 +21,6 @@ from .types import AnyCallable
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
-STATUS_SUCCESS = "success"
|
|
|
-
|
|
|
|
|
|
class Service:
|
|
|
__all__: Iterable[str] = ()
|
|
@@ -529,146 +526,3 @@ def make_writebehind_selector(
|
|
|
return [move_from, move_to]
|
|
|
|
|
|
return selector
|
|
|
-
|
|
|
-
|
|
|
-def get_invalid_timing_reason(timing: tuple[float | None, float | None]) -> str:
|
|
|
- start, stop = timing
|
|
|
- if start is None and stop is None:
|
|
|
- return "no_data"
|
|
|
- elif start is None:
|
|
|
- return "no_start"
|
|
|
- elif stop is None:
|
|
|
- return "no_stop"
|
|
|
- else:
|
|
|
- raise Exception("unexpected value for timing")
|
|
|
-
|
|
|
-
|
|
|
-def get_future_status(future: TimedFuture) -> str:
|
|
|
- try:
|
|
|
- future.result(timeout=0)
|
|
|
- return STATUS_SUCCESS
|
|
|
- except futures.CancelledError:
|
|
|
- return "cancelled" # neither succeeded nor failed
|
|
|
- except futures.TimeoutError:
|
|
|
- raise # tried to check before ready
|
|
|
- except Exception:
|
|
|
- return "failure"
|
|
|
-
|
|
|
-
|
|
|
-def callback_timing(
|
|
|
- context: Context,
|
|
|
- method_name: str,
|
|
|
- callargs: Mapping[str, Any],
|
|
|
- backend_names: Sequence[str],
|
|
|
- results: Sequence[TimedFuture],
|
|
|
- metric_name: str,
|
|
|
- result_comparator: Callable[[str, str, str, Any, Any], Mapping[str, str]] | None = None,
|
|
|
- sample_rate: float | None = None,
|
|
|
-) -> None:
|
|
|
- """
|
|
|
- Collects timing stats on results returned to the callback method of a `ServiceDelegator`. Either
|
|
|
- partial this and pass it directly as the `callback_func` or
|
|
|
- :param metric_name: Prefix to use when writing these timing metrics to Datadog
|
|
|
- :param method_name: method_name passed to callback
|
|
|
- :param backend_names: backend_names passed to callback
|
|
|
- :param results: results passed to callback
|
|
|
- :param result_comparator: An optional comparator to compare the primary result to each secondary
|
|
|
- result. Should return a dict represents the result of the comparison. This will be merged into
|
|
|
- tags to be stored in the metrics backend.
|
|
|
- :return:
|
|
|
- """
|
|
|
- if not len(backend_names) > 1:
|
|
|
- return
|
|
|
- primary_backend_name = backend_names[0]
|
|
|
- primary_future = results[0]
|
|
|
- primary_status = get_future_status(primary_future)
|
|
|
- primary_timing = primary_future.get_timing()
|
|
|
-
|
|
|
- # If either endpoint of the timing data is not set, just ignore this call.
|
|
|
- # This really shouldn't happen on the primary backend, but playing it safe
|
|
|
- # here out of an abundance of caution.
|
|
|
- if not all(primary_timing):
|
|
|
- logger.warning(
|
|
|
- "Received timing with unexpected endpoint: %r, primary_backend_name: %r, future_status: %r",
|
|
|
- primary_timing,
|
|
|
- primary_backend_name,
|
|
|
- primary_status,
|
|
|
- )
|
|
|
- return
|
|
|
-
|
|
|
- primary_duration_ms = (primary_timing[1] - primary_timing[0]) * 1000
|
|
|
-
|
|
|
- metric_kwargs = {}
|
|
|
- if sample_rate is not None:
|
|
|
- metric_kwargs["sample_rate"] = sample_rate
|
|
|
-
|
|
|
- metrics.timing(
|
|
|
- f"{metric_name}.timing_ms",
|
|
|
- primary_duration_ms,
|
|
|
- tags={
|
|
|
- "method": method_name,
|
|
|
- "backend": primary_backend_name,
|
|
|
- "status": primary_status,
|
|
|
- "primary": "true",
|
|
|
- },
|
|
|
- **metric_kwargs, # type: ignore[arg-type]
|
|
|
- )
|
|
|
-
|
|
|
- for i, secondary_backend_name in enumerate(backend_names[1:], 1):
|
|
|
- secondary_future = results[i]
|
|
|
- secondary_timing = secondary_future.get_timing()
|
|
|
- secondary_status = get_future_status(secondary_future)
|
|
|
-
|
|
|
- tags = {
|
|
|
- "method": method_name,
|
|
|
- "primary_backend": primary_backend_name,
|
|
|
- "primary_status": primary_status,
|
|
|
- "secondary_backend": secondary_backend_name,
|
|
|
- "secondary_status": secondary_status,
|
|
|
- }
|
|
|
-
|
|
|
- if result_comparator:
|
|
|
- comparator_result = result_comparator(
|
|
|
- method_name,
|
|
|
- primary_status,
|
|
|
- secondary_status,
|
|
|
- primary_future.result(),
|
|
|
- secondary_future.result(),
|
|
|
- )
|
|
|
- tags.update(comparator_result)
|
|
|
-
|
|
|
- # If either endpoint of the timing data is not set, this means
|
|
|
- # something weird happened (more than likely a cancellation.)
|
|
|
- if not all(secondary_timing):
|
|
|
- metrics.incr(
|
|
|
- f"{metric_name}.timing_invalid",
|
|
|
- tags={**tags, "reason": get_invalid_timing_reason(secondary_timing)},
|
|
|
- )
|
|
|
- else:
|
|
|
- secondary_duration_ms = (secondary_timing[1] - secondary_timing[0]) * 1000
|
|
|
- metrics.distribution(
|
|
|
- f"{metric_name}.timing_ms",
|
|
|
- secondary_duration_ms,
|
|
|
- tags={
|
|
|
- "method": method_name,
|
|
|
- "backend": secondary_backend_name,
|
|
|
- "status": secondary_status,
|
|
|
- "primary": "false",
|
|
|
- },
|
|
|
- unit="millisecond",
|
|
|
- **metric_kwargs, # type: ignore[arg-type]
|
|
|
- )
|
|
|
- metrics.distribution(
|
|
|
- f"{metric_name}.timing_delta_ms",
|
|
|
- secondary_duration_ms - primary_duration_ms,
|
|
|
- tags=tags,
|
|
|
- unit="millisecond",
|
|
|
- **metric_kwargs, # type: ignore[arg-type]
|
|
|
- )
|
|
|
- metrics.distribution(
|
|
|
- f"{metric_name}.timing_relative_delta",
|
|
|
- secondary_duration_ms / primary_duration_ms,
|
|
|
- tags=tags,
|
|
|
- unit="millisecond",
|
|
|
- **metric_kwargs, # type: ignore[arg-type]
|
|
|
- )
|