pagination.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. from typing import TYPE_CHECKING
  2. from urllib import parse
  3. from asgiref.sync import sync_to_async
  4. from django.http import HttpRequest, HttpResponse
  5. from ninja.conf import settings as ninja_settings
  6. from .cursor_pagination import CursorPagination, _clamp, _reverse_order
  7. if TYPE_CHECKING:
  8. from django.db.models import QuerySet
  9. class AsyncLinkHeaderPagination(CursorPagination):
  10. max_hits = 1000
  11. # Remove Output schema because we only want to return a list of items
  12. Output = None
  13. async def apaginate_queryset(
  14. self,
  15. queryset: "QuerySet",
  16. pagination: CursorPagination.Input,
  17. request: HttpRequest,
  18. response: HttpResponse,
  19. **params,
  20. ) -> dict:
  21. limit = _clamp(
  22. pagination.limit or ninja_settings.PAGINATION_PER_PAGE,
  23. 0,
  24. self.max_page_size,
  25. )
  26. full_queryset = queryset
  27. if not queryset.query.order_by:
  28. queryset = queryset.order_by(*self.default_ordering)
  29. order = queryset.query.order_by
  30. base_url = request.build_absolute_uri()
  31. cursor = pagination.cursor
  32. if cursor.reverse:
  33. queryset = queryset.order_by(*_reverse_order(order))
  34. if cursor.position is not None:
  35. is_reversed = order[0].startswith("-")
  36. order_attr = order[0].lstrip("-")
  37. if cursor.reverse != is_reversed:
  38. queryset = queryset.filter(**{f"{order_attr}__lt": cursor.position})
  39. else:
  40. queryset = queryset.filter(**{f"{order_attr}__gt": cursor.position})
  41. @sync_to_async
  42. def get_results():
  43. return list(queryset[cursor.offset : cursor.offset + limit + 1])
  44. results = await get_results()
  45. page = list(results[:limit])
  46. if len(results) > len(page):
  47. has_following_position = True
  48. following_position = self._get_position_from_instance(results[-1], order)
  49. else:
  50. has_following_position = False
  51. following_position = None
  52. if cursor.reverse:
  53. page = list(reversed(page))
  54. has_next = (cursor.position is not None) or (cursor.offset > 0)
  55. has_previous = has_following_position
  56. next_position = cursor.position if has_next else None
  57. previous_position = following_position if has_previous else None
  58. else:
  59. has_next = has_following_position
  60. has_previous = (cursor.position is not None) or (cursor.offset > 0)
  61. next_position = following_position if has_next else None
  62. previous_position = cursor.position if has_previous else None
  63. next = (
  64. self.next_link(
  65. base_url,
  66. page,
  67. cursor,
  68. order,
  69. has_previous,
  70. limit,
  71. next_position,
  72. previous_position,
  73. )
  74. if has_next
  75. else None
  76. )
  77. previous = (
  78. self.previous_link(
  79. base_url,
  80. page,
  81. cursor,
  82. order,
  83. has_next,
  84. limit,
  85. next_position,
  86. previous_position,
  87. )
  88. if has_previous
  89. else None
  90. )
  91. total_count = 0
  92. if has_next or has_previous:
  93. total_count = await self._aitems_count(full_queryset)
  94. else:
  95. total_count = len(page)
  96. links = []
  97. for url, label in (
  98. (previous, "previous"),
  99. (next, "next"),
  100. ):
  101. if url is not None:
  102. parsed = parse.urlparse(url)
  103. cursor = parse.parse_qs(parsed.query).get("cursor", [""])[0]
  104. links.append(
  105. '<{}>; rel="{}"; results="true"; cursor="{}"'.format(
  106. url, label, cursor
  107. )
  108. )
  109. else:
  110. links.append('<{}>; rel="{}"; results="false"'.format(base_url, label))
  111. response["Link"] = {", ".join(links)} if links else {}
  112. response["X-Max-Hits"] = self.max_hits
  113. response["X-Hits"] = total_count
  114. return page
  115. async def _aitems_count(self, queryset: "QuerySet") -> int:
  116. return await queryset.order_by()[: self.max_hits].acount() # type: ignore