# flake8: noqa
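"""Send generic metrics (a counter, a distribution, and a set per use case)
to the ingest-performance-metrics Kafka topic, then print SQL snippets for
verifying end-to-end delivery in Postgres and ClickHouse."""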
import datetime
import itertools
import json
import pprint
import random
import string
import sys

from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.types import Topic
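
# Local dev defaults: a Kafka broker on localhost and the ingest topic for
# generic (performance) metrics.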
BOOTSTRAP_HOST = "127.0.0.1:9092"
TOPIC_NAME = "ingest-performance-metrics"
conf = {"bootstrap.servers": BOOTSTRAP_HOST}
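
# Payload makers for each metric type. Each embeds the run's random string in
# a tag key/value pair so the generated metrics can be found again by the
# verification SQL printed at the end.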
def make_counter_payload(use_case, rand_str):
    return {
        "name": f"c:{use_case}/{use_case}@none",
        "tags": {
            "environment": "production",
            "session.status": "init",
            f"gen_metric_e2e_{use_case}_counter_k_{rand_str}": f"gen_metric_e2e_{use_case}_counter_v_{rand_str}",
        },
        "timestamp": int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()),
        "type": "c",
        "value": 1,
        "org_id": 1,
        "retention_days": 90,
        "project_id": 3,
    }
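
# Distribution payload: a duration metric carrying a list of raw values.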
def make_dist_payload(use_case, rand_str):
    return {
        "name": f"d:{use_case}/duration@second",
        "tags": {
            "environment": "production",
            "session.status": "healthy",
            f"gen_metric_e2e_{use_case}_dist_k_{rand_str}": f"gen_metric_e2e_{use_case}_dist_v_{rand_str}",
        },
        "timestamp": int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()),
        "type": "d",
        "value": [4, 5, 6],
        "org_id": 1,
        "retention_days": 90,
        "project_id": 3,
    }
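
# Set payload: a set metric tracking unique elements (a single value here).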
def make_set_payload(use_case, rand_str):
    return {
        "name": f"s:{use_case}/error@none",
        "tags": {
            "environment": "production",
            "session.status": "errored",
            f"gen_metric_e2e_{use_case}_set_k_{rand_str}": f"gen_metric_e2e_{use_case}_set_v_{rand_str}",
        },
        "timestamp": int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()),
        "type": "s",
        "value": [3],
        "org_id": 1,
        "retention_days": 90,
        "project_id": 3,
    }
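
# Verification SQL for Postgres: find the strings this run wrote to the
# performance string indexer table.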
def make_psql(rand_str):
    return f"""
    SELECT string,
           organization_id,
           date_added,
           use_case_id
    FROM sentry_perfstringindexer
    WHERE string ~ 'gen_metric_e2e_.*{rand_str}';
    """
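
# Verification SQL for ClickHouse: scan each raw generic-metrics table for
# rows whose tag values contain this run's random marker.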
def make_csql(rand_str):
    return "UNION ALL".join(
        f"""
        SELECT use_case_id,
               org_id,
               project_id,
               metric_id,
               timestamp,
               tags.key,
               tags.raw_value
        FROM {table_name}
        WHERE arrayExists(v -> match(v, 'gen_metric_e2e_.*{rand_str}'), tags.raw_value)
        """
        for table_name in [
            "generic_metric_counters_raw_local",
            "generic_metric_distributions_raw_local",
            "generic_metric_sets_raw_local",
        ]
    )
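
# Send each payload to the ingest topic, printing it for inspection.
# close() flushes any messages still buffered in the producer.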
def produce_msgs(messages):
    producer = KafkaProducer(conf)
    for i, message in enumerate(messages):
        print(f"Sending message {i + 1} of {len(messages)}:")
        pprint.pprint(message)
        producer.produce(
            Topic(name=TOPIC_NAME),
            KafkaPayload(key=None, value=json.dumps(message).encode("utf-8"), headers=[]),
        )
        print("Done")
        print()
    producer.close()
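
# Build one counter, one distribution, and one set payload per use case,
# shuffle them, produce them, and print the verification SQL.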
if __name__ == "__main__":
    rand_str = "".join(random.choices(string.ascii_uppercase + string.digits, k=8))
    # Extra use case IDs may be passed on the command line; none are required.
    use_cases = ["spans", "transactions", *sys.argv[1:]]
    messages = list(
        itertools.chain.from_iterable(
            (
                make_counter_payload(use_case, rand_str),
                make_dist_payload(use_case, rand_str),
                make_set_payload(use_case, rand_str),
            )
            for use_case in use_cases
        )
    )
    random.shuffle(messages)
    produce_msgs(messages)
    print(
        f"Use the following SQL to verify Postgres; there should be {(strs_per_use_case := 6)} strings for each use case, {strs_per_use_case * len(use_cases)} in total."
    )
    print(make_psql(rand_str))
    print(
        f"Use the following SQL to verify ClickHouse; there should be {(metrics_per_use_case := 3)} metrics for each use case, {metrics_per_use_case * len(use_cases)} in total."
    )
    print(make_csql(rand_str))
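
# Example invocation (the file name is illustrative; use case IDs beyond the
# built-in "spans" and "transactions" are optional):
#   python send_metrics.py my_use_case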