123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- # pylint: skip-file
- # flake8: noqa
- import datetime
- import itertools
- import json
- import pprint
- import random
- import string
- import click
- from arroyo.backends.kafka import KafkaPayload, KafkaProducer
- from arroyo.types import Topic
- from sentry.sentry_metrics.use_case_id_registry import UseCaseID
def make_counter_payload(use_case, org_id, rand_str):
    """Build a counter metric message (type ``"c"``) for the ingest topic.

    Args:
        use_case: Use case ID string; embedded in the MRI and in the e2e tag names.
        org_id: Organization ID stamped on the message.
        rand_str: Random suffix that makes the e2e tag key/value pair unique per run.

    Returns:
        A payload dict in the shape the metrics ingest consumer expects.
    """
    return {
        "name": f"c:{use_case}/{use_case}@none",
        "tags": {
            "environment": "production",
            "session.status": "init",
            f"metric_e2e_{use_case}_counter_k_{rand_str}": f"metric_e2e_{use_case}_counter_v_{rand_str}",
        },
        # Current UTC time so the message lands inside the live ingest window.
        "timestamp": int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()),
        "type": "c",
        "value": 1,
        "org_id": org_id,
        "retention_days": 90,
        "project_id": 3,
    }
def make_dist_payload(use_case, org_id, rand_str):
    """Build a distribution metric message (type ``"d"``) for the ingest topic.

    Args:
        use_case: Use case ID string; embedded in the MRI and in the e2e tag names.
        org_id: Organization ID stamped on the message.
        rand_str: Random suffix that makes the e2e tag key/value pair unique per run.

    Returns:
        A payload dict in the shape the metrics ingest consumer expects.
    """
    return {
        "name": f"d:{use_case}/duration@second",
        "tags": {
            "environment": "production",
            "session.status": "healthy",
            f"metric_e2e_{use_case}_dist_k_{rand_str}": f"metric_e2e_{use_case}_dist_v_{rand_str}",
        },
        # Current UTC time so the message lands inside the live ingest window.
        "timestamp": int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()),
        "type": "d",
        "value": [4, 5, 6],
        "org_id": org_id,
        "retention_days": 90,
        "project_id": 3,
    }
def make_set_payload(use_case, org_id, rand_str):
    """Build a set metric message (type ``"s"``) for the ingest topic.

    Args:
        use_case: Use case ID string; embedded in the MRI and in the e2e tag names.
        org_id: Organization ID stamped on the message.
        rand_str: Random suffix that makes the e2e tag key/value pair unique per run.

    Returns:
        A payload dict in the shape the metrics ingest consumer expects.
    """
    return {
        "name": f"s:{use_case}/error@none",
        "tags": {
            "environment": "production",
            "session.status": "errored",
            f"metric_e2e_{use_case}_set_k_{rand_str}": f"metric_e2e_{use_case}_set_v_{rand_str}",
        },
        # Current UTC time so the message lands inside the live ingest window.
        "timestamp": int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()),
        "type": "s",
        "value": [3],
        "org_id": org_id,
        "retention_days": 90,
        "project_id": 3,
    }
def make_psql(rand_str, is_generic):
    """Return a Postgres query matching the e2e strings indexed by this run.

    Generic (performance) metrics are indexed in ``sentry_perfstringindexer``,
    which also carries a ``use_case_id`` column; the sessions path uses
    ``sentry_stringindexer`` and has no such column.
    """
    return f"""
SELECT string,
       organization_id,
       {"use_case_id," if is_generic else ""}
       date_added,
       last_seen
FROM {"sentry_perfstringindexer" if is_generic else "sentry_stringindexer"}
WHERE string ~ 'metric_e2e_.*{rand_str}';
"""
def make_csql(rand_str, is_generic):
    """Return a ClickHouse query matching the e2e raw rows for this run.

    The query is three per-type SELECTs (counters, distributions, sets)
    glued together with ``UNION ALL``; the table set depends on whether the
    run used the generic (performance) or the legacy sessions pipeline.
    """
    table_names = (
        [
            "generic_metric_counters_raw_local",
            "generic_metric_distributions_raw_local",
            "generic_metric_sets_raw_local",
        ]
        if is_generic
        else [
            "metrics_counters_v2_local",
            "metrics_distributions_v2_local",
            "metrics_sets_v2_local",
        ]
    )
    return "UNION ALL".join(
        f"""
SELECT use_case_id,
       org_id,
       project_id,
       metric_id,
       timestamp,
       tags.key,
       tags.raw_value
FROM {table_name}
WHERE arrayExists(v -> match(v, 'metric_e2e_.*{rand_str}'), tags.raw_value)
"""
        for table_name in table_names
    )
def produce_msgs(messages, is_generic, host, dryrun):
    """Produce each payload dict in *messages* to the ingest Kafka topic.

    Args:
        messages: Payload dicts as built by the ``make_*_payload`` helpers.
        is_generic: True routes to ``ingest-performance-metrics``; False to
            ``ingest-metrics`` (the legacy sessions topic).
        host: Kafka bootstrap server as ``"host:port"``.
        dryrun: When True, print every message but produce nothing.
    """
    conf = {"bootstrap.servers": host}
    producer = KafkaProducer(conf)
    for i, message in enumerate(messages):
        # Progress line plus a pretty-printed dump of the payload being sent.
        print(f"{i + 1} / {len(messages)}")
        pprint.pprint(message)
        if not dryrun:
            producer.produce(
                Topic(name=("ingest-performance-metrics" if is_generic else "ingest-metrics")),
                KafkaPayload(key=None, value=json.dumps(message).encode("utf-8"), headers=[]),
            )
            print("Done")
        print()
    producer.close()
@click.command()
@click.option(
    "--use-cases",
    multiple=True,
    default=[
        use_case_id.value for use_case_id in UseCaseID if use_case_id is not UseCaseID.SESSIONS
    ],
    show_default=True,
    help="The use case IDs.",
)
@click.option("--rand-str", default=None, help="The random string prefix for each key value pairs.")
@click.option(
    "--host", default="127.0.0.1:9092", show_default=True, help="The host and port for kafka."
)
@click.option(
    "--dryrun",
    is_flag=True,
    default=False,
    show_default=True,
    help="Print the messages without sending them.",
)
@click.option(
    "--org-id",
    "-o",
    multiple=True,
    default=[1],
    show_default=True,
    help="Specify which org id(s) to send",
)
def main(use_cases, rand_str, host, dryrun, org_id):
    """Send fake counter/dist/set metrics to Kafka for each (use case, org) pair.

    After producing, prints SQL snippets for verifying that the strings were
    indexed in Postgres and — for generic metrics — that the raw rows exist
    in ClickHouse.
    """
    # Sessions go through the legacy pipeline; they cannot be mixed with
    # generic (performance) use cases in a single run.
    if UseCaseID.SESSIONS.value in use_cases and len(use_cases) > 1:
        click.secho(
            "ERROR: UseCaseID.SESSIONS is in use_cases and there are more than 1 use cases",
            blink=True,
            bold=True,
        )
        # Raise SystemExit directly rather than the site-injected `exit()`
        # builtin, which is intended only for interactive sessions.
        raise SystemExit(1)
    is_generic = UseCaseID.SESSIONS.value not in use_cases
    # Default to a fresh 8-char suffix so repeated runs don't collide.
    rand_str = rand_str or "".join(random.choices(string.ascii_uppercase + string.digits, k=8))
    # One counter, one distribution and one set payload per (use case, org).
    messages = list(
        itertools.chain.from_iterable(
            (
                make_counter_payload(use_case, org, rand_str),
                make_dist_payload(use_case, org, rand_str),
                make_set_payload(use_case, org, rand_str),
            )
            for use_case in use_cases
            for org in org_id
        )
    )
    random.shuffle(messages)  # avoid any ordering artifacts downstream
    produce_msgs(messages, is_generic, host, dryrun)
    metrics_per_use_case = 3
    strs_per_use_case = 3
    print(
        f"Use the following SQL to verify postgres, "
        f"there should be {strs_per_use_case} strings for each use cases, "
        f"{strs_per_use_case * len(use_cases) * len(org_id)} in total."
    )
    print(make_psql(rand_str, is_generic))
    if is_generic:
        print(
            f"Use the following SQL to verify clickhouse, "
            f"there should be {metrics_per_use_case} metrics for each use cases, "
            f"{metrics_per_use_case * len(use_cases) * len(org_id)} in total."
        )
        print(make_csql(rand_str, is_generic))
if __name__ == "__main__":
    # Click parses the CLI options and dispatches into main().
    main()
|