utils.py 603 B

12345678910111213141516171819202122
  1. import random
  2. import string
  3. from asgiref.sync import sync_to_async
  4. from django.conf import settings
  5. def get_random_string(length=16):
  6. letters = string.ascii_lowercase
  7. result_str = "".join(random.choice(letters) for i in range(length))
  8. return result_str
  9. async def async_call_celery_task(task, *args):
  10. """
  11. Either dispatch the real celery task or run it with sync_to_async
  12. This can be used for testing or a celery-less operation.
  13. """
  14. if settings.CELERY_TASK_ALWAYS_EAGER:
  15. return await sync_to_async(task.delay)(*args)
  16. else:
  17. return task.delay(*args)