debounced_celery_task.py 3.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
"""
Based on https://gist.github.com/pardo/19d672235bbef6fa793a

Debounced tasks should:
- Execute on the first call.
- Execute on the last call (the last call within the countdown window).
- Execute periodically in between: on the 30th, 250th and 1000th call, then
  on every 1000th call after that (see RUN_ON).

Examples:
- 1 call happens: execute it immediately.
- 99 calls happen within 2 seconds: execute the 1st, 30th and 99th.
- 1 call happens every second forever (more often than the countdown):
  execute the 1st, 30th, 250th, 1000th, 2000th, etc.
"""
  12. import functools
  13. from django.conf import settings
  14. from django.core.cache import cache
  15. from django_redis import get_redis_connection
  16. CACHE_PREFIX = ":1:" # Django cache version
  17. # Run task on each mark, last mark will repeat
  18. # 30th, 250th, 1000th, 2000th, etc
  19. RUN_ON = [30, 250, 1000]
  20. def debounced_wrap(func):
  21. @functools.wraps(func)
  22. def wrapper(*args, **kwargs):
  23. key = kwargs.pop("key")
  24. call_count = kwargs.pop("call_count", 1)
  25. count = cache.get(key, 1)
  26. # If last task, or on every RUN_ON
  27. if count <= call_count or call_count in RUN_ON or call_count % RUN_ON[-1] == 0:
  28. return func(*args, **kwargs)
  29. return wrapper
  30. def debounced_task(key_generator):
  31. """
  32. :param func: must be the @debounced_wrap decorated with @task / @shared_task from celery
  33. :param key_generator: function that knows how to generate a key from
  34. args and kwargs passed to func or a constant str used in the cache
  35. key will be prepended with function module and name
  36. Run on first task, last task, and every few tasks in between
  37. :return: function that calls apply_async on the task keep that in mind when send the arguments
  38. """
  39. def decorator(func):
  40. @functools.wraps(func)
  41. def wrapper(**kwargs):
  42. func_args = kwargs.get("args", [])
  43. func_kwargs = kwargs.get("kwargs", {})
  44. key = f"{func.__module__}.{func.__name__}.{key_generator(*func_args, **func_kwargs)}"
  45. # Use countdown for expiration times on counter
  46. kwargs["countdown"] = kwargs.get(
  47. "countdown", settings.TASK_DEBOUNCE_DELAY
  48. ) # Defaults to 30
  49. countdown = kwargs["countdown"]
  50. # redis-cache incr treats None as 0
  51. try:
  52. with get_redis_connection("default") as con:
  53. call_count = con.incr(CACHE_PREFIX + key)
  54. # Reset expiration on each call
  55. # Only redis-cache supports expire
  56. cache.expire(key, timeout=countdown + 1)
  57. # django cache requires two non-atomic calls
  58. except NotImplementedError:
  59. # Fallback method is limited and may execute more than desired
  60. cache.add(key, 0, countdown)
  61. call_count = cache.incr(key)
  62. if call_count == 1:
  63. kwargs["countdown"] = 0 # Execute first task immediately
  64. # Task should never expire, but better to expire if workers are overloaded
  65. # than to queue up and break further
  66. kwargs["expire"] = countdown * 100
  67. func_kwargs.update({"key": key, "call_count": call_count})
  68. kwargs["args"] = func_args
  69. kwargs["kwargs"] = func_kwargs
  70. return func.apply_async(**kwargs)
  71. return wrapper
  72. return decorator