logging_pool.py 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071
  1. # Copyright 2015 gRPC authors.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """A thread pool that logs exceptions raised by tasks executed within it."""
  15. from concurrent import futures
  16. import logging
  17. _LOGGER = logging.getLogger(__name__)
  18. def _wrap(behavior):
  19. """Wraps an arbitrary callable behavior in exception-logging."""
  20. def _wrapping(*args, **kwargs):
  21. try:
  22. return behavior(*args, **kwargs)
  23. except Exception:
  24. _LOGGER.exception(
  25. 'Unexpected exception from %s executed in logging pool!',
  26. behavior)
  27. raise
  28. return _wrapping
  29. class _LoggingPool(object):
  30. """An exception-logging futures.ThreadPoolExecutor-compatible thread pool."""
  31. def __init__(self, backing_pool):
  32. self._backing_pool = backing_pool
  33. def __enter__(self):
  34. return self
  35. def __exit__(self, exc_type, exc_val, exc_tb):
  36. self._backing_pool.shutdown(wait=True)
  37. def submit(self, fn, *args, **kwargs):
  38. return self._backing_pool.submit(_wrap(fn), *args, **kwargs)
  39. def map(self, func, *iterables, **kwargs):
  40. return self._backing_pool.map(_wrap(func),
  41. *iterables,
  42. timeout=kwargs.get('timeout', None))
  43. def shutdown(self, wait=True):
  44. self._backing_pool.shutdown(wait=wait)
  45. def pool(max_workers):
  46. """Creates a thread pool that logs exceptions raised by the tasks within it.
  47. Args:
  48. max_workers: The maximum number of worker threads to allow the pool.
  49. Returns:
  50. A futures.ThreadPoolExecutor-compatible thread pool that logs exceptions
  51. raised by the tasks executed within it.
  52. """
  53. return _LoggingPool(futures.ThreadPoolExecutor(max_workers))