cursor_pagination.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. from base64 import b64decode, b64encode
  2. from dataclasses import dataclass
  3. from typing import Any, List, Optional
  4. from urllib import parse
  5. from django.db.models import QuerySet
  6. from django.http import HttpRequest
  7. from django.utils.translation import gettext as _
  8. from ninja import Field, Schema
  9. from ninja.pagination import PaginationBase
  10. from pydantic import field_validator
  11. # Code in this file is taken directly from https://github.com/vitalik/django-ninja/pull/836
  12. # and should be swapped out once that MR is merged.
  13. @dataclass
  14. class Cursor:
  15. offset: int = 0
  16. reverse: bool = False
  17. position: Optional[str] = None
  18. def _clamp(val: int, min_: int, max_: int) -> int:
  19. return max(min_, min(val, max_))
  20. def _reverse_order(order: tuple) -> tuple:
  21. """
  22. Reverse the ordering specification for a Django ORM query.
  23. Given an order_by tuple such as `('-created', 'uuid')` reverse the
  24. ordering and return a new tuple, eg. `('created', '-uuid')`.
  25. """
  26. def invert(x: str) -> str:
  27. return x[1:] if x.startswith("-") else f"-{x}"
  28. return tuple(invert(item) for item in order)
  29. def _replace_query_param(url: str, key: str, val: str) -> str:
  30. scheme, netloc, path, query, fragment = parse.urlsplit(url)
  31. query_dict = parse.parse_qs(query, keep_blank_values=True)
  32. query_dict[key] = [val]
  33. query = parse.urlencode(sorted(query_dict.items()), doseq=True)
  34. return parse.urlunsplit((scheme, netloc, path, query, fragment))
  35. class CursorPagination(PaginationBase):
  36. class Input(Schema):
  37. limit: Optional[int] = Field(
  38. None, description=_("Number of results to return per page.")
  39. )
  40. cursor: Optional[str] = Field(
  41. None, description=_("The pagination cursor value."), validate_default=True
  42. )
  43. @field_validator("cursor")
  44. @classmethod
  45. def decode_cursor(cls, encoded_cursor: Optional[str]) -> Cursor:
  46. if encoded_cursor is None:
  47. return Cursor()
  48. try:
  49. querystring = b64decode(encoded_cursor).decode()
  50. tokens = parse.parse_qs(querystring, keep_blank_values=True)
  51. offset = int(tokens.get("o", ["0"])[0])
  52. offset = _clamp(offset, 0, CursorPagination._offset_cutoff)
  53. reverse = tokens.get("r", ["0"])[0]
  54. reverse = bool(int(reverse))
  55. position = tokens.get("p", [None])[0]
  56. except (TypeError, ValueError) as e:
  57. raise ValueError(_("Invalid cursor.")) from e
  58. return Cursor(offset=offset, reverse=reverse, position=position)
  59. class Output(Schema):
  60. results: List[Any] = Field(description=_("The page of objects."))
  61. count: int = Field(
  62. description=_("The total number of results across all pages.")
  63. )
  64. next: Optional[str] = Field(
  65. description=_("URL of next page of results if there is one.")
  66. )
  67. previous: Optional[str] = Field(
  68. description=_("URL of previous page of results if there is one.")
  69. )
  70. items_attribute = "results"
  71. default_ordering = ("-created",)
  72. max_page_size = 100
  73. _offset_cutoff = 100 # limit to protect against possibly malicious queries
  74. def paginate_queryset(
  75. self, queryset: QuerySet, pagination: Input, request: HttpRequest, **params
  76. ) -> dict:
  77. limit = _clamp(pagination.limit or self.max_page_size, 0, self.max_page_size)
  78. if not queryset.query.order_by:
  79. queryset = queryset.order_by(*self.default_ordering)
  80. order = queryset.query.order_by
  81. total_count = queryset.count()
  82. base_url = request.build_absolute_uri()
  83. cursor = pagination.cursor
  84. if cursor.reverse:
  85. queryset = queryset.order_by(*_reverse_order(order))
  86. if cursor.position is not None:
  87. is_reversed = order[0].startswith("-")
  88. order_attr = order[0].lstrip("-")
  89. if cursor.reverse != is_reversed:
  90. queryset = queryset.filter(**{f"{order_attr}__lt": cursor.position})
  91. else:
  92. queryset = queryset.filter(**{f"{order_attr}__gt": cursor.position})
  93. # If we have an offset cursor then offset the entire page by that amount.
  94. # We also always fetch an extra item in order to determine if there is a
  95. # page following on from this one.
  96. results = list(queryset[cursor.offset : cursor.offset + limit + 1])
  97. page = list(results[:limit])
  98. # Determine the position of the final item following the page.
  99. if len(results) > len(page):
  100. has_following_position = True
  101. following_position = self._get_position_from_instance(results[-1], order)
  102. else:
  103. has_following_position = False
  104. following_position = None
  105. if cursor.reverse:
  106. # If we have a reverse queryset, then the query ordering was in reverse
  107. # so we need to reverse the items again before returning them to the user.
  108. page = list(reversed(page))
  109. has_next = (cursor.position is not None) or (cursor.offset > 0)
  110. has_previous = has_following_position
  111. next_position = cursor.position if has_next else None
  112. previous_position = following_position if has_previous else None
  113. else:
  114. has_next = has_following_position
  115. has_previous = (cursor.position is not None) or (cursor.offset > 0)
  116. next_position = following_position if has_next else None
  117. previous_position = cursor.position if has_previous else None
  118. return {
  119. "results": page,
  120. "count": total_count,
  121. "next": self.next_link(
  122. base_url,
  123. page,
  124. cursor,
  125. order,
  126. has_previous,
  127. limit,
  128. next_position,
  129. previous_position,
  130. )
  131. if has_next
  132. else None,
  133. "previous": self.previous_link(
  134. base_url,
  135. page,
  136. cursor,
  137. order,
  138. has_next,
  139. limit,
  140. next_position,
  141. previous_position,
  142. )
  143. if has_previous
  144. else None,
  145. }
  146. def _encode_cursor(self, cursor: Cursor, base_url: str) -> str:
  147. tokens = {}
  148. if cursor.offset != 0:
  149. tokens["o"] = str(cursor.offset)
  150. if cursor.reverse:
  151. tokens["r"] = "1"
  152. if cursor.position is not None:
  153. tokens["p"] = cursor.position
  154. querystring = parse.urlencode(tokens, doseq=True)
  155. encoded = b64encode(querystring.encode()).decode()
  156. return _replace_query_param(base_url, "cursor", encoded)
  157. def next_link(
  158. self,
  159. base_url: str,
  160. page: list,
  161. cursor: Cursor,
  162. order: tuple,
  163. has_previous: bool,
  164. limit: int,
  165. next_position: str,
  166. previous_position: str,
  167. ) -> str:
  168. if page and cursor.reverse and cursor.offset:
  169. # If we're reversing direction and we have an offset cursor
  170. # then we cannot use the first position we find as a marker.
  171. compare = self._get_position_from_instance(page[-1], order)
  172. else:
  173. compare = next_position
  174. offset = 0
  175. has_item_with_unique_position = False
  176. for item in reversed(page):
  177. position = self._get_position_from_instance(item, order)
  178. if position != compare:
  179. # The item in this position and the item following it
  180. # have different positions. We can use this position as
  181. # our marker.
  182. has_item_with_unique_position = True
  183. break
  184. # The item in this position has the same position as the item
  185. # following it, we can't use it as a marker position, so increment
  186. # the offset and keep seeking to the previous item.
  187. compare = position
  188. offset += 1
  189. if page and not has_item_with_unique_position:
  190. # There were no unique positions in the page.
  191. if not has_previous:
  192. # We are on the first page.
  193. # Our cursor will have an offset equal to the page size,
  194. # but no position to filter against yet.
  195. offset = limit
  196. position = None
  197. elif cursor.reverse:
  198. # The change in direction will introduce a paging artifact,
  199. # where we end up skipping forward a few extra items.
  200. offset = 0
  201. position = previous_position
  202. else:
  203. # Use the position from the existing cursor and increment
  204. # it's offset by the page size.
  205. offset = cursor.offset + limit
  206. position = previous_position
  207. if not page:
  208. position = next_position
  209. next_cursor = Cursor(offset=offset, reverse=False, position=position)
  210. return self._encode_cursor(next_cursor, base_url)
  211. def previous_link(
  212. self,
  213. base_url: str,
  214. page: list,
  215. cursor: Cursor,
  216. order: tuple,
  217. has_next: bool,
  218. limit: int,
  219. next_position: str,
  220. previous_position: str,
  221. ):
  222. if page and not cursor.reverse and cursor.offset:
  223. # If we're reversing direction and we have an offset cursor
  224. # then we cannot use the first position we find as a marker.
  225. compare = self._get_position_from_instance(page[0], order)
  226. else:
  227. compare = previous_position
  228. offset = 0
  229. has_item_with_unique_position = False
  230. for item in page:
  231. position = self._get_position_from_instance(item, order)
  232. if position != compare:
  233. # The item in this position and the item following it
  234. # have different positions. We can use this position as
  235. # our marker.
  236. has_item_with_unique_position = True
  237. break
  238. # The item in this position has the same position as the item
  239. # following it, we can't use it as a marker position, so increment
  240. # the offset and keep seeking to the previous item.
  241. compare = position
  242. offset += 1
  243. if page and not has_item_with_unique_position:
  244. # There were no unique positions in the page.
  245. if not has_next:
  246. # We are on the final page.
  247. # Our cursor will have an offset equal to the page size,
  248. # but no position to filter against yet.
  249. offset = limit
  250. position = None
  251. elif cursor.reverse:
  252. # Use the position from the existing cursor and increment
  253. # it's offset by the page size.
  254. offset = cursor.offset + limit
  255. position = next_position
  256. else:
  257. # The change in direction will introduce a paging artifact,
  258. # where we end up skipping back a few extra items.
  259. offset = 0
  260. position = next_position
  261. if not page:
  262. position = previous_position
  263. cursor = Cursor(offset=offset, reverse=True, position=position)
  264. return self._encode_cursor(cursor, base_url)
  265. def _get_position_from_instance(self, instance, ordering) -> str:
  266. field_name = ordering[0].lstrip("-")
  267. if isinstance(instance, dict):
  268. attr = instance[field_name]
  269. else:
  270. attr = getattr(instance, field_name)
  271. return str(attr)