aiohttp.py 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. # Copyright 2024 Google LLC
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Transport adapter for Asynchronous HTTP Requests based on aiohttp.
  15. """
  16. import asyncio
  17. from typing import AsyncGenerator, Mapping, Optional
  18. try:
  19. import aiohttp # type: ignore
  20. except ImportError as caught_exc: # pragma: NO COVER
  21. raise ImportError(
  22. "The aiohttp library is not installed from please install the aiohttp package to use the aiohttp transport."
  23. ) from caught_exc
  24. from google.auth import _helpers
  25. from google.auth import exceptions
  26. from google.auth.aio import transport
  27. class Response(transport.Response):
  28. """
  29. Represents an HTTP response and its data. It is returned by ``google.auth.aio.transport.sessions.AsyncAuthorizedSession``.
  30. Args:
  31. response (aiohttp.ClientResponse): An instance of aiohttp.ClientResponse.
  32. Attributes:
  33. status_code (int): The HTTP status code of the response.
  34. headers (Mapping[str, str]): The HTTP headers of the response.
  35. """
  36. def __init__(self, response: aiohttp.ClientResponse):
  37. self._response = response
  38. @property
  39. @_helpers.copy_docstring(transport.Response)
  40. def status_code(self) -> int:
  41. return self._response.status
  42. @property
  43. @_helpers.copy_docstring(transport.Response)
  44. def headers(self) -> Mapping[str, str]:
  45. return {key: value for key, value in self._response.headers.items()}
  46. @_helpers.copy_docstring(transport.Response)
  47. async def content(self, chunk_size: int = 1024) -> AsyncGenerator[bytes, None]:
  48. try:
  49. async for chunk in self._response.content.iter_chunked(
  50. chunk_size
  51. ): # pragma: no branch
  52. yield chunk
  53. except aiohttp.ClientPayloadError as exc:
  54. raise exceptions.ResponseError(
  55. "Failed to read from the payload stream."
  56. ) from exc
  57. @_helpers.copy_docstring(transport.Response)
  58. async def read(self) -> bytes:
  59. try:
  60. return await self._response.read()
  61. except aiohttp.ClientResponseError as exc:
  62. raise exceptions.ResponseError("Failed to read the response body.") from exc
  63. @_helpers.copy_docstring(transport.Response)
  64. async def close(self):
  65. self._response.close()
  66. class Request(transport.Request):
  67. """Asynchronous Requests request adapter.
  68. This class is used internally for making requests using aiohttp
  69. in a consistent way. If you use :class:`google.auth.aio.transport.sessions.AsyncAuthorizedSession`
  70. you do not need to construct or use this class directly.
  71. This class can be useful if you want to configure a Request callable
  72. with a custom ``aiohttp.ClientSession`` in :class:`AuthorizedSession` or if
  73. you want to manually refresh a :class:`~google.auth.aio.credentials.Credentials` instance::
  74. import aiohttp
  75. import google.auth.aio.transport.aiohttp
  76. # Default example:
  77. request = google.auth.aio.transport.aiohttp.Request()
  78. await credentials.refresh(request)
  79. # Custom aiohttp Session Example:
  80. session = session=aiohttp.ClientSession(auto_decompress=False)
  81. request = google.auth.aio.transport.aiohttp.Request(session=session)
  82. auth_sesion = google.auth.aio.transport.sessions.AsyncAuthorizedSession(auth_request=request)
  83. Args:
  84. session (aiohttp.ClientSession): An instance :class:`aiohttp.ClientSession` used
  85. to make HTTP requests. If not specified, a session will be created.
  86. .. automethod:: __call__
  87. """
  88. def __init__(self, session: aiohttp.ClientSession = None):
  89. self._session = session
  90. self._closed = False
  91. async def __call__(
  92. self,
  93. url: str,
  94. method: str = "GET",
  95. body: Optional[bytes] = None,
  96. headers: Optional[Mapping[str, str]] = None,
  97. timeout: float = transport._DEFAULT_TIMEOUT_SECONDS,
  98. **kwargs,
  99. ) -> transport.Response:
  100. """
  101. Make an HTTP request using aiohttp.
  102. Args:
  103. url (str): The URL to be requested.
  104. method (Optional[str]):
  105. The HTTP method to use for the request. Defaults to 'GET'.
  106. body (Optional[bytes]):
  107. The payload or body in HTTP request.
  108. headers (Optional[Mapping[str, str]]):
  109. Request headers.
  110. timeout (float): The number of seconds to wait for a
  111. response from the server. If not specified or if None, the
  112. requests default timeout will be used.
  113. kwargs: Additional arguments passed through to the underlying
  114. aiohttp :meth:`aiohttp.Session.request` method.
  115. Returns:
  116. google.auth.aio.transport.Response: The HTTP response.
  117. Raises:
  118. - google.auth.exceptions.TransportError: If the request fails or if the session is closed.
  119. - google.auth.exceptions.TimeoutError: If the request times out.
  120. """
  121. try:
  122. if self._closed:
  123. raise exceptions.TransportError("session is closed.")
  124. if not self._session:
  125. self._session = aiohttp.ClientSession()
  126. client_timeout = aiohttp.ClientTimeout(total=timeout)
  127. response = await self._session.request(
  128. method,
  129. url,
  130. data=body,
  131. headers=headers,
  132. timeout=client_timeout,
  133. **kwargs,
  134. )
  135. return Response(response)
  136. except aiohttp.ClientError as caught_exc:
  137. client_exc = exceptions.TransportError(f"Failed to send request to {url}.")
  138. raise client_exc from caught_exc
  139. except asyncio.TimeoutError as caught_exc:
  140. timeout_exc = exceptions.TimeoutError(
  141. f"Request timed out after {timeout} seconds."
  142. )
  143. raise timeout_exc from caught_exc
  144. async def close(self) -> None:
  145. """
  146. Close the underlying aiohttp session to release the acquired resources.
  147. """
  148. if not self._closed and self._session:
  149. await self._session.close()
  150. self._closed = True