send_metrics.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342
  1. # pylint: skip-file
  2. # flake8: noqa
  3. import base64
  4. import datetime
  5. import functools
  6. import itertools
  7. import json
  8. import pprint
  9. import random
  10. import string
  11. import struct
  12. import click
  13. from arroyo.backends.kafka import KafkaPayload, KafkaProducer
  14. from arroyo.types import Topic
  15. from sentry.sentry_metrics.use_case_id_registry import UseCaseID
  16. def make_counter_payload(use_case, org_id, rand_str, sampling_weight=None):
  17. return {
  18. "name": f"c:{use_case}/{use_case}@none",
  19. "tags": {
  20. "environment": "production",
  21. "session.status": "init",
  22. f"metric_e2e_{use_case}_counter_k_{rand_str}": f"metric_e2e_{use_case}_counter_v_{rand_str}",
  23. },
  24. "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
  25. "type": "c",
  26. "value": 1,
  27. "org_id": org_id,
  28. "retention_days": 90,
  29. "project_id": 3,
  30. **({"sampling_weight": sampling_weight} if sampling_weight else {}),
  31. }
  32. def make_dist_payload(use_case, org_id, rand_str, value_len, b64_encode, sampling_weight=None):
  33. nums = [random.random() for _ in range(value_len)]
  34. return {
  35. "name": f"d:{use_case}/duration@second",
  36. "tags": {
  37. "environment": "production",
  38. "session.status": "healthy",
  39. f"metric_e2e_{use_case}_dist_k_{rand_str}": f"metric_e2e_{use_case}_dist_v_{rand_str}",
  40. },
  41. "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
  42. "type": "d",
  43. "value": (
  44. {
  45. "format": "base64",
  46. "data": base64.b64encode(struct.pack(f"<{len(nums)}d", *nums))
  47. .replace(b"=", b"")
  48. .decode("ascii"),
  49. }
  50. if b64_encode
  51. else {
  52. "format": "zstd",
  53. "data": "KLUv/QBYrQAAcAAA8D8AQAAAAAAAAAhAAgBgRgCw",
  54. }
  55. ),
  56. "org_id": org_id,
  57. "retention_days": 90,
  58. "project_id": 3,
  59. **({"sampling_weight": sampling_weight} if sampling_weight else {}),
  60. }
  61. def make_set_payload(use_case, org_id, rand_str, value_len, b64_encode, sampling_weight=None):
  62. INT_WIDTH = 4
  63. nums = [random.randint(0, 2048) for _ in range(value_len)]
  64. return {
  65. "name": f"s:{use_case}/error@none",
  66. "tags": {
  67. "environment": "production",
  68. "session.status": "errored",
  69. f"metric_e2e_{use_case}_set_k_{rand_str}": f"metric_e2e_{use_case}_set_v_{rand_str}",
  70. },
  71. "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
  72. "type": "s",
  73. "value": (
  74. {
  75. "format": "base64",
  76. "data": base64.b64encode(
  77. b"".join([num.to_bytes(INT_WIDTH, byteorder="little") for num in nums])
  78. )
  79. .replace(b"=", b"")
  80. .decode("ascii"),
  81. }
  82. if b64_encode
  83. else {
  84. "format": "array",
  85. "data": nums,
  86. }
  87. ),
  88. "org_id": org_id,
  89. "retention_days": 90,
  90. "project_id": 3,
  91. **({"sampling_weight": sampling_weight} if sampling_weight else {}),
  92. }
  93. def make_gauge_payload(use_case, org_id, rand_str, sampling_weight):
  94. return {
  95. "name": f"s:{use_case}/error@none",
  96. "tags": {
  97. "environment": "production",
  98. "session.status": "errored",
  99. f"metric_e2e_{use_case}_gauge_k_{rand_str}": f"metric_e2e_{use_case}_gauge_v_{rand_str}",
  100. },
  101. "timestamp": int(datetime.datetime.now(tz=datetime.UTC).timestamp()),
  102. "type": "g",
  103. "value": {
  104. "min": 1,
  105. "max": 1,
  106. "sum": 1,
  107. "count": 1,
  108. "last": 1,
  109. },
  110. "org_id": org_id,
  111. "retention_days": 90,
  112. "project_id": 3,
  113. **({"sampling_weight": sampling_weight} if sampling_weight else {}),
  114. }
  115. def make_psql(rand_str, is_generic):
  116. return f"""
  117. SELECT string,
  118. organization_id,
  119. {"use_case_id," if is_generic else ""}
  120. date_added,
  121. last_seen
  122. FROM {"sentry_perfstringindexer" if is_generic else "sentry_stringindexer"}
  123. WHERE string ~ 'metric_e2e_.*{rand_str}';
  124. """
  125. def make_csql(rand_str, is_generic):
  126. return "UNION ALL".join(
  127. [
  128. f"""
  129. SELECT use_case_id,
  130. org_id,
  131. project_id,
  132. metric_id,
  133. timestamp,
  134. tags.key,
  135. tags.raw_value
  136. FROM {table_name}
  137. WHERE arrayExists(v -> match(v, 'metric_e2e_.*{rand_str}'), tags.raw_value)
  138. """
  139. for table_name in (
  140. [
  141. "generic_metric_counters_raw_local",
  142. "generic_metric_distributions_raw_local",
  143. "generic_metric_sets_raw_local",
  144. "generic_metric_gauges_raw_local",
  145. ]
  146. if is_generic
  147. else [
  148. "metrics_counters_v2_local",
  149. "metrics_distributions_v2_local",
  150. "metrics_sets_v2_local",
  151. ]
  152. )
  153. ]
  154. )
  155. def produce_msgs(messages, is_generic, host, dryrun, quiet):
  156. conf = {"bootstrap.servers": host}
  157. producer = KafkaProducer(conf)
  158. for i, message in enumerate(messages):
  159. print(f"{i + 1} / {len(messages)}")
  160. if not quiet:
  161. pprint.pprint(message)
  162. if not dryrun:
  163. producer.produce(
  164. Topic(name=("ingest-performance-metrics" if is_generic else "ingest-metrics")),
  165. KafkaPayload(key=None, value=json.dumps(message).encode("utf-8"), headers=[]),
  166. )
  167. print("Done")
  168. print()
  169. producer.close()
  170. @click.command()
  171. @click.option(
  172. "--metric-types", default="cdsg", show_default=True, help="The types of metrics to send"
  173. )
  174. @click.option(
  175. "--use-cases",
  176. multiple=True,
  177. default=[
  178. use_case_id.value for use_case_id in UseCaseID if use_case_id is not UseCaseID.SESSIONS
  179. ],
  180. show_default=True,
  181. help="The use case IDs.",
  182. )
  183. @click.option("--rand-str", default=None, help="The random string prefix for each key value pairs.")
  184. @click.option(
  185. "--host", default="127.0.0.1:9092", show_default=True, help="The host and port for kafka."
  186. )
  187. @click.option(
  188. "--dryrun",
  189. "-d",
  190. is_flag=True,
  191. default=False,
  192. show_default=True,
  193. help="Generate the messages without sending them.",
  194. )
  195. @click.option(
  196. "--quiet",
  197. "-q",
  198. is_flag=True,
  199. default=False,
  200. show_default=True,
  201. help="Disable printing the messages.",
  202. )
  203. @click.option(
  204. "--start-org-id",
  205. default=1,
  206. show_default=True,
  207. help="Specify which org id(s) to start from.",
  208. )
  209. @click.option(
  210. "--end-org-id",
  211. default=1,
  212. show_default=True,
  213. help="Specify which org id(s) to end with.",
  214. )
  215. @click.option(
  216. "--num-bad-msg",
  217. default=0,
  218. show_default=True,
  219. help="Number of additional badly formatted metric messages to send.",
  220. )
  221. @click.option(
  222. "--value-len",
  223. default=8,
  224. show_default=True,
  225. help="Number of elements for metrics (sets and distributions).",
  226. )
  227. @click.option(
  228. "--b64-encode",
  229. default=True,
  230. show_default=True,
  231. help="Encode sets and distribution metrics values in base64",
  232. )
  233. @click.option(
  234. "--sampling-weight",
  235. type=int,
  236. default=None,
  237. show_default=True,
  238. help="Sampling weight for the metrics",
  239. )
  240. def main(
  241. metric_types,
  242. use_cases,
  243. rand_str,
  244. host,
  245. dryrun,
  246. quiet,
  247. start_org_id,
  248. end_org_id,
  249. num_bad_msg,
  250. value_len,
  251. b64_encode,
  252. sampling_weight,
  253. ):
  254. if UseCaseID.SESSIONS.value in use_cases and len(use_cases) > 1:
  255. click.secho(
  256. "ERROR: UseCaseID.SESSIONS is in use_cases and there are more than 1 use cases",
  257. blink=True,
  258. bold=True,
  259. )
  260. exit(1)
  261. is_generic = UseCaseID.SESSIONS.value not in use_cases
  262. metric_types = "".join(set(metric_types))
  263. rand_str = rand_str or "".join(random.choices(string.ascii_uppercase + string.digits, k=8))
  264. payload_generators = {
  265. "c": functools.partial(
  266. make_counter_payload, rand_str=rand_str, sampling_weight=sampling_weight
  267. ),
  268. "d": functools.partial(
  269. make_dist_payload,
  270. rand_str=rand_str,
  271. value_len=value_len,
  272. b64_encode=b64_encode,
  273. sampling_weight=sampling_weight,
  274. ),
  275. "s": functools.partial(
  276. make_set_payload,
  277. rand_str=rand_str,
  278. value_len=value_len,
  279. b64_encode=b64_encode,
  280. sampling_weight=sampling_weight,
  281. ),
  282. "g": functools.partial(
  283. make_gauge_payload, rand_str=rand_str, sampling_weight=sampling_weight
  284. ),
  285. }
  286. messages = list(
  287. itertools.chain.from_iterable(
  288. (
  289. payload_generators[metric_type](use_case=use_case, org_id=org_id)
  290. for metric_type in metric_types
  291. )
  292. for use_case in use_cases
  293. for org_id in range(start_org_id, end_org_id + 1)
  294. )
  295. )
  296. messages.extend([{"BAD_VALUE": rand_str, "idx": i} for i in range(num_bad_msg)])
  297. random.shuffle(messages)
  298. produce_msgs(messages, is_generic, host, dryrun, quiet)
  299. strs_per_use_case = 3
  300. print(
  301. f"Use the following SQL to verify postgres, "
  302. f"there should be {strs_per_use_case} strings for each use cases, "
  303. f"{strs_per_use_case * len(use_cases) * (end_org_id - start_org_id + 1)} in total."
  304. )
  305. print(make_psql(rand_str, is_generic))
  306. if is_generic:
  307. print(
  308. f"Use the following SQL to verify clickhouse, "
  309. f"there should be {len(metric_types)} metrics for each use cases, "
  310. f"{len(metric_types) * len(use_cases) * (end_org_id - start_org_id + 1)} in total."
  311. )
  312. print(make_csql(rand_str, is_generic))
# Script entry point: run the click command when executed directly.
if __name__ == "__main__":
    main()