debounced_celery_task.py

  1. """
  2. Based on https://gist.github.com/pardo/19d672235bbef6fa793a
  3. Debounced tasks should
  4. - Execute on first call
  5. - Execute on last call (last being the last call within a countdown time)
  6. - Execute periodically in between (every 10, 100, 1000th)
  7. Examples:
  8. - 1 call happens, execute immediately
  9. - 99 calls happen in 2 seconds - execute 1st, 10th 99th)
  10. - 1 call happens every second forever - execute 1st, 10th, 100th, 1000th, 2000th, etc
  11. """
import functools

from django.core.cache import cache
from django_redis import get_redis_connection

from glitchtip.celery import app
CACHE_PREFIX = ":1:"  # Django cache key version prefix

# Run the task on each mark; the last mark repeats:
# 10th, 100th, 1000th, 2000th, 3000th, etc.
RUN_ON = [10, 100, 1000]


def debounced_wrap(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        key = kwargs.pop("key")
        call_count = kwargs.pop("call_count", 1)
        count = cache.get(key, 1)
        # Run if this is the last task (no newer call has bumped the counter
        # past this one), on every RUN_ON mark, or on every multiple of the
        # last mark
        if count <= call_count or call_count in RUN_ON or call_count % RUN_ON[-1] == 0:
            return func(*args, **kwargs)

    return wrapper


def debounced_task(key_generator):
    """
    Decorate a function that is already wrapped with @debounced_wrap and
    registered with Celery's @task / @shared_task.

    :param key_generator: function that builds a cache key from the args and
        kwargs passed to the task, or a constant str; the key is prepended
        with the task's module and name
    :return: a function that calls apply_async on the task; keep that in mind
        when sending the arguments. Runs the first task, the last task, and
        every few tasks in between. See the usage sketch at the end of this
        file.
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(**kwargs):
            func_args = kwargs.get("args", [])
            func_kwargs = kwargs.get("kwargs", {})
            key = f"{func.__module__}.{func.__name__}.{key_generator(*func_args, **func_kwargs)}"
            # The countdown doubles as the expiration time for the counter
            kwargs["countdown"] = kwargs.get("countdown", 10)  # Defaults to 10
            countdown = kwargs["countdown"]
            try:
                # Redis incr treats a missing key as 0
                with get_redis_connection("default") as con:
                    call_count = con.incr(CACHE_PREFIX + key)
                # Reset expiration on each call; only django-redis supports
                # expire
                cache.expire(key, timeout=countdown + 1)
            except NotImplementedError:
                # Fallback for other backends requires two non-atomic calls,
                # is limited, and may execute more often than desired
                cache.add(key, 0, countdown)
                call_count = cache.incr(key)
            if call_count == 1:
                kwargs["countdown"] = 0  # Execute the first task immediately
            # The task should never expire, but it's better to expire when
            # workers are overloaded than to queue up and break things further
            kwargs["expires"] = countdown * 100  # Celery's option is "expires"
            func_kwargs.update({"key": key, "call_count": call_count})
            kwargs["args"] = func_args
            kwargs["kwargs"] = func_kwargs
            return func.apply_async(**kwargs)

        return wrapper

    return decorator
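

# ---------------------------------------------------------------------------
# Usage sketch (illustrative addition, not from the original gist). The task
# name ``cleanup_project`` and its ``project_id`` argument are hypothetical.
# Decorator order matters: debounced_task must wrap the Celery task, which
# must wrap the @debounced_wrap'd function.
# ---------------------------------------------------------------------------


@debounced_task(lambda project_id: f"cleanup-{project_id}")
@app.task
@debounced_wrap
def cleanup_project(project_id: int):
    """Hypothetical expensive task, debounced per project."""
    print(f"Cleaning up project {project_id}")


# Callers pass Celery apply_async-style keyword arguments; "key" and
# "call_count" are injected automatically:
#
#     cleanup_project(kwargs={"project_id": 42}, countdown=10)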