send_metrics.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. # flake8: noqa
  2. import datetime
  3. import itertools
  4. import json
  5. import pprint
  6. import random
  7. import string
  8. import sys
  9. from arroyo.backends.kafka import KafkaPayload, KafkaProducer
  10. from arroyo.types import Topic
# Kafka broker to connect to (local development instance).
BOOTSTRAP_HOST = "127.0.0.1:9092"
# Sentry ingest topic consumed by the generic performance-metrics pipeline.
TOPIC_NAME = "ingest-performance-metrics"
# Minimal producer configuration passed to arroyo's KafkaProducer.
conf = {"bootstrap.servers": BOOTSTRAP_HOST}
  14. make_counter_payload = lambda use_case, rand_str: {
  15. "name": f"c:{use_case}/{use_case}@none",
  16. "tags": {
  17. "environment": "production",
  18. "session.status": "init",
  19. f"gen_metric_e2e_{use_case}_counter_k_{rand_str}": f"gen_metric_e2e_{use_case}_counter_v_{rand_str}",
  20. },
  21. "timestamp": int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()),
  22. "type": "c",
  23. "value": 1,
  24. "org_id": 1,
  25. "retention_days": 90,
  26. "project_id": 3,
  27. }
  28. make_dist_payload = lambda use_case, rand_str: {
  29. "name": f"d:{use_case}/duration@second",
  30. "tags": {
  31. "environment": "production",
  32. "session.status": "healthy",
  33. f"gen_metric_e2e_{use_case}_dist_k_{rand_str}": f"gen_metric_e2e_{use_case}_dist_v_{rand_str}",
  34. },
  35. "timestamp": int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()),
  36. "type": "d",
  37. "value": [4, 5, 6],
  38. "org_id": 1,
  39. "retention_days": 90,
  40. "project_id": 3,
  41. }
  42. make_set_payload = lambda use_case, rand_str: {
  43. "name": f"s:{use_case}/error@none",
  44. "tags": {
  45. "environment": "production",
  46. "session.status": "errored",
  47. f"gen_metric_e2e_{use_case}_set_k_{rand_str}": f"gen_metric_e2e_{use_case}_set_v_{rand_str}",
  48. },
  49. "timestamp": int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()),
  50. "type": "s",
  51. "value": [3],
  52. "org_id": 1,
  53. "retention_days": 90,
  54. "project_id": 3,
  55. }
  56. make_psql = (
  57. lambda rand_str: f"""
  58. SELECT string,
  59. organization_id,
  60. date_added,
  61. use_case_id
  62. FROM sentry_perfstringindexer
  63. WHERE string ~ 'gen_metric_e2e_.*{rand_str}';
  64. """
  65. )
  66. make_csql = lambda rand_str: "UNION ALL".join(
  67. [
  68. f"""
  69. SELECT use_case_id,
  70. org_id,
  71. project_id,
  72. metric_id,
  73. timestamp,
  74. tags.key,
  75. tags.raw_value
  76. FROM {table_name}
  77. WHERE arrayExists(v -> match(v, 'gen_metric_e2e_.*{rand_str}'), tags.raw_value)
  78. """
  79. for table_name in [
  80. "generic_metric_counters_raw_local",
  81. "generic_metric_distributions_raw_local",
  82. "generic_metric_sets_raw_local",
  83. ]
  84. ]
  85. )
  86. def produce_msgs(messages):
  87. producer = KafkaProducer(conf)
  88. for i, message in enumerate(messages):
  89. print(f"Sending message {i + 1} of {len(messages)}:")
  90. pprint.pprint(message)
  91. producer.produce(
  92. Topic(name=TOPIC_NAME),
  93. KafkaPayload(key=None, value=json.dumps(message).encode("utf-8"), headers=[]),
  94. )
  95. print("Done")
  96. print()
  97. producer.close()
  98. if __name__ == "__main__":
  99. rand_str = "".join(random.choices(string.ascii_uppercase + string.digits, k=8))
  100. use_cases = ["spans", "transactions", sys.argv[1]]
  101. messages = list(
  102. itertools.chain.from_iterable(
  103. (
  104. make_counter_payload(use_case, rand_str),
  105. make_dist_payload(use_case, rand_str),
  106. make_set_payload(use_case, rand_str),
  107. )
  108. for use_case in use_cases
  109. )
  110. )
  111. random.shuffle(messages)
  112. produce_msgs(messages)
  113. print(
  114. f"Use the following SQL to verify postgres, there should be {(strs_per_use_case := 6)} strings for each use cases, {strs_per_use_case * len(use_cases)} in total."
  115. )
  116. print(make_psql(rand_str))
  117. print(
  118. f"Use the following SQL to verify clickhouse, there should be {(metrics_per_use_case := 3)} metrics for each use cases, {len(use_cases)} in total."
  119. )
  120. print(make_csql(rand_str))