#!/usr/bin/env python
"""Load-test harness for sentry taskworkers.

Appends a batch of `variable_time` demo tasks to the taskworker topic, then
uses honcho to spawn consumer/worker processes in one of three wiring modes
(`pull`, `push`, `reply`), and finally prints latency/throughput statistics
parsed from ``./taskworker.log``.
"""
import csv
import os
import random
import sys
import time
from collections import defaultdict
from contextlib import suppress
from subprocess import list2cmdline

import click
from honcho.manager import Manager

from sentry.runner import configure
from sentry.runner.formatting import get_honcho_printer

configure()


@click.command("taskworker-test")
@click.option("--number", "-n", help="Number of messages to append", default=10_000)
@click.option("--verbose", "-v", help="Enable verbose output", default=False, is_flag=True)
@click.option("--seed", "-s", help="pseudo random number generator seed", default=None)
@click.option("--consumers", help="Number of consumer processes", default=1)
@click.option("--workers", help="Number of worker processes", default=1)
@click.option("--mode", help="The worker/consumer mode to use", default="pull")
@click.option("--storage", help="The storage mode to use", default="postgres")
@click.option("--report-only", help="Run report output only", is_flag=True, default=False)
def main(
    number: int,
    verbose: bool,
    consumers: int,
    workers: int,
    seed: float | None,
    mode: str,
    report_only: bool,
    storage: str,
):
    """Run a taskworker end-to-end test and print a latency report.

    Appends ``number`` tasks, then launches the consumer/worker topology
    selected by ``mode`` and blocks until the processes exit. With
    ``--report-only`` it skips the run and only re-parses the existing log.
    """
    # Imported lazily: task registration requires a configured app.
    from sentry.taskdemo import variable_time

    if report_only:
        return print_results("./taskworker.log", workers)

    if verbose:
        click.echo(f"Adding {number} task messages")
    if not seed:
        seed = random.random()
    # Fix: the seed was previously computed but never applied, so --seed had
    # no effect and runs were not reproducible.
    random.seed(seed)

    # Fill the topic up with a number of tasks
    start = time.monotonic()
    for i in range(number):
        variable_time.delay(wait=random.random(), taskno=i)
    end = time.monotonic()
    click.echo(f"Appending {number} tasks took {(end-start)}s")

    cwd = os.getcwd()
    # Fix: don't crash on the first run when no previous log file exists.
    with suppress(FileNotFoundError):
        os.unlink("./taskworker.log")

    # Use honcho to control the worker and consumer proceses.
    honcho_printer = get_honcho_printer(prefix=True, pretty=False)
    manager = Manager(honcho_printer)

    processes = []
    if mode == "pull":
        # One grpc bridge; workers pull tasks from it.
        processes.append(
            {
                "name": "grpc",
                "cmd": ["sentry", "run", "kafka-task-grpc-pull", "--storage", storage],
            },
        )
        for i in range(consumers):
            processes.append(
                {
                    "name": f"consumer-{i}",
                    "cmd": [
                        "sentry",
                        "run",
                        "consumer",
                        "taskworker",
                        "--consumer-group",
                        "taskworker-pull",
                        "--log-level",
                        "warning",
                        "--",
                        "--storage",
                        storage,
                    ],
                }
            )
        for i in range(workers):
            processes.append(
                {
                    "name": f"worker-{i}",
                    "cmd": [
                        "sentry",
                        "run",
                        "taskworker-pull",
                        "--namespace",
                        "demos",
                    ],
                },
            )
    elif mode == "push":
        # The grpc bridge pushes work to a fixed set of worker ports.
        worker_ports = [50051 + i for i in range(workers)]
        worker_addrs = ",".join([f"127.0.0.1:{port}" for port in worker_ports])
        processes.append(
            {
                "name": "grpc",
                "cmd": [
                    "sentry",
                    "run",
                    "kafka-task-grpc-push",
                    "--worker-addrs",
                    worker_addrs,
                    "--storage",
                    storage,
                ],
            },
        )
        processes.append(
            {
                "name": "consumer",
                "cmd": [
                    "sentry",
                    "run",
                    "consumer",
                    "taskworker",
                    "--consumer-group",
                    "taskworker-pull",
                    "--log-level",
                    "warning",
                    "--",
                    "--storage",
                    storage,
                ],
            }
        )
        for port in worker_ports:
            processes.append(
                {
                    "name": f"worker-{port}",
                    "cmd": [
                        "sentry",
                        "run",
                        "taskworker-push",
                        "--namespace",
                        "demos",
                        "--port",
                        str(port),
                    ],
                },
            )
    elif mode == "reply":
        # No separate grpc process: the consumer talks to workers directly.
        worker_ports = [50051 + i for i in range(workers)]
        worker_addrs = ",".join([f"127.0.0.1:{port}" for port in worker_ports])
        processes.append(
            {
                "name": "consumer",
                "cmd": [
                    "sentry",
                    "run",
                    "consumer",
                    "taskworker-reply",
                    "--consumer-group",
                    "taskworker-pull",
                    "--log-level",
                    "warning",
                    "--",
                    "--storage",
                    storage,
                    "--worker-addrs",
                    worker_addrs,
                ],
            }
        )
        for port in worker_ports:
            processes.append(
                {
                    "name": f"worker-{port}",
                    "cmd": [
                        "sentry",
                        "run",
                        "taskworker-reply",
                        "--namespace",
                        "demos",
                        "--port",
                        str(port),
                    ],
                },
            )
    else:
        raise RuntimeError(f"Unexpected mode of {mode}. Use `push`, `pull`, or `reply` instead.")

    for process in processes:
        manager.add_process(process["name"], list2cmdline(process["cmd"]), cwd=cwd)

    # Lets go!
    manager.loop()

    print_results("./taskworker.log", workers)
    sys.exit(manager.returncode)


def print_results(log_file: str, worker_count: int) -> None:
    """Parse the CSV task log at ``log_file`` and print run statistics.

    Reports task count, throughput, duplicate executions, raw and
    startup-adjusted latency, and a latency histogram. ``worker_count`` is
    currently unused but kept for interface stability.
    """
    click.echo("")
    click.echo("== Test run complete ==")
    click.echo("")

    fieldnames = ["event", "worker_id", "task_add_time", "execution_time", "latency", "task_id"]
    latency_times = []
    execution_times = []
    task_ids = set()
    duplicates = []
    with open(log_file) as logs:
        results = csv.DictReader(logs, fieldnames=fieldnames)
        for row in results:
            latency_times.append(float(row["latency"].strip()))
            execution_times.append(float(row["execution_time"].strip()))
            row_id = row["task_id"].strip()
            if row_id in task_ids:
                duplicates.append(row_id)
            task_ids.add(row_id)

    # Fix: an empty log (e.g. --report-only before any run completed) used to
    # raise IndexError/ValueError below.
    if not latency_times:
        click.echo("No task results found in the log file.")
        return

    # We append tasks and then start applications. The first
    # message always has long latency as application startup
    # and kafka take some time to get going.
    first_latency = latency_times[0]
    min_latency = min(latency_times)
    max_latency = max(latency_times)
    avg_latency = sum(latency_times) / len(latency_times)

    processing_time = execution_times[-1] - execution_times[0]
    # Fix: guard against a zero-length window (single row) which previously
    # raised ZeroDivisionError.
    if processing_time > 0:
        task_throughput = len(latency_times) / processing_time
    else:
        task_throughput = float("inf")

    # Remove the startup overhead to get relative latency.
    adj_min_latency = min_latency - first_latency
    adj_max_latency = max_latency - first_latency
    adj_avg_latency = avg_latency - first_latency

    # Bucket latency and count totals in each bucket
    latency_spread = adj_max_latency - adj_min_latency
    bucket_count = 20
    bucket_width = latency_spread / bucket_count
    buckets = defaultdict(int)
    for value in latency_times:
        adjusted = max(value - first_latency, 0)
        # Fix: when all latencies are identical bucket_width is 0; put
        # everything in bucket 0 instead of dividing by zero.
        bucket = int(adjusted / bucket_width) if bucket_width else 0
        buckets[bucket] += 1

    click.echo("")
    click.echo("## Run summary")
    click.echo("")
    click.echo(f"Task count: {len(latency_times)}")
    click.echo(f"Processing time: {processing_time:.4f}")
    click.echo(f"Throughput: {task_throughput:.4f}")
    if duplicates:
        click.echo("")
        click.echo("Duplicate executions found:")
        for dupe in duplicates:
            click.echo(f"- {dupe}")
        click.echo("")

    click.echo("")
    click.echo("## Task latency")
    click.echo("")
    click.echo(f"First task Latency: {first_latency:.5f}")
    click.echo(
        f"Raw Min / Max / Avg latency: {min_latency:.5f} / {max_latency:.5f} / {avg_latency:.5f}"
    )
    click.echo(
        f"Adjusted Min / Max / Avg latency: {adj_min_latency:.5f} / {adj_max_latency:.5f} / {adj_avg_latency:.5f}"
    )

    click.echo("")
    click.echo("## Latency histogram")
    click.echo("")
    bars = []
    for key, count in buckets.items():
        bucket_upper = key * bucket_width
        # Limit to 50 to prevent wrapping in output
        bar = "█" * min(count, 50)
        bar += f" {count} "
        bars.append((bucket_upper, bar))
    for bucket_upper, bar in sorted(bars, key=lambda x: x[0]):
        click.echo(f"{bucket_upper:.5f} {bar}")


if __name__ == "__main__":
    main()