pagination.py 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. import logging
  2. import urllib.parse as urlparse
  3. from urllib.parse import parse_qs
  4. from rest_framework.exceptions import ValidationError
  5. from rest_framework.pagination import CursorPagination
  6. from rest_framework.response import Response
  7. logger = logging.getLogger(__name__)
  8. class LinkHeaderPagination(CursorPagination):
  9. """Inform the user of pagination links via response headers, similar to
  10. what's described in
  11. https://developer.github.com/guides/traversing-with-pagination/.
  12. """
  13. page_size_query_param = "limit"
  14. max_hits = 1000
  15. def paginate_queryset(self, queryset, request, view=None):
  16. try:
  17. page = super().paginate_queryset(queryset, request, view)
  18. if self.has_next:
  19. self.count = self.get_count(queryset)
  20. else:
  21. self.count = len(page)
  22. return page
  23. except ValueError as err:
  24. # https://gitlab.com/glitchtip/glitchtip-backend/-/issues/136
  25. logging.warning("Pagination received invalid cursor", exc_info=True)
  26. raise ValidationError("Invalid page cursor") from err
  27. def get_count(self, queryset):
  28. """Count with max limit, to prevent slowdown"""
  29. # Remove order by, it can slow down counts but has no effect
  30. return queryset.order_by()[: self.max_hits].count()
  31. def get_paginated_response(self, data):
  32. next_url = self.get_next_link()
  33. previous_url = self.get_previous_link()
  34. links = []
  35. for url, label in (
  36. (previous_url, "previous"),
  37. (next_url, "next"),
  38. ):
  39. if url is not None:
  40. parsed = urlparse.urlparse(url)
  41. cursor = parse_qs(parsed.query).get(self.cursor_query_param, [""])[0]
  42. links.append(
  43. '<{}>; rel="{}"; results="true"; cursor="{}"'.format(
  44. url, label, cursor
  45. )
  46. )
  47. else:
  48. links.append(
  49. '<{}>; rel="{}"; results="false"'.format(self.base_url, label)
  50. )
  51. headers = {"Link": ", ".join(links)} if links else {}
  52. headers["X-Max-Hits"] = self.max_hits
  53. headers["X-Hits"] = self.count
  54. return Response(data, headers=headers)