- #!/usr/bin/env python
- import csv
- import os
- import random
- import sys
- import time
- from collections import defaultdict
- from subprocess import list2cmdline
- import click
- from honcho.manager import Manager
- from sentry.runner import configure
- from sentry.runner.formatting import get_honcho_printer
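- # Bootstrap the Sentry/Django settings so task and consumer code can be imported below.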
- configure()
- @click.command("taskworker-test")
- @click.option("--number", "-n", help="Number of messages to append", default=10_000)
- @click.option("--verbose", "-v", help="Enable verbose output", default=False, is_flag=True)
- @click.option("--seed", "-s", help="pseudo random number generator seed", default=None)
- @click.option("--consumers", help="Number of consumer processes", default=1)
- @click.option("--workers", help="Number of worker processes", default=1)
- @click.option("--mode", help="The worker/consumer mode to use", default="pull")
- @click.option("--storage", help="The storage mode to use", default="postgres")
- @click.option("--report-only", help="Run report output only", is_flag=True, default=False)
- def main(
-     number: int,
-     verbose: bool,
-     consumers: int,
-     workers: int,
-     seed: float | None,
-     mode: str,
-     report_only: bool,
-     storage: str,
- ):
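-     """Seed demo tasks, run the chosen consumer/worker topology, and report results."""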
-     from sentry.taskdemo import variable_time
-     if report_only:
-         return print_results("./taskworker.log", workers)
-     if verbose:
-         click.echo(f"Adding {number} task messages")
-     if seed is None:
-         seed = random.random()
-     random.seed(seed)
-     # Fill the topic up with a number of tasks
-     start = time.monotonic()
-     for i in range(number):
-         variable_time.delay(wait=random.random(), taskno=i)
-     end = time.monotonic()
-     click.echo(f"Appending {number} tasks took {end - start:.2f}s")
-     cwd = os.getcwd()
-     # Clear results from any previous run; the log file may not exist yet.
-     if os.path.exists("./taskworker.log"):
-         os.unlink("./taskworker.log")
-     # Use honcho to control the worker and consumer processes.
-     honcho_printer = get_honcho_printer(prefix=True, pretty=False)
-     manager = Manager(honcho_printer)
-     processes = []
-     if mode == "pull":
-         # Pull mode: a single grpc bridge process, N consumers, and N pull workers.
-         processes.append(
-             {
-                 "name": "grpc",
-                 "cmd": ["sentry", "run", "kafka-task-grpc-pull", "--storage", storage],
-             },
-         )
-         for i in range(consumers):
-             processes.append(
-                 {
-                     "name": f"consumer-{i}",
-                     "cmd": [
-                         "sentry",
-                         "run",
-                         "consumer",
-                         "taskworker",
-                         "--consumer-group",
-                         "taskworker-pull",
-                         "--log-level",
-                         "warning",
-                         "--",
-                         "--storage",
-                         storage,
-                     ],
-                 }
-             )
-         for i in range(workers):
-             processes.append(
-                 {
-                     "name": f"worker-{i}",
-                     "cmd": [
-                         "sentry",
-                         "run",
-                         "taskworker-pull",
-                         "--namespace",
-                         "demos",
-                     ],
-                 },
-             )
- elif mode == "push":
- worker_ports = [50051 + i for i in range(workers)]
- worker_addrs = ",".join([f"127.0.0.1:{port}" for port in worker_ports])
- processes.append(
- {
- "name": "grpc",
- "cmd": [
- "sentry",
- "run",
- "kafka-task-grpc-push",
- "--worker-addrs",
- worker_addrs,
- "--storage",
- storage,
- ],
- },
- )
- processes.append(
- {
- "name": "consumer",
- "cmd": [
- "sentry",
- "run",
- "consumer",
- "taskworker",
- "--consumer-group",
- "taskworker-pull",
- "--log-level",
- "warning",
- "--",
- "--storage",
- storage,
- ],
- }
- )
- for port in worker_ports:
- processes.append(
- {
- "name": f"worker-{port}",
- "cmd": [
- "sentry",
- "run",
- "taskworker-push",
- "--namespace",
- "demos",
- "--port",
- str(port),
- ],
- },
- )
- elif mode == "reply":
- worker_ports = [50051 + i for i in range(workers)]
- worker_addrs = ",".join([f"127.0.0.1:{port}" for port in worker_ports])
- processes.append(
- {
- "name": "consumer",
- "cmd": [
- "sentry",
- "run",
- "consumer",
- "taskworker-reply",
- "--consumer-group",
- "taskworker-pull",
- "--log-level",
- "warning",
- "--",
- "--storage",
- storage,
- "--worker-addrs",
- worker_addrs,
- ],
- }
- )
- for port in worker_ports:
- processes.append(
- {
- "name": f"worker-{port}",
- "cmd": [
- "sentry",
- "run",
- "taskworker-reply",
- "--namespace",
- "demos",
- "--port",
- str(port),
- ],
- },
- )
-     else:
-         raise RuntimeError(f"Unexpected mode {mode!r}. Use `push`, `pull`, or `reply` instead.")
-     for process in processes:
-         manager.add_process(process["name"], list2cmdline(process["cmd"]), cwd=cwd)
-     # Let's go!
-     manager.loop()
-     print_results("./taskworker.log", workers)
-     sys.exit(manager.returncode)
- def print_results(log_file: str, worker_count: int) -> None:
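-     """Parse the task log CSV and print throughput, latency, and histogram summaries."""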
- click.echo("")
- click.echo("== Test run complete ==")
- click.echo("")
- fieldnames = ["event", "worker_id", "task_add_time", "execution_time", "latency", "task_id"]
- latency_times = []
- execution_times = []
- task_ids = set()
- duplicates = []
- with open(log_file) as logs:
- results = csv.DictReader(logs, fieldnames=fieldnames)
- for row in results:
- latency_times.append(float(row["latency"].strip()))
- execution_times.append(float(row["execution_time"].strip()))
- row_id = row["task_id"].strip()
- if row_id in task_ids:
- duplicates.append(row_id)
- task_ids.add(row_id)
-     # We append tasks and then start applications. The first
-     # message always has long latency as application startup
-     # and Kafka take some time to get going.
-     first_latency = latency_times[0]
-     min_latency = min(latency_times)
-     max_latency = max(latency_times)
-     avg_latency = sum(latency_times) / len(latency_times)
-     processing_time = execution_times[-1] - execution_times[0]
-     task_throughput = len(latency_times) / processing_time
-     # Remove the startup overhead to get relative latency.
-     adj_min_latency = min_latency - first_latency
-     adj_max_latency = max_latency - first_latency
-     adj_avg_latency = avg_latency - first_latency
-     # Bucket latency and count totals in each bucket
-     latency_spread = adj_max_latency - adj_min_latency
-     bucket_count = 20
-     # Guard against a zero-width bucket when all latencies are identical.
-     bucket_width = (latency_spread / bucket_count) or 1.0
-     buckets = defaultdict(int)
-     for value in latency_times:
-         adjusted = max(value - first_latency, 0)
-         bucket = int(adjusted / bucket_width)
-         buckets[bucket] += 1
-     click.echo("")
-     click.echo("## Run summary")
-     click.echo("")
-     click.echo(f"Task count: {len(latency_times)}")
-     click.echo(f"Processing time: {processing_time:.4f}s")
-     click.echo(f"Throughput: {task_throughput:.4f} tasks/s")
-     if duplicates:
-         click.echo("")
-         click.echo("Duplicate executions found:")
-         for dupe in duplicates:
-             click.echo(f"- {dupe}")
-         click.echo("")
-     click.echo("")
-     click.echo("## Task latency")
-     click.echo("")
-     click.echo(f"First task latency: {first_latency:.5f}")
-     click.echo(
-         f"Raw Min / Max / Avg latency: {min_latency:.5f} / {max_latency:.5f} / {avg_latency:.5f}"
-     )
-     click.echo(
-         f"Adjusted Min / Max / Avg latency: {adj_min_latency:.5f} / {adj_max_latency:.5f} / {adj_avg_latency:.5f}"
-     )
-     click.echo("")
-     click.echo("## Latency histogram")
-     click.echo("")
-     bars = []
-     for key, count in buckets.items():
-         # Bucket `key` covers [key * width, (key + 1) * width); label it with its upper bound.
-         bucket_upper = (key + 1) * bucket_width
-         # Limit bar length to 50 cells to prevent wrapping in output
-         bar = "█" * min(count, 50)
-         bar += f" {count} "
-         bars.append((bucket_upper, bar))
-     for bucket_upper, bar in sorted(bars, key=lambda x: x[0]):
-         click.echo(f"{bucket_upper:.5f} {bar}")
- if __name__ == "__main__":
-     main()