|
@@ -336,24 +336,18 @@ class SnubaEventStream(SnubaProtocolEventStream):
|
|
|
|
|
|
data = (self.EVENT_PROTOCOL_VERSION, _type) + extra_data
|
|
data = (self.EVENT_PROTOCOL_VERSION, _type) + extra_data
|
|
|
|
|
|
- # TODO remove this once the unified dataset is available.
|
|
|
|
- # Inserting into both events and transactions datasets lets us
|
|
|
|
- # simulate what is currently happening via kafka when both the events
|
|
|
|
- # and transactions consumers are running.
|
|
|
|
- datasets = ["events"]
|
|
|
|
|
|
+ dataset = "events"
|
|
if get_path(extra_data, 0, "data", "type") == "transaction":
|
|
if get_path(extra_data, 0, "data", "type") == "transaction":
|
|
- datasets.append("transactions")
|
|
|
|
|
|
+ dataset = "transactions"
|
|
try:
|
|
try:
|
|
- resp = None
|
|
|
|
- for dataset in datasets:
|
|
|
|
- resp = snuba._snuba_pool.urlopen(
|
|
|
|
- "POST",
|
|
|
|
- f"/tests/{dataset}/eventstream",
|
|
|
|
- body=json.dumps(data),
|
|
|
|
- headers={f"X-Sentry-{k}": v for k, v in headers.items()},
|
|
|
|
- )
|
|
|
|
- if resp.status != 200:
|
|
|
|
- raise snuba.SnubaError("HTTP %s response from Snuba!" % resp.status)
|
|
|
|
|
|
+ resp = snuba._snuba_pool.urlopen(
|
|
|
|
+ "POST",
|
|
|
|
+ f"/tests/{dataset}/eventstream",
|
|
|
|
+ body=json.dumps(data),
|
|
|
|
+ headers={f"X-Sentry-{k}": v for k, v in headers.items()},
|
|
|
|
+ )
|
|
|
|
+ if resp.status != 200:
|
|
|
|
+ raise snuba.SnubaError("HTTP %s response from Snuba!" % resp.status)
|
|
return resp
|
|
return resp
|
|
except urllib3.exceptions.HTTPError as err:
|
|
except urllib3.exceptions.HTTPError as err:
|
|
raise snuba.SnubaError(err)
|
|
raise snuba.SnubaError(err)
|