123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320 |
- from base64 import b64decode, b64encode
- from dataclasses import dataclass
- from typing import Any, List, Optional
- from urllib import parse
- from django.db.models import QuerySet
- from django.http import HttpRequest
- from django.utils.translation import gettext as _
- from ninja import Field, Schema
- from ninja.pagination import PaginationBase
- from pydantic import field_validator
- # Code in this file is taken directly from https://github.com/vitalik/django-ninja/pull/836
- # and should be swapped out once that MR is merged.
- @dataclass
- class Cursor:
- offset: int = 0
- reverse: bool = False
- position: Optional[str] = None
- def _clamp(val: int, min_: int, max_: int) -> int:
- return max(min_, min(val, max_))
- def _reverse_order(order: tuple) -> tuple:
- """
- Reverse the ordering specification for a Django ORM query.
- Given an order_by tuple such as `('-created', 'uuid')` reverse the
- ordering and return a new tuple, eg. `('created', '-uuid')`.
- """
- def invert(x: str) -> str:
- return x[1:] if x.startswith("-") else f"-{x}"
- return tuple(invert(item) for item in order)
- def _replace_query_param(url: str, key: str, val: str) -> str:
- scheme, netloc, path, query, fragment = parse.urlsplit(url)
- query_dict = parse.parse_qs(query, keep_blank_values=True)
- query_dict[key] = [val]
- query = parse.urlencode(sorted(query_dict.items()), doseq=True)
- return parse.urlunsplit((scheme, netloc, path, query, fragment))
- class CursorPagination(PaginationBase):
- class Input(Schema):
- limit: Optional[int] = Field(
- None, description=_("Number of results to return per page.")
- )
- cursor: Optional[str] = Field(
- None, description=_("The pagination cursor value."), validate_default=True
- )
- @field_validator("cursor")
- @classmethod
- def decode_cursor(cls, encoded_cursor: Optional[str]) -> Cursor:
- if encoded_cursor is None:
- return Cursor()
- try:
- querystring = b64decode(encoded_cursor).decode()
- tokens = parse.parse_qs(querystring, keep_blank_values=True)
- offset = int(tokens.get("o", ["0"])[0])
- offset = _clamp(offset, 0, CursorPagination._offset_cutoff)
- reverse = tokens.get("r", ["0"])[0]
- reverse = bool(int(reverse))
- position = tokens.get("p", [None])[0]
- except (TypeError, ValueError) as e:
- raise ValueError(_("Invalid cursor.")) from e
- return Cursor(offset=offset, reverse=reverse, position=position)
- class Output(Schema):
- results: List[Any] = Field(description=_("The page of objects."))
- count: int = Field(
- description=_("The total number of results across all pages.")
- )
- next: Optional[str] = Field(
- description=_("URL of next page of results if there is one.")
- )
- previous: Optional[str] = Field(
- description=_("URL of previous page of results if there is one.")
- )
- items_attribute = "results"
- default_ordering = ("-created",)
- max_page_size = 100
- _offset_cutoff = 100 # limit to protect against possibly malicious queries
- def paginate_queryset(
- self, queryset: QuerySet, pagination: Input, request: HttpRequest, **params
- ) -> dict:
- limit = _clamp(pagination.limit or self.max_page_size, 0, self.max_page_size)
- if not queryset.query.order_by:
- queryset = queryset.order_by(*self.default_ordering)
- order = queryset.query.order_by
- total_count = queryset.count()
- base_url = request.build_absolute_uri()
- cursor = pagination.cursor
- if cursor.reverse:
- queryset = queryset.order_by(*_reverse_order(order))
- if cursor.position is not None:
- is_reversed = order[0].startswith("-")
- order_attr = order[0].lstrip("-")
- if cursor.reverse != is_reversed:
- queryset = queryset.filter(**{f"{order_attr}__lt": cursor.position})
- else:
- queryset = queryset.filter(**{f"{order_attr}__gt": cursor.position})
- # If we have an offset cursor then offset the entire page by that amount.
- # We also always fetch an extra item in order to determine if there is a
- # page following on from this one.
- results = list(queryset[cursor.offset : cursor.offset + limit + 1])
- page = list(results[:limit])
- # Determine the position of the final item following the page.
- if len(results) > len(page):
- has_following_position = True
- following_position = self._get_position_from_instance(results[-1], order)
- else:
- has_following_position = False
- following_position = None
- if cursor.reverse:
- # If we have a reverse queryset, then the query ordering was in reverse
- # so we need to reverse the items again before returning them to the user.
- page = list(reversed(page))
- has_next = (cursor.position is not None) or (cursor.offset > 0)
- has_previous = has_following_position
- next_position = cursor.position if has_next else None
- previous_position = following_position if has_previous else None
- else:
- has_next = has_following_position
- has_previous = (cursor.position is not None) or (cursor.offset > 0)
- next_position = following_position if has_next else None
- previous_position = cursor.position if has_previous else None
- return {
- "results": page,
- "count": total_count,
- "next": self.next_link(
- base_url,
- page,
- cursor,
- order,
- has_previous,
- limit,
- next_position,
- previous_position,
- )
- if has_next
- else None,
- "previous": self.previous_link(
- base_url,
- page,
- cursor,
- order,
- has_next,
- limit,
- next_position,
- previous_position,
- )
- if has_previous
- else None,
- }
- def _encode_cursor(self, cursor: Cursor, base_url: str) -> str:
- tokens = {}
- if cursor.offset != 0:
- tokens["o"] = str(cursor.offset)
- if cursor.reverse:
- tokens["r"] = "1"
- if cursor.position is not None:
- tokens["p"] = cursor.position
- querystring = parse.urlencode(tokens, doseq=True)
- encoded = b64encode(querystring.encode()).decode()
- return _replace_query_param(base_url, "cursor", encoded)
- def next_link(
- self,
- base_url: str,
- page: list,
- cursor: Cursor,
- order: tuple,
- has_previous: bool,
- limit: int,
- next_position: str,
- previous_position: str,
- ) -> str:
- if page and cursor.reverse and cursor.offset:
- # If we're reversing direction and we have an offset cursor
- # then we cannot use the first position we find as a marker.
- compare = self._get_position_from_instance(page[-1], order)
- else:
- compare = next_position
- offset = 0
- has_item_with_unique_position = False
- for item in reversed(page):
- position = self._get_position_from_instance(item, order)
- if position != compare:
- # The item in this position and the item following it
- # have different positions. We can use this position as
- # our marker.
- has_item_with_unique_position = True
- break
- # The item in this position has the same position as the item
- # following it, we can't use it as a marker position, so increment
- # the offset and keep seeking to the previous item.
- compare = position
- offset += 1
- if page and not has_item_with_unique_position:
- # There were no unique positions in the page.
- if not has_previous:
- # We are on the first page.
- # Our cursor will have an offset equal to the page size,
- # but no position to filter against yet.
- offset = limit
- position = None
- elif cursor.reverse:
- # The change in direction will introduce a paging artifact,
- # where we end up skipping forward a few extra items.
- offset = 0
- position = previous_position
- else:
- # Use the position from the existing cursor and increment
- # it's offset by the page size.
- offset = cursor.offset + limit
- position = previous_position
- if not page:
- position = next_position
- next_cursor = Cursor(offset=offset, reverse=False, position=position)
- return self._encode_cursor(next_cursor, base_url)
- def previous_link(
- self,
- base_url: str,
- page: list,
- cursor: Cursor,
- order: tuple,
- has_next: bool,
- limit: int,
- next_position: str,
- previous_position: str,
- ):
- if page and not cursor.reverse and cursor.offset:
- # If we're reversing direction and we have an offset cursor
- # then we cannot use the first position we find as a marker.
- compare = self._get_position_from_instance(page[0], order)
- else:
- compare = previous_position
- offset = 0
- has_item_with_unique_position = False
- for item in page:
- position = self._get_position_from_instance(item, order)
- if position != compare:
- # The item in this position and the item following it
- # have different positions. We can use this position as
- # our marker.
- has_item_with_unique_position = True
- break
- # The item in this position has the same position as the item
- # following it, we can't use it as a marker position, so increment
- # the offset and keep seeking to the previous item.
- compare = position
- offset += 1
- if page and not has_item_with_unique_position:
- # There were no unique positions in the page.
- if not has_next:
- # We are on the final page.
- # Our cursor will have an offset equal to the page size,
- # but no position to filter against yet.
- offset = limit
- position = None
- elif cursor.reverse:
- # Use the position from the existing cursor and increment
- # it's offset by the page size.
- offset = cursor.offset + limit
- position = next_position
- else:
- # The change in direction will introduce a paging artifact,
- # where we end up skipping back a few extra items.
- offset = 0
- position = next_position
- if not page:
- position = previous_position
- cursor = Cursor(offset=offset, reverse=True, position=position)
- return self._encode_cursor(cursor, base_url)
- def _get_position_from_instance(self, instance, ordering) -> str:
- field_name = ordering[0].lstrip("-")
- if isinstance(instance, dict):
- attr = instance[field_name]
- else:
- attr = getattr(instance, field_name)
- return str(attr)
|