123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314 |
- # pylint: skip-file
- # flake8: noqa
- import base64
- import datetime
- import functools
- import itertools
- import json
- import pprint
- import random
- import string
- import struct
- import click
- from arroyo.backends.kafka import KafkaPayload, KafkaProducer
- from arroyo.types import Topic
- from sentry.sentry_metrics.use_case_id_registry import UseCaseID
- def make_counter_payload(use_case, org_id, rand_str):
- return {
- "name": f"c:{use_case}/{use_case}@none",
- "tags": {
- "environment": "production",
- "session.status": "init",
- f"metric_e2e_{use_case}_counter_k_{rand_str}": f"metric_e2e_{use_case}_counter_v_{rand_str}",
- },
- "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
- "type": "c",
- "value": 1,
- "org_id": org_id,
- "retention_days": 90,
- "project_id": 3,
- }
- def make_dist_payload(use_case, org_id, rand_str, value_len, b64_encode):
- nums = [random.random() for _ in range(value_len)]
- return {
- "name": f"d:{use_case}/duration@second",
- "tags": {
- "environment": "production",
- "session.status": "healthy",
- f"metric_e2e_{use_case}_dist_k_{rand_str}": f"metric_e2e_{use_case}_dist_v_{rand_str}",
- },
- "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
- "type": "d",
- "value": (
- {
- "format": "base64",
- "data": base64.b64encode(struct.pack(f"<{len(nums)}d", *nums)),
- }
- if b64_encode
- else {
- "format": "array",
- "data": nums,
- }
- ),
- "org_id": org_id,
- "retention_days": 90,
- "project_id": 3,
- }
- def make_set_payload(use_case, org_id, rand_str, value_len, b64_encode):
- INT_WIDTH = 4
- nums = [random.randint(0, 2048) for _ in range(value_len)]
- return {
- "name": f"s:{use_case}/error@none",
- "tags": {
- "environment": "production",
- "session.status": "errored",
- f"metric_e2e_{use_case}_set_k_{rand_str}": f"metric_e2e_{use_case}_set_v_{rand_str}",
- },
- "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
- "type": "s",
- "value": (
- {
- "format": "base64",
- "data": base64.b64encode(
- b"".join([num.to_bytes(INT_WIDTH, byteorder="little") for num in nums])
- ),
- }
- if b64_encode
- else {
- "format": "array",
- "data": nums,
- }
- ),
- "org_id": org_id,
- "retention_days": 90,
- "project_id": 3,
- }
- def make_gauge_payload(use_case, org_id, rand_str):
- return {
- "name": f"s:{use_case}/error@none",
- "tags": {
- "environment": "production",
- "session.status": "errored",
- f"metric_e2e_{use_case}_set_k_{rand_str}": f"metric_e2e_{use_case}_set_v_{rand_str}",
- },
- "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
- "type": "g",
- "value": {
- "min": 1,
- "max": 1,
- "sum": 1,
- "count": 1,
- "last": 1,
- },
- "org_id": org_id,
- "retention_days": 90,
- "project_id": 3,
- }
- make_psql = (
- lambda rand_str, is_generic: f"""
- SELECT string,
- organization_id,
- {"use_case_id," if is_generic else ""}
- date_added,
- last_seen
- FROM {"sentry_perfstringindexer" if is_generic else "sentry_stringindexer"}
- WHERE string ~ 'metric_e2e_.*{rand_str}';
- """
- )
- make_csql = lambda rand_str, is_generic: "UNION ALL".join(
- [
- f"""
- SELECT use_case_id,
- org_id,
- project_id,
- metric_id,
- timestamp,
- tags.key,
- tags.raw_value
- FROM {table_name}
- WHERE arrayExists(v -> match(v, 'metric_e2e_.*{rand_str}'), tags.raw_value)
- """
- for table_name in (
- [
- "generic_metric_counters_raw_local",
- "generic_metric_distributions_raw_local",
- "generic_metric_sets_raw_local",
- "generic_metric_gauges_raw_local",
- ]
- if is_generic
- else [
- "metrics_counters_v2_local",
- "metrics_distributions_v2_local",
- "metrics_sets_v2_local",
- ]
- )
- ]
- )
- def produce_msgs(messages, is_generic, host, dryrun, quiet):
- conf = {"bootstrap.servers": host}
- producer = KafkaProducer(conf)
- for i, message in enumerate(messages):
- print(f"{i + 1} / {len(messages)}")
- if not quiet:
- pprint.pprint(message)
- if not dryrun:
- producer.produce(
- Topic(name=("ingest-performance-metrics" if is_generic else "ingest-metrics")),
- KafkaPayload(key=None, value=json.dumps(message).encode("utf-8"), headers=[]),
- )
- print("Done")
- print()
- producer.close()
- @click.command()
- @click.option(
- "--metric-types", default="cdsg", show_default=True, help="The types of metrics to send"
- )
- @click.option(
- "--use-cases",
- multiple=True,
- default=[
- use_case_id.value for use_case_id in UseCaseID if use_case_id is not UseCaseID.SESSIONS
- ],
- show_default=True,
- help="The use case IDs.",
- )
- @click.option("--rand-str", default=None, help="The random string prefix for each key value pairs.")
- @click.option(
- "--host", default="127.0.0.1:9092", show_default=True, help="The host and port for kafka."
- )
- @click.option(
- "--dryrun",
- "-d",
- is_flag=True,
- default=False,
- show_default=True,
- help="Generate the messages without sending them.",
- )
- @click.option(
- "--quiet",
- "-q",
- is_flag=True,
- default=False,
- show_default=True,
- help="Disable printing the messages.",
- )
- @click.option(
- "--start-org-id",
- default=1,
- show_default=True,
- help="Specify which org id(s) to start from.",
- )
- @click.option(
- "--end-org-id",
- default=1,
- show_default=True,
- help="Specify which org id(s) to end with.",
- )
- @click.option(
- "--num-bad-msg",
- default=0,
- show_default=True,
- help="Number of additional badly formatted metric messages to send.",
- )
- @click.option(
- "--value-len",
- default=8,
- show_default=True,
- help="Number of elements for metrics (sets and distributions).",
- )
- @click.option(
- "--b64-encode",
- default=True,
- show_default=True,
- help="Encode sets and distribution metrics values in base64",
- )
- def main(
- metric_types,
- use_cases,
- rand_str,
- host,
- dryrun,
- quiet,
- start_org_id,
- end_org_id,
- num_bad_msg,
- value_len,
- b64_encode,
- ):
- if UseCaseID.SESSIONS.value in use_cases and len(use_cases) > 1:
- click.secho(
- "ERROR: UseCaseID.SESSIONS is in use_cases and there are more than 1 use cases",
- blink=True,
- bold=True,
- )
- exit(1)
- is_generic = UseCaseID.SESSIONS.value not in use_cases
- metric_types = "".join(set(metric_types))
- rand_str = rand_str or "".join(random.choices(string.ascii_uppercase + string.digits, k=8))
- payload_generators = {
- "c": functools.partial(make_counter_payload, rand_str=rand_str),
- "d": functools.partial(
- make_dist_payload, rand_str=rand_str, value_len=value_len, b64_encode=b64_encode
- ),
- "s": functools.partial(
- make_set_payload, rand_str=rand_str, value_len=value_len, b64_encode=b64_encode
- ),
- "g": functools.partial(make_gauge_payload, rand_str=rand_str),
- }
- messages = list(
- itertools.chain.from_iterable(
- (
- payload_generators[metric_type](use_case=use_case, org_id=org_id)
- for metric_type in metric_types
- )
- for use_case in use_cases
- for org_id in range(start_org_id, end_org_id + 1)
- )
- )
- messages.extend([{"BAD_VALUE": rand_str, "idx": i} for i in range(num_bad_msg)])
- random.shuffle(messages)
- produce_msgs(messages, is_generic, host, dryrun, quiet)
- strs_per_use_case = 3
- print(
- f"Use the following SQL to verify postgres, "
- f"there should be {strs_per_use_case} strings for each use cases, "
- f"{strs_per_use_case * len(use_cases) * (end_org_id - start_org_id + 1)} in total."
- )
- print(make_psql(rand_str, is_generic))
- if is_generic:
- print(
- f"Use the following SQL to verify clickhouse, "
- f"there should be {len(metric_types)} metrics for each use cases, "
- f"{len(metric_types) * len(use_cases) * (end_org_id - start_org_id + 1)} in total."
- )
- print(make_csql(rand_str, is_generic))
- if __name__ == "__main__":
- main()
|