|
@@ -1,6 +1,5 @@
|
|
|
from __future__ import annotations
|
|
|
|
|
|
-import collections
|
|
|
import functools
|
|
|
import logging
|
|
|
import threading
|
|
@@ -10,7 +9,7 @@ from concurrent.futures._base import FINISHED, RUNNING
|
|
|
from contextlib import contextmanager
|
|
|
from queue import Full, PriorityQueue
|
|
|
from time import time
|
|
|
-from typing import TYPE_CHECKING, TypeVar
|
|
|
+from typing import TYPE_CHECKING, Generic, NamedTuple, TypeVar
|
|
|
|
|
|
from sentry_sdk import Hub
|
|
|
|
|
@@ -46,7 +45,10 @@ def execute(function: Callable[..., T], daemon=True):
|
|
|
|
|
|
|
|
|
@functools.total_ordering
|
|
|
-class PriorityTask(collections.namedtuple("PriorityTask", "priority item")):
|
|
|
+class PriorityTask(NamedTuple, Generic[T]):
|
|
|
+ priority: int
|
|
|
+ item: tuple[Callable[[], T], Hub, FutureBase[T]]
|
|
|
+
|
|
|
def __eq__(self, b):
|
|
|
return self.priority == b.priority
|
|
|
|
|
@@ -54,7 +56,7 @@ class PriorityTask(collections.namedtuple("PriorityTask", "priority item")):
|
|
|
return self.priority < b.priority
|
|
|
|
|
|
|
|
|
-class TimedFuture(FutureBase):
|
|
|
+class TimedFuture(FutureBase[T]):
|
|
|
_condition: threading.Condition
|
|
|
_state: str
|
|
|
|
|
@@ -126,7 +128,7 @@ class TimedFuture(FutureBase):
|
|
|
return super().set_exception(*args, **kwargs)
|
|
|
|
|
|
|
|
|
-class Executor:
|
|
|
+class Executor(Generic[T]):
|
|
|
"""
|
|
|
This class provides an API for executing tasks in different contexts
|
|
|
(immediately, or asynchronously.)
|
|
@@ -140,7 +142,7 @@ class Executor:
|
|
|
|
|
|
Future = TimedFuture
|
|
|
|
|
|
- def submit(self, callable, priority=0, block=True, timeout=None) -> TimedFuture:
|
|
|
+ def submit(self, callable, priority=0, block=True, timeout=None) -> TimedFuture[T]:
|
|
|
"""
|
|
|
Enqueue a task to be executed, returning a ``TimedFuture``.
|
|
|
|
|
@@ -151,7 +153,7 @@ class Executor:
|
|
|
raise NotImplementedError
|
|
|
|
|
|
|
|
|
-class SynchronousExecutor(Executor):
|
|
|
+class SynchronousExecutor(Executor[T]):
|
|
|
"""
|
|
|
This executor synchronously executes callables in the current thread.
|
|
|
|
|
@@ -166,7 +168,7 @@ class SynchronousExecutor(Executor):
|
|
|
"""
|
|
|
Immediately execute a callable, returning a ``TimedFuture``.
|
|
|
"""
|
|
|
- future = self.Future()
|
|
|
+ future: FutureBase[T] = self.Future()
|
|
|
assert future.set_running_or_notify_cancel()
|
|
|
try:
|
|
|
result = callable()
|
|
@@ -177,7 +179,7 @@ class SynchronousExecutor(Executor):
|
|
|
return future
|
|
|
|
|
|
|
|
|
-class ThreadedExecutor(Executor):
|
|
|
+class ThreadedExecutor(Executor[T]):
|
|
|
"""\
|
|
|
This executor provides a method of executing callables in a threaded worker
|
|
|
pool. The number of outstanding requests can be limited by the ``maxsize``
|
|
@@ -192,7 +194,7 @@ class ThreadedExecutor(Executor):
|
|
|
self.__worker_count = worker_count
|
|
|
self.__workers = set()
|
|
|
self.__started = False
|
|
|
- self.__queue: PriorityQueue[PriorityTask] = PriorityQueue(maxsize)
|
|
|
+ self.__queue: PriorityQueue[PriorityTask[T]] = PriorityQueue(maxsize)
|
|
|
self.__lock = threading.Lock()
|
|
|
|
|
|
def __worker(self):
|
|
@@ -237,7 +239,7 @@ class ThreadedExecutor(Executor):
|
|
|
if not self.__started:
|
|
|
self.start()
|
|
|
|
|
|
- future = self.Future()
|
|
|
+ future: FutureBase[T] = self.Future()
|
|
|
task = PriorityTask(priority, (callable, Hub.current, future))
|
|
|
try:
|
|
|
self.__queue.put(task, block=block, timeout=timeout)
|