debounced_celery_task.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546
  1. # Credit https://gist.github.com/pardo/19d672235bbef6fa793a
  2. import functools
  3. from django.core.cache import cache
  4. def debounced_wrap(func):
  5. @functools.wraps(func)
  6. def wrapper(*args, **kwargs):
  7. key = kwargs.pop("key") # it's required
  8. call_count = kwargs.pop("call_count", 1)
  9. count = cache.get(key, 1)
  10. if count > call_count:
  11. # someone called the function again before the this was executed
  12. return None
  13. # I'm the last call
  14. return func(*args, **kwargs)
  15. return wrapper
  16. def debounced_task(key_generator):
  17. """
  18. :param func: must be the @debounced_wrap decorated with @task / @shared_task from celery
  19. :param key_generator: function that knows how to generate a key from
  20. args and kwargs passed to func or a constant str used in the cache
  21. key will be prepended with function module and name
  22. :return: function that calls apply_async on the task keep that in mind when send the arguments
  23. """
  24. def decorator(func):
  25. @functools.wraps(func)
  26. def wrapper(**kwargs):
  27. func_args = kwargs.get("args", [])
  28. func_kwargs = kwargs.get("kwargs", {})
  29. key = f"{func.__module__}.{func.__name__}.{key_generator(*func_args, **func_kwargs)}"
  30. cache.add(key, 0)
  31. call_count = cache.incr(key)
  32. func_kwargs.update({"key": key, "call_count": call_count})
  33. kwargs["args"] = func_args
  34. kwargs["kwargs"] = func_kwargs
  35. return func.apply_async(**kwargs)
  36. return wrapper
  37. return decorator