send_metrics.py

# pylint: skip-file
# flake8: noqa
#
# Sends synthetic counter/distribution/set metrics to the local ingest kafka
# topics, then prints the SQL needed to verify that the messages landed in
# postgres (string indexer) and clickhouse (raw metrics tables).
import datetime
import itertools
import json
import pprint
import random
import string

import click
from arroyo.backends.kafka import KafkaPayload, KafkaProducer
from arroyo.types import Topic

from sentry.sentry_metrics.use_case_id_registry import UseCaseID
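
# Payload factories: each builds one ingest-metrics message as a plain dict.
# The metric_e2e_{use_case}_*_{rand_str} tag pair gives every run a unique
# marker that the verification SQL below searches for.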
make_counter_payload = lambda use_case, org_id, rand_str: {
    "name": f"c:{use_case}/{use_case}@none",
    "tags": {
        "environment": "production",
        "session.status": "init",
        f"metric_e2e_{use_case}_counter_k_{rand_str}": f"metric_e2e_{use_case}_counter_v_{rand_str}",
    },
    "timestamp": int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()),
    "type": "c",
    "value": 1,
    "org_id": org_id,
    "retention_days": 90,
    "project_id": 3,
}

make_dist_payload = lambda use_case, org_id, rand_str, value_len: {
    "name": f"d:{use_case}/duration@second",
    "tags": {
        "environment": "production",
        "session.status": "healthy",
        f"metric_e2e_{use_case}_dist_k_{rand_str}": f"metric_e2e_{use_case}_dist_v_{rand_str}",
    },
    "timestamp": int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()),
    "type": "d",
    "value": [i for i in range(value_len)],
    "org_id": org_id,
    "retention_days": 90,
    "project_id": 3,
}

make_set_payload = lambda use_case, org_id, rand_str, value_len: {
    "name": f"s:{use_case}/error@none",
    "tags": {
        "environment": "production",
        "session.status": "errored",
        f"metric_e2e_{use_case}_set_k_{rand_str}": f"metric_e2e_{use_case}_set_v_{rand_str}",
    },
    "timestamp": int(datetime.datetime.now(tz=datetime.timezone.utc).timestamp()),
    "type": "s",
    "value": [i for i in range(value_len)],
    "org_id": org_id,
    "retention_days": 90,
    "project_id": 3,
}
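
# Verification query builders. make_psql targets the postgres string indexer
# (sentry_perfstringindexer for generic use cases, sentry_stringindexer for
# sessions); make_csql targets the raw metrics tables in clickhouse.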
make_psql = (
    lambda rand_str, is_generic: f"""
    SELECT string,
           organization_id,
           {"use_case_id," if is_generic else ""}
           date_added,
           last_seen
    FROM {"sentry_perfstringindexer" if is_generic else "sentry_stringindexer"}
    WHERE string ~ 'metric_e2e_.*{rand_str}';
    """
)

make_csql = lambda rand_str, is_generic: "UNION ALL".join(
    [
        f"""
    SELECT use_case_id,
           org_id,
           project_id,
           metric_id,
           timestamp,
           tags.key,
           tags.raw_value
    FROM {table_name}
    WHERE arrayExists(v -> match(v, 'metric_e2e_.*{rand_str}'), tags.raw_value)
    """
        for table_name in (
            [
                "generic_metric_counters_raw_local",
                "generic_metric_distributions_raw_local",
                "generic_metric_sets_raw_local",
            ]
            if is_generic
            else [
                "metrics_counters_v2_local",
                "metrics_distributions_v2_local",
                "metrics_sets_v2_local",
            ]
        )
    ]
)
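
# Produce every message onto the ingest topic for its pipeline
# ("ingest-performance-metrics" for generic metrics, "ingest-metrics" for
# sessions); with --dryrun only the progress counter is printed.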
def produce_msgs(messages, is_generic, host, dryrun):
    conf = {"bootstrap.servers": host}

    producer = KafkaProducer(conf)
    for i, message in enumerate(messages):
        print(f"{i + 1} / {len(messages)}")
        # pprint.pprint(message)
        if not dryrun:
            producer.produce(
                Topic(name=("ingest-performance-metrics" if is_generic else "ingest-metrics")),
                KafkaPayload(key=None, value=json.dumps(message).encode("utf-8"), headers=[]),
            )
            print("Done")
        print()

    producer.close()
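
# CLI entry point: builds one counter, one distribution and one set payload
# per (use case, org) pair, mixes in any requested bad messages, sends the
# batch, then prints the verification SQL.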
@click.command()
@click.option(
    "--use-cases",
    multiple=True,
    default=[
        use_case_id.value for use_case_id in UseCaseID if use_case_id is not UseCaseID.SESSIONS
    ],
    show_default=True,
    help="The use case IDs.",
)
@click.option("--rand-str", default=None, help="The random string suffix for each key-value pair.")
@click.option(
    "--host", default="127.0.0.1:9092", show_default=True, help="The host and port for kafka."
)
@click.option(
    "--dryrun",
    is_flag=True,
    default=False,
    show_default=True,
    help="Generate the messages without sending them.",
)
@click.option(
    "--start-org-id",
    default=1,
    show_default=True,
    help="Specify which org ID to start from.",
)
@click.option(
    "--end-org-id",
    default=1,
    show_default=True,
    help="Specify which org ID to end with (inclusive).",
)
@click.option(
    "--num-bad-msg",
    default=0,
    show_default=True,
    help="Number of additional badly formatted metric messages to send.",
)
@click.option(
    "--value-len",
    default=6,
    show_default=True,
    help="Number of elements for metrics with list values (sets and distributions).",
)
def main(use_cases, rand_str, host, dryrun, start_org_id, end_org_id, num_bad_msg, value_len):
    if UseCaseID.SESSIONS.value in use_cases and len(use_cases) > 1:
        click.secho(
            "ERROR: UseCaseID.SESSIONS cannot be combined with any other use case",
            blink=True,
            bold=True,
        )
        exit(1)

    rand_str = rand_str or "".join(random.choices(string.ascii_uppercase + string.digits, k=8))
    # SESSIONS is the only use case still served by the legacy (non-generic) pipeline
    is_generic = UseCaseID.SESSIONS.value not in use_cases

    # one counter, one distribution and one set per (use case, org) pair
    messages = list(
        itertools.chain.from_iterable(
            (
                make_counter_payload(use_case, org, rand_str),
                make_dist_payload(use_case, org, rand_str, value_len),
                make_set_payload(use_case, org, rand_str, value_len),
            )
            for use_case in use_cases
            for org in range(start_org_id, end_org_id + 1)
        )
    )
    messages.extend([{"BAD_VALUE": rand_str, "idx": i} for i in range(num_bad_msg)])
    random.shuffle(messages)

    produce_msgs(messages, is_generic, host, dryrun)

    metrics_per_use_case = 3
    strs_per_use_case = 3
    print(
        f"Use the following SQL to verify postgres, "
        f"there should be {strs_per_use_case} strings for each use case, "
        f"{strs_per_use_case * len(use_cases) * (end_org_id - start_org_id + 1)} in total."
    )
    print(make_psql(rand_str, is_generic))

    if is_generic:
        print(
            f"Use the following SQL to verify clickhouse, "
            f"there should be {metrics_per_use_case} metrics for each use case, "
            f"{metrics_per_use_case * len(use_cases) * (end_org_id - start_org_id + 1)} in total."
        )
        print(make_csql(rand_str, is_generic))


if __name__ == "__main__":
    main()
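
# Example invocation (illustrative, not part of the script: assumes a local
# kafka broker on 127.0.0.1:9092 and the metrics consumers running; the use
# case name is just an example of a UseCaseID value):
#
#   python send_metrics.py --use-cases transactions --start-org-id 1 \
#       --end-org-id 2 --num-bad-msg 2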