123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170 |
- # Copyright 2024 Google LLC
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import asyncio
- from aioresponses import aioresponses # type: ignore
- from mock import AsyncMock, Mock, patch
- import pytest # type: ignore
- import pytest_asyncio # type: ignore
- from google.auth import exceptions
- import google.auth.aio.transport.aiohttp as auth_aiohttp
- try:
- import aiohttp # type: ignore
- except ImportError as caught_exc: # pragma: NO COVER
- raise ImportError(
- "The aiohttp library is not installed from please install the aiohttp package to use the aiohttp transport."
- ) from caught_exc
- @pytest.fixture
- def mock_response():
- response = Mock()
- response.status = 200
- response.headers = {"Content-Type": "application/json", "Content-Length": "100"}
- mock_iterator = AsyncMock()
- mock_iterator.__aiter__.return_value = iter(
- [b"Cavefish ", b"have ", b"no ", b"sight."]
- )
- response.content.iter_chunked = lambda chunk_size: mock_iterator
- response.read = AsyncMock(return_value=b"Cavefish have no sight.")
- response.close = AsyncMock()
- return auth_aiohttp.Response(response)
- class TestResponse(object):
- @pytest.mark.asyncio
- async def test_response_status_code(self, mock_response):
- assert mock_response.status_code == 200
- @pytest.mark.asyncio
- async def test_response_headers(self, mock_response):
- assert mock_response.headers["Content-Type"] == "application/json"
- assert mock_response.headers["Content-Length"] == "100"
- @pytest.mark.asyncio
- async def test_response_content(self, mock_response):
- content = b"".join([chunk async for chunk in mock_response.content()])
- assert content == b"Cavefish have no sight."
- @pytest.mark.asyncio
- async def test_response_content_raises_error(self, mock_response):
- with patch.object(
- mock_response._response.content,
- "iter_chunked",
- side_effect=aiohttp.ClientPayloadError,
- ):
- with pytest.raises(exceptions.ResponseError) as exc:
- [chunk async for chunk in mock_response.content()]
- exc.match("Failed to read from the payload stream")
- @pytest.mark.asyncio
- async def test_response_read(self, mock_response):
- content = await mock_response.read()
- assert content == b"Cavefish have no sight."
- @pytest.mark.asyncio
- async def test_response_read_raises_error(self, mock_response):
- with patch.object(
- mock_response._response,
- "read",
- side_effect=aiohttp.ClientResponseError(None, None),
- ):
- with pytest.raises(exceptions.ResponseError) as exc:
- await mock_response.read()
- exc.match("Failed to read the response body.")
- @pytest.mark.asyncio
- async def test_response_close(self, mock_response):
- await mock_response.close()
- mock_response._response.close.assert_called_once()
- @pytest.mark.asyncio
- async def test_response_content_stream(self, mock_response):
- itr = mock_response.content().__aiter__()
- content = []
- try:
- while True:
- chunk = await itr.__anext__()
- content.append(chunk)
- except StopAsyncIteration:
- pass
- assert b"".join(content) == b"Cavefish have no sight."
- @pytest.mark.asyncio
- class TestRequest:
- @pytest_asyncio.fixture
- async def aiohttp_request(self):
- request = auth_aiohttp.Request()
- yield request
- await request.close()
- async def test_request_call_success(self, aiohttp_request):
- with aioresponses() as m:
- mocked_chunks = [b"Cavefish ", b"have ", b"no ", b"sight."]
- mocked_response = b"".join(mocked_chunks)
- m.get("http://example.com", status=200, body=mocked_response)
- response = await aiohttp_request("http://example.com")
- assert response.status_code == 200
- assert response.headers == {"Content-Type": "application/json"}
- content = b"".join([chunk async for chunk in response.content()])
- assert content == b"Cavefish have no sight."
- async def test_request_call_success_with_provided_session(self):
- mock_session = aiohttp.ClientSession()
- request = auth_aiohttp.Request(mock_session)
- with aioresponses() as m:
- mocked_chunks = [b"Cavefish ", b"have ", b"no ", b"sight."]
- mocked_response = b"".join(mocked_chunks)
- m.get("http://example.com", status=200, body=mocked_response)
- response = await request("http://example.com")
- assert response.status_code == 200
- assert response.headers == {"Content-Type": "application/json"}
- content = b"".join([chunk async for chunk in response.content()])
- assert content == b"Cavefish have no sight."
- async def test_request_call_raises_client_error(self, aiohttp_request):
- with aioresponses() as m:
- m.get("http://example.com", exception=aiohttp.ClientError)
- with pytest.raises(exceptions.TransportError) as exc:
- await aiohttp_request("http://example.com/api")
- exc.match("Failed to send request to http://example.com/api.")
- async def test_request_call_raises_timeout_error(self, aiohttp_request):
- with aioresponses() as m:
- m.get("http://example.com", exception=asyncio.TimeoutError)
- with pytest.raises(exceptions.TimeoutError) as exc:
- await aiohttp_request("http://example.com")
- exc.match("Request timed out after 180 seconds.")
- async def test_request_call_raises_transport_error_for_closed_session(
- self, aiohttp_request
- ):
- with aioresponses() as m:
- m.get("http://example.com", exception=asyncio.TimeoutError)
- aiohttp_request._closed = True
- with pytest.raises(exceptions.TransportError) as exc:
- await aiohttp_request("http://example.com")
- exc.match("session is closed.")
- aiohttp_request._closed = False
|