@@ -1,4 +1,5 @@
 import logging
+from collections import defaultdict
 from datetime import timedelta
 from typing import Any
 
@@ -223,7 +224,7 @@ def run_timeseries_query(
     result: SnubaData = []
     confidences: SnubaData = []
     for timeseries in rpc_response.result_timeseries:
-        processed, confidence = _process_timeseries(timeseries, params, granularity_secs)
+        processed, confidence = _process_all_timeseries([timeseries], params, granularity_secs)
         if len(result) == 0:
             result = processed
             confidences = confidence
@@ -259,7 +260,7 @@ def run_timeseries_query(
 
         if comp_rpc_response.result_timeseries:
             timeseries = comp_rpc_response.result_timeseries[0]
-            processed, _ = _process_timeseries(timeseries, params, granularity_secs)
+            processed, _ = _process_all_timeseries([timeseries], params, granularity_secs)
             label = get_function_alias(timeseries.label)
             for existing, new in zip(result, processed):
                 existing["comparisonCount"] = new[label]
@@ -380,7 +381,7 @@ def run_top_events_timeseries_query(
         other_response = snuba_rpc.timeseries_rpc(other_request)
 
     """Process the results"""
-    map_result_key_to_timeseries = {}
+    map_result_key_to_timeseries = defaultdict(list)
     for timeseries in rpc_response.result_timeseries:
         groupby_attributes = timeseries.group_by_attributes
         remapped_groupby = {}
@@ -395,12 +396,12 @@ def run_top_events_timeseries_query(
             resolved_groupby, _ = search_resolver.resolve_attribute(col)
             remapped_groupby[col] = groupby_attributes[resolved_groupby.internal_name]
         result_key = create_result_key(remapped_groupby, groupby_columns, {})
-        map_result_key_to_timeseries[result_key] = timeseries
+        map_result_key_to_timeseries[result_key].append(timeseries)
     final_result = {}
     # Top Events actually has the order, so we need to iterate through it, regenerate the result keys
     for index, row in enumerate(top_events["data"]):
         result_key = create_result_key(row, groupby_columns, {})
-        result_data, result_confidence = _process_timeseries(
+        result_data, result_confidence = _process_all_timeseries(
            map_result_key_to_timeseries[result_key],
            params,
            granularity_secs,
@@ -416,8 +417,8 @@ def run_top_events_timeseries_query(
            granularity_secs,
        )
    if other_response.result_timeseries:
-        result_data, result_confidence = _process_timeseries(
-            other_response.result_timeseries[0],
+        result_data, result_confidence = _process_all_timeseries(
+            [timeseries for timeseries in other_response.result_timeseries],
            params,
            granularity_secs,
        )
@@ -434,19 +435,29 @@ def run_top_events_timeseries_query(
     return final_result
 
 
-def _process_timeseries(
-    timeseries: TimeSeries, params: SnubaParams, granularity_secs: int, order: int | None = None
+def _process_all_timeseries(
+    all_timeseries: list[TimeSeries],
+    params: SnubaParams,
+    granularity_secs: int,
+    order: int | None = None,
 ) -> tuple[SnubaData, SnubaData]:
     result: SnubaData = []
     confidence: SnubaData = []
-    # Timeseries serialization expects the function alias (eg. `count` not `count()`)
-    label = get_function_alias(timeseries.label)
-    if len(result) < len(timeseries.buckets):
-        for bucket in timeseries.buckets:
-            result.append({"time": bucket.seconds})
-            confidence.append({"time": bucket.seconds})
-    for index, data_point in enumerate(timeseries.data_points):
-        result[index][label] = process_value(data_point.data)
-        confidence[index][label] = CONFIDENCES.get(data_point.reliability, None)
+
+    for timeseries in all_timeseries:
+        # Timeseries serialization expects the function alias (eg. `count` not `count()`)
+        label = get_function_alias(timeseries.label)
+        if result:
+            for index, bucket in enumerate(timeseries.buckets):
+                assert result[index]["time"] == bucket.seconds
+                assert confidence[index]["time"] == bucket.seconds
+        else:
+            for bucket in timeseries.buckets:
+                result.append({"time": bucket.seconds})
+                confidence.append({"time": bucket.seconds})
+
+        for index, data_point in enumerate(timeseries.data_points):
+            result[index][label] = process_value(data_point.data)
+            confidence[index][label] = CONFIDENCES.get(data_point.reliability, None)
 
     return result, confidence
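
The core of the change: one top-events group key can now map to several result timeseries (one per aggregate), so the lookup table becomes a `defaultdict(list)` and `_process_all_timeseries` folds the whole list into one row set keyed by function alias, asserting every series shares the same bucket timestamps. A minimal sketch of that merge follows; `Bucket`, `DataPoint`, `TimeSeries`, and `merge_timeseries` here are simplified stand-ins for illustration, not the real RPC protobuf types or the patched function.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class Bucket:
    seconds: int


@dataclass
class DataPoint:
    data: float


@dataclass
class TimeSeries:
    label: str  # function alias, e.g. "count" or "p95"
    buckets: list[Bucket]
    data_points: list[DataPoint]


def merge_timeseries(all_timeseries: list[TimeSeries]) -> list[dict]:
    """Fold several single-aggregate series into one row per bucket."""
    result: list[dict] = []
    for ts in all_timeseries:
        if result:
            # Series for the same group must share identical buckets.
            assert [row["time"] for row in result] == [b.seconds for b in ts.buckets]
        else:
            result = [{"time": b.seconds} for b in ts.buckets]
        for index, point in enumerate(ts.data_points):
            result[index][ts.label] = point.data
    return result


# One list of series per group key, as defaultdict(list) now allows.
by_group: dict[str, list[TimeSeries]] = defaultdict(list)
buckets = [Bucket(0), Bucket(60)]
by_group["transaction:/checkout"].append(
    TimeSeries("count", buckets, [DataPoint(3), DataPoint(5)])
)
by_group["transaction:/checkout"].append(
    TimeSeries("p95", buckets, [DataPoint(1.2), DataPoint(4.8)])
)

assert merge_timeseries(by_group["transaction:/checkout"]) == [
    {"time": 0, "count": 3, "p95": 1.2},
    {"time": 60, "count": 5, "p95": 4.8},
]
```

Asserting identical buckets instead of merging mismatched ones keeps the failure loud if the RPC ever returns misaligned series for the same group.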