|
@@ -257,8 +257,8 @@ class IndexerBatch:
|
|
|
@metrics.wraps("process_messages.reconstruct_messages")
|
|
|
def reconstruct_messages(
|
|
|
self,
|
|
|
- mapping: Mapping[OrgId, Mapping[str, Optional[int]]],
|
|
|
- bulk_record_meta: Mapping[OrgId, Mapping[str, Metadata]],
|
|
|
+ mapping: Mapping[UseCaseID, Mapping[OrgId, Mapping[str, Optional[int]]]],
|
|
|
+ bulk_record_meta: Mapping[UseCaseID, Mapping[OrgId, Mapping[str, Metadata]]],
|
|
|
) -> IndexerOutputMessageBatch:
|
|
|
new_messages: IndexerOutputMessageBatch = []
|
|
|
|
|
@@ -282,6 +282,7 @@ class IndexerBatch:
|
|
|
|
|
|
metric_name = old_payload_value["name"]
|
|
|
org_id = old_payload_value["org_id"]
|
|
|
+ use_case_id = old_payload_value["use_case_id"]
|
|
|
sentry_sdk.set_tag("sentry_metrics.organization_id", org_id)
|
|
|
tags = old_payload_value.get("tags", {})
|
|
|
used_tags.add(metric_name)
|
|
@@ -293,9 +294,9 @@ class IndexerBatch:
|
|
|
try:
|
|
|
for k, v in tags.items():
|
|
|
used_tags.update({k, v})
|
|
|
- new_k = mapping[org_id][k]
|
|
|
+ new_k = mapping[use_case_id][org_id][k]
|
|
|
if new_k is None:
|
|
|
- metadata = bulk_record_meta[org_id].get(k)
|
|
|
+ metadata = bulk_record_meta[use_case_id][org_id].get(k)
|
|
|
if (
|
|
|
metadata
|
|
|
and metadata.fetch_type_ext
|
|
@@ -308,9 +309,9 @@ class IndexerBatch:
|
|
|
|
|
|
value_to_write: Union[int, str] = v
|
|
|
if self.__should_index_tag_values:
|
|
|
- new_v = mapping[org_id][v]
|
|
|
+ new_v = mapping[use_case_id][org_id][v]
|
|
|
if new_v is None:
|
|
|
- metadata = bulk_record_meta[org_id].get(v)
|
|
|
+ metadata = bulk_record_meta[use_case_id][org_id].get(v)
|
|
|
if (
|
|
|
metadata
|
|
|
and metadata.fetch_type_ext
|
|
@@ -344,15 +345,15 @@ class IndexerBatch:
|
|
|
"string_type": "tags",
|
|
|
"num_global_quotas": exceeded_global_quotas,
|
|
|
"num_org_quotas": exceeded_org_quotas,
|
|
|
- "org_batch_size": len(mapping[org_id]),
|
|
|
+ "org_batch_size": len(mapping[use_case_id][org_id]),
|
|
|
},
|
|
|
)
|
|
|
continue
|
|
|
|
|
|
fetch_types_encountered = set()
|
|
|
for tag in used_tags:
|
|
|
- if tag in bulk_record_meta[org_id]:
|
|
|
- metadata = bulk_record_meta[org_id][tag]
|
|
|
+ if tag in bulk_record_meta[use_case_id][org_id]:
|
|
|
+ metadata = bulk_record_meta[use_case_id][org_id][tag]
|
|
|
fetch_types_encountered.add(metadata.fetch_type)
|
|
|
output_message_meta[metadata.fetch_type.value][str(metadata.id)] = tag
|
|
|
|
|
@@ -360,9 +361,9 @@ class IndexerBatch:
|
|
|
"".join(sorted(t.value for t in fetch_types_encountered)), "utf-8"
|
|
|
)
|
|
|
|
|
|
- numeric_metric_id = mapping[org_id][metric_name]
|
|
|
+ numeric_metric_id = mapping[use_case_id][org_id][metric_name]
|
|
|
if numeric_metric_id is None:
|
|
|
- metadata = bulk_record_meta[org_id].get(metric_name)
|
|
|
+ metadata = bulk_record_meta[use_case_id][org_id].get(metric_name)
|
|
|
metrics.incr(
|
|
|
"sentry_metrics.indexer.process_messages.dropped_message",
|
|
|
tags={
|
|
@@ -380,7 +381,7 @@ class IndexerBatch:
|
|
|
and metadata.fetch_type_ext
|
|
|
and metadata.fetch_type_ext.is_global
|
|
|
),
|
|
|
- "org_batch_size": len(mapping[org_id]),
|
|
|
+ "org_batch_size": len(mapping[use_case_id][org_id]),
|
|
|
},
|
|
|
)
|
|
|
continue
|