test_aioresponses.py 30 KB


  1. # -*- coding: utf-8 -*-
  2. import asyncio
  3. import re
  4. from asyncio import CancelledError, TimeoutError
  5. from random import uniform
  6. from typing import Coroutine, Generator, Union
  7. from unittest.mock import patch
  8. from aiohttp import hdrs
  9. from aiohttp import http
  10. from aiohttp import web
  11. from aiohttp.client import ClientSession
  12. from aiohttp.client_reqrep import ClientResponse
  13. from aiohttp.test_utils import TestServer
  14. from packaging.version import Version
  15. try:
  16. from aiohttp.errors import (
  17. ClientConnectionError,
  18. ClientResponseError,
  19. HttpProcessingError,
  20. )
  21. except ImportError:
  22. from aiohttp.client_exceptions import (
  23. ClientConnectionError,
  24. ClientResponseError,
  25. )
  26. from aiohttp.http_exceptions import HttpProcessingError
  27. from aioresponses.compat import AIOHTTP_VERSION, URL
  28. from aioresponses import CallbackResult, aioresponses
  29. from .base import fail_on, skipIf, AsyncTestCase
  30. MINIMUM_AIOHTTP_VERSION = Version('3.4.0')
  31. class AIOResponsesTestCase(AsyncTestCase):
  32. async def setup(self):
  33. self.url = 'http://example.com/api?foo=bar#fragment'
  34. self.session = ClientSession()
  35. self.external_server = self.make_external_server()
  36. await self.external_server.start_server()
  37. async def teardown(self):
  38. close_result = self.session.close()
  39. if close_result is not None:
  40. await close_result
  41. def run_async(self, coroutine: Union[Coroutine, Generator]):
  42. return self.loop.run_until_complete(coroutine)
  43. def make_external_server(self):
  44. """
  45. В оригинальном коде для тестирования passthrough исполняются настоящие
  46. запросы до сайта http://httpbin.org. В sandbox нет интернета, потому я заменил
  47. httpbin на локальный сервер. Осторожнее при обновлении!
  48. """
  49. async def testserver_status_201(request):
  50. return web.Response(status=201)
  51. async def testserver_get(request):
  52. return web.Response()
  53. app = web.Application()
  54. app.router.add_get('/status/201', testserver_status_201)
  55. app.router.add_get('/get', testserver_get)
  56. return TestServer(app)
  57. async def request(self, url: str):
  58. return await self.session.get(url)
  59. @fail_on(unused_loop=False)
  60. def test_shortcut_method(self):
  61. for http_method in (
  62. hdrs.METH_HEAD,
  63. hdrs.METH_GET,
  64. hdrs.METH_POST,
  65. hdrs.METH_PUT,
  66. hdrs.METH_PATCH,
  67. hdrs.METH_DELETE,
  68. hdrs.METH_OPTIONS,
  69. ):
  70. with self.subTest(), \
  71. patch('aioresponses.aioresponses.add') as mocked, \
  72. aioresponses() as m:
  73. getattr(m, http_method.lower())(self.url)
  74. mocked.assert_called_once_with(self.url, method=http_method)
  75. @aioresponses()
  76. def test_returned_instance(self, m):
  77. m.get(self.url)
  78. response = self.run_async(self.session.get(self.url))
  79. self.assertIsInstance(response, ClientResponse)
  80. @aioresponses()
  81. async def test_returned_instance_and_status_code(self, m):
  82. m.get(self.url, status=204)
  83. response = await self.session.get(self.url)
  84. self.assertIsInstance(response, ClientResponse)
  85. self.assertEqual(response.status, 204)
  86. @aioresponses()
  87. async def test_returned_response_headers(self, m):
  88. m.get(self.url,
  89. content_type='text/html',
  90. headers={'Connection': 'keep-alive'})
  91. response = await self.session.get(self.url)
  92. self.assertEqual(response.headers['Connection'], 'keep-alive')
  93. self.assertEqual(response.headers[hdrs.CONTENT_TYPE], 'text/html')
  94. @aioresponses()
  95. async def test_returned_response_cookies(self, m):
  96. m.get(self.url,
  97. headers={'Set-Cookie': 'cookie=value'})
  98. response = await self.session.get(self.url)
  99. self.assertEqual(response.cookies['cookie'].value, 'value')
  100. @aioresponses()
  101. async def test_returned_response_raw_headers(self, m):
  102. m.get(self.url,
  103. content_type='text/html',
  104. headers={'Connection': 'keep-alive'})
  105. response = await self.session.get(self.url)
  106. expected_raw_headers = (
  107. (hdrs.CONTENT_TYPE.encode(), b'text/html'),
  108. (b'Connection', b'keep-alive')
  109. )
  110. self.assertEqual(response.raw_headers, expected_raw_headers)
  111. @aioresponses()
  112. async def test_raise_for_status(self, m):
  113. m.get(self.url, status=400)
  114. with self.assertRaises(ClientResponseError) as cm:
  115. response = await self.session.get(self.url)
  116. response.raise_for_status()
  117. self.assertEqual(cm.exception.message, http.RESPONSES[400][0])
  118. @aioresponses()
  119. @skipIf(condition=AIOHTTP_VERSION < MINIMUM_AIOHTTP_VERSION,
  120. reason='aiohttp<3.4.0 does not support raise_for_status '
  121. 'arguments for requests')
  122. async def test_request_raise_for_status(self, m):
  123. m.get(self.url, status=400)
  124. with self.assertRaises(ClientResponseError) as cm:
  125. await self.session.get(self.url, raise_for_status=True)
  126. self.assertEqual(cm.exception.message, http.RESPONSES[400][0])
  127. @aioresponses()
  128. async def test_returned_instance_and_params_handling(self, m):
  129. expected_url = 'http://example.com/api?foo=bar&x=42#fragment'
  130. m.get(expected_url)
  131. response = await self.session.get(self.url, params={'x': 42})
  132. self.assertIsInstance(response, ClientResponse)
  133. self.assertEqual(response.status, 200)
  134. expected_url = 'http://example.com/api?x=42#fragment'
  135. m.get(expected_url)
  136. response = await self.session.get(
  137. 'http://example.com/api#fragment',
  138. params={'x': 42}
  139. )
  140. self.assertIsInstance(response, ClientResponse)
  141. self.assertEqual(response.status, 200)
  142. self.assertEqual(len(m.requests), 2)
  143. with self.assertRaises(AssertionError):
  144. m.assert_called_once()
  145. @aioresponses()
  146. def test_method_dont_match(self, m):
  147. m.get(self.url)
  148. with self.assertRaises(ClientConnectionError):
  149. self.run_async(self.session.post(self.url))
  150. @aioresponses()
  151. def test_post_with_data(self, m: aioresponses):
  152. body = {'foo': 'bar'}
  153. payload = {'spam': 'eggs'}
  154. user_agent = {'User-Agent': 'aioresponses'}
  155. m.post(
  156. self.url,
  157. payload=payload,
  158. headers=dict(connection='keep-alive'),
  159. body=body,
  160. )
  161. response = self.run_async(
  162. self.session.post(
  163. self.url,
  164. data=payload,
  165. headers=user_agent
  166. )
  167. )
  168. self.assertIsInstance(response, ClientResponse)
  169. self.assertEqual(response.status, 200)
  170. response_data = self.run_async(response.json())
  171. self.assertEqual(response_data, payload)
  172. m.assert_called_once_with(
  173. self.url,
  174. method='POST',
  175. data=payload,
  176. headers={'User-Agent': 'aioresponses'}
  177. )
  178. # Wrong data
  179. with self.assertRaises(AssertionError):
  180. m.assert_called_once_with(
  181. self.url,
  182. method='POST',
  183. data=body,
  184. headers={'User-Agent': 'aioresponses'}
  185. )
  186. # Wrong url
  187. with self.assertRaises(AssertionError):
  188. m.assert_called_once_with(
  189. 'http://httpbin.org/',
  190. method='POST',
  191. data=payload,
  192. headers={'User-Agent': 'aioresponses'}
  193. )
  194. # Wrong headers
  195. with self.assertRaises(AssertionError):
  196. m.assert_called_once_with(
  197. self.url,
  198. method='POST',
  199. data=payload,
  200. headers={'User-Agent': 'aiorequest'}
  201. )
  202. @aioresponses()
  203. async def test_streaming(self, m):
  204. m.get(self.url, body='Test')
  205. resp = await self.session.get(self.url)
  206. content = await resp.content.read()
  207. self.assertEqual(content, b'Test')
  208. @aioresponses()
  209. async def test_streaming_up_to(self, m):
  210. m.get(self.url, body='Test')
  211. resp = await self.session.get(self.url)
  212. content = await resp.content.read(2)
  213. self.assertEqual(content, b'Te')
  214. content = await resp.content.read(2)
  215. self.assertEqual(content, b'st')
  216. @aioresponses()
  217. async def test_binary_body(self, m):
  218. body = b'Invalid utf-8: \x95\x00\x85'
  219. m.get(self.url, body=body)
  220. resp = await self.session.get(self.url)
  221. content = await resp.read()
  222. self.assertEqual(content, body)
  223. @aioresponses()
  224. async def test_binary_body_via_callback(self, m):
  225. body = b'\x00\x01\x02\x80\x81\x82\x83\x84\x85'
  226. def callback(url, **kwargs):
  227. return CallbackResult(body=body)
  228. m.get(self.url, callback=callback)
  229. resp = await self.session.get(self.url)
  230. content = await resp.read()
  231. self.assertEqual(content, body)
  232. async def test_mocking_as_context_manager(self):
  233. with aioresponses() as aiomock:
  234. aiomock.add(self.url, payload={'foo': 'bar'})
  235. resp = await self.session.get(self.url)
  236. self.assertEqual(resp.status, 200)
  237. payload = await resp.json()
  238. self.assertDictEqual(payload, {'foo': 'bar'})
  239. def test_mocking_as_decorator(self):
  240. @aioresponses()
  241. def foo(loop, m):
  242. m.add(self.url, payload={'foo': 'bar'})
  243. resp = loop.run_until_complete(self.session.get(self.url))
  244. self.assertEqual(resp.status, 200)
  245. payload = loop.run_until_complete(resp.json())
  246. self.assertDictEqual(payload, {'foo': 'bar'})
  247. foo(self.loop)
  248. async def test_passing_argument(self):
  249. @aioresponses(param='mocked')
  250. async def foo(mocked):
  251. mocked.add(self.url, payload={'foo': 'bar'})
  252. resp = await self.session.get(self.url)
  253. self.assertEqual(resp.status, 200)
  254. await foo()
  255. @fail_on(unused_loop=False)
  256. def test_mocking_as_decorator_wrong_mocked_arg_name(self):
  257. @aioresponses(param='foo')
  258. def foo(bar):
  259. # no matter what is here it should raise an error
  260. pass
  261. with self.assertRaises(TypeError) as cm:
  262. foo()
  263. exc = cm.exception
  264. self.assertIn("foo() got an unexpected keyword argument 'foo'",
  265. str(exc))
  266. async def test_unknown_request(self):
  267. with aioresponses() as aiomock:
  268. aiomock.add(self.url, payload={'foo': 'bar'})
  269. with self.assertRaises(ClientConnectionError):
  270. await self.session.get('http://example.com/foo')
  271. async def test_raising_exception(self):
  272. with aioresponses() as aiomock:
  273. url = 'http://example.com/Exception'
  274. aiomock.get(url, exception=Exception)
  275. with self.assertRaises(Exception):
  276. await self.session.get(url)
  277. url = 'http://example.com/Exception_object'
  278. aiomock.get(url, exception=Exception())
  279. with self.assertRaises(Exception):
  280. await self.session.get(url)
  281. url = 'http://example.com/BaseException'
  282. aiomock.get(url, exception=BaseException)
  283. with self.assertRaises(BaseException):
  284. await self.session.get(url)
  285. url = 'http://example.com/BaseException_object'
  286. aiomock.get(url, exception=BaseException())
  287. with self.assertRaises(BaseException):
  288. await self.session.get(url)
  289. url = 'http://example.com/CancelError'
  290. aiomock.get(url, exception=CancelledError)
  291. with self.assertRaises(CancelledError):
  292. await self.session.get(url)
  293. url = 'http://example.com/TimeoutError'
  294. aiomock.get(url, exception=TimeoutError)
  295. with self.assertRaises(TimeoutError):
  296. await self.session.get(url)
  297. url = 'http://example.com/HttpProcessingError'
  298. aiomock.get(url, exception=HttpProcessingError(message='foo'))
  299. with self.assertRaises(HttpProcessingError):
  300. await self.session.get(url)
  301. callback_called = asyncio.Event()
  302. url = 'http://example.com/HttpProcessingError'
  303. aiomock.get(url, exception=HttpProcessingError(message='foo'),
  304. callback=lambda *_, **__: callback_called.set())
  305. with self.assertRaises(HttpProcessingError):
  306. await self.session.get(url)
  307. await callback_called.wait()
  308. async def test_multiple_requests(self):
  309. """Ensure that requests are saved the way they would have been sent."""
  310. with aioresponses() as m:
  311. m.get(self.url, status=200)
  312. m.get(self.url, status=201)
  313. m.get(self.url, status=202)
  314. json_content_as_ref = [1]
  315. resp = await self.session.get(
  316. self.url, json=json_content_as_ref
  317. )
  318. self.assertEqual(resp.status, 200)
  319. json_content_as_ref[:] = [2]
  320. resp = await self.session.get(
  321. self.url, json=json_content_as_ref
  322. )
  323. self.assertEqual(resp.status, 201)
  324. json_content_as_ref[:] = [3]
  325. resp = await self.session.get(
  326. self.url, json=json_content_as_ref
  327. )
  328. self.assertEqual(resp.status, 202)
  329. key = ('GET', URL(self.url))
  330. self.assertIn(key, m.requests)
  331. self.assertEqual(len(m.requests[key]), 3)
  332. first_request = m.requests[key][0]
  333. self.assertEqual(first_request.args, tuple())
  334. self.assertEqual(first_request.kwargs,
  335. {'allow_redirects': True, "json": [1]})
  336. second_request = m.requests[key][1]
  337. self.assertEqual(second_request.args, tuple())
  338. self.assertEqual(second_request.kwargs,
  339. {'allow_redirects': True, "json": [2]})
  340. third_request = m.requests[key][2]
  341. self.assertEqual(third_request.args, tuple())
  342. self.assertEqual(third_request.kwargs,
  343. {'allow_redirects': True, "json": [3]})
  344. async def test_request_with_non_deepcopyable_parameter(self):
  345. def non_deep_copyable():
  346. """A generator does not allow deepcopy."""
  347. for line in ["header1,header2", "v1,v2", "v10,v20"]:
  348. yield line
  349. generator_value = non_deep_copyable()
  350. with aioresponses() as m:
  351. m.get(self.url, status=200)
  352. resp = await self.session.get(self.url, data=generator_value)
  353. self.assertEqual(resp.status, 200)
  354. key = ('GET', URL(self.url))
  355. self.assertIn(key, m.requests)
  356. self.assertEqual(len(m.requests[key]), 1)
  357. request = m.requests[key][0]
  358. self.assertEqual(request.args, tuple())
  359. self.assertEqual(request.kwargs,
  360. {'allow_redirects': True,
  361. "data": generator_value})
  362. async def test_request_retrieval_in_case_no_response(self):
  363. with aioresponses() as m:
  364. with self.assertRaises(ClientConnectionError):
  365. await self.session.get(self.url)
  366. key = ('GET', URL(self.url))
  367. self.assertIn(key, m.requests)
  368. self.assertEqual(len(m.requests[key]), 1)
  369. self.assertEqual(m.requests[key][0].args, tuple())
  370. self.assertEqual(m.requests[key][0].kwargs,
  371. {'allow_redirects': True})
  372. async def test_request_failure_in_case_session_is_closed(self):
  373. async def do_request(session):
  374. return (await session.get(self.url))
  375. with aioresponses():
  376. coro = do_request(self.session)
  377. await self.session.close()
  378. with self.assertRaises(RuntimeError) as exception_info:
  379. await coro
  380. assert str(exception_info.exception) == "Session is closed"
  381. async def test_address_as_instance_of_url_combined_with_pass_through(self):
  382. external_api = str(self.external_server.make_url('/status/201'))
  383. async def doit():
  384. api_resp = await self.session.get(self.url)
  385. # we have to hit actual url,
  386. # otherwise we do not test pass through option properly
  387. ext_rep = await self.session.get(URL(external_api))
  388. return api_resp, ext_rep
  389. with aioresponses(passthrough=[external_api]) as m:
  390. m.get(self.url, status=200)
  391. api, ext = await doit()
  392. self.assertEqual(api.status, 200)
  393. self.assertEqual(ext.status, 201)
  394. async def test_pass_through_with_origin_params(self):
  395. external_api = str(self.external_server.make_url('/get'))
  396. async def doit(params):
  397. # we have to hit actual url,
  398. # otherwise we do not test pass through option properly
  399. ext_rep = await self.session.get(
  400. URL(external_api), params=params
  401. )
  402. return ext_rep
  403. with aioresponses(passthrough=[external_api]) as m:
  404. params = {'foo': 'bar'}
  405. ext = await doit(params=params)
  406. self.assertEqual(ext.status, 200)
  407. self.assertEqual(str(ext.url), external_api + '?foo=bar')
  408. @aioresponses()
  409. async def test_custom_response_class(self, m):
  410. class CustomClientResponse(ClientResponse):
  411. pass
  412. m.get(self.url, body='Test', response_class=CustomClientResponse)
  413. resp = await self.session.get(self.url)
  414. self.assertTrue(isinstance(resp, CustomClientResponse))
  415. @aioresponses()
  416. def test_exceptions_in_the_middle_of_responses(self, mocked):
  417. mocked.get(self.url, payload={}, status=204)
  418. mocked.get(self.url, exception=ValueError('oops'), )
  419. mocked.get(self.url, payload={}, status=204)
  420. mocked.get(self.url, exception=ValueError('oops'), )
  421. mocked.get(self.url, payload={}, status=200)
  422. async def doit():
  423. return (await self.session.get(self.url))
  424. self.assertEqual(self.run_async(doit()).status, 204)
  425. with self.assertRaises(ValueError):
  426. self.run_async(doit())
  427. self.assertEqual(self.run_async(doit()).status, 204)
  428. with self.assertRaises(ValueError):
  429. self.run_async(doit())
  430. self.assertEqual(self.run_async(doit()).status, 200)
  431. @aioresponses()
  432. async def test_request_should_match_regexp(self, mocked):
  433. mocked.get(
  434. re.compile(r'^http://example\.com/api\?foo=.*$'),
  435. payload={}, status=200
  436. )
  437. response = await self.request(self.url)
  438. self.assertEqual(response.status, 200)
  439. @aioresponses()
  440. async def test_request_does_not_match_regexp(self, mocked):
  441. mocked.get(
  442. re.compile(r'^http://exampleexample\.com/api\?foo=.*$'),
  443. payload={}, status=200
  444. )
  445. with self.assertRaises(ClientConnectionError):
  446. await self.request(self.url)
  447. @aioresponses()
  448. def test_timeout(self, mocked):
  449. mocked.get(self.url, timeout=True)
  450. with self.assertRaises(asyncio.TimeoutError):
  451. self.run_async(self.request(self.url))
  452. @aioresponses()
  453. def test_callback(self, m):
  454. body = b'New body'
  455. def callback(url, **kwargs):
  456. self.assertEqual(str(url), self.url)
  457. self.assertEqual(kwargs, {'allow_redirects': True})
  458. return CallbackResult(body=body)
  459. m.get(self.url, callback=callback)
  460. response = self.run_async(self.request(self.url))
  461. data = self.run_async(response.read())
  462. assert data == body
  463. @aioresponses()
  464. def test_callback_coroutine(self, m):
  465. body = b'New body'
  466. event = asyncio.Event()
  467. async def callback(url, **kwargs):
  468. await event.wait()
  469. self.assertEqual(str(url), self.url)
  470. self.assertEqual(kwargs, {'allow_redirects': True})
  471. return CallbackResult(body=body)
  472. m.get(self.url, callback=callback)
  473. future = asyncio.ensure_future(self.request(self.url))
  474. self.run_async(asyncio.wait([future], timeout=0))
  475. assert not future.done()
  476. event.set()
  477. self.run_async(asyncio.wait([future], timeout=0))
  478. assert future.done()
  479. response = future.result()
  480. data = self.run_async(response.read())
  481. assert data == body
  482. @aioresponses()
  483. def test_assert_not_called(self, m: aioresponses):
  484. m.get(self.url)
  485. m.assert_not_called()
  486. self.run_async(self.session.get(self.url))
  487. with self.assertRaises(AssertionError):
  488. m.assert_not_called()
  489. @aioresponses()
  490. def test_assert_called(self, m: aioresponses):
  491. m.get(self.url)
  492. with self.assertRaises(AssertionError):
  493. m.assert_called()
  494. self.run_async(self.session.get(self.url))
  495. m.assert_called_once()
  496. m.assert_called_once_with(self.url)
  497. m.assert_called_with(self.url)
  498. with self.assertRaises(AssertionError):
  499. m.assert_not_called()
  500. with self.assertRaises(AssertionError):
  501. m.assert_called_with("http://foo.bar")
  502. @aioresponses()
  503. async def test_assert_called_twice(self, m: aioresponses):
  504. m.get(self.url, repeat=True)
  505. m.assert_not_called()
  506. await self.session.get(self.url)
  507. await self.session.get(self.url)
  508. with self.assertRaises(AssertionError):
  509. m.assert_called_once()
  510. @aioresponses()
  511. async def test_assert_any_call(self, m: aioresponses):
  512. http_bin_url = "http://httpbin.org"
  513. m.get(self.url)
  514. m.get(http_bin_url)
  515. await self.session.get(self.url)
  516. response = await self.session.get(http_bin_url)
  517. self.assertEqual(response.status, 200)
  518. m.assert_any_call(self.url)
  519. m.assert_any_call(http_bin_url)
  520. @aioresponses()
  521. async def test_assert_any_call_not_called(self, m: aioresponses):
  522. http_bin_url = "http://httpbin.org"
  523. m.get(self.url)
  524. response = await self.session.get(self.url)
  525. self.assertEqual(response.status, 200)
  526. m.assert_any_call(self.url)
  527. with self.assertRaises(AssertionError):
  528. m.assert_any_call(http_bin_url)
  529. @aioresponses()
  530. async def test_exception_requests_are_tracked(self, mocked):
  531. kwargs = {"json": [42], "allow_redirects": True}
  532. mocked.get(self.url, exception=ValueError('oops'))
  533. with self.assertRaises(ValueError):
  534. await self.session.get(self.url, **kwargs)
  535. key = ('GET', URL(self.url))
  536. mocked_requests = mocked.requests[key]
  537. self.assertEqual(len(mocked_requests), 1)
  538. request = mocked_requests[0]
  539. self.assertEqual(request.args, ())
  540. self.assertEqual(request.kwargs, kwargs)
  541. async def test_possible_race_condition(self):
  542. async def random_sleep_cb(url, **kwargs):
  543. await asyncio.sleep(uniform(0.1, 1))
  544. return CallbackResult(body='test')
  545. with aioresponses() as mocked:
  546. for i in range(20):
  547. mocked.get(
  548. 'http://example.org/id-{}'.format(i),
  549. callback=random_sleep_cb
  550. )
  551. tasks = [
  552. self.session.get('http://example.org/id-{}'.format(i)) for
  553. i in range(20)
  554. ]
  555. await asyncio.gather(*tasks)
  556. class AIOResponsesRaiseForStatusSessionTestCase(AsyncTestCase):
  557. """Test case for sessions with raise_for_status=True.
  558. This flag, introduced in aiohttp v2.0.0, automatically calls
  559. `raise_for_status()`.
  560. It is overridden by the `raise_for_status` argument of the request since
  561. aiohttp v3.4.a0.
  562. """
  563. async def setup(self):
  564. self.url = 'http://example.com/api?foo=bar#fragment'
  565. self.session = ClientSession(raise_for_status=True)
  566. async def teardown(self):
  567. close_result = self.session.close()
  568. if close_result is not None:
  569. await close_result
  570. @aioresponses()
  571. async def test_raise_for_status(self, m):
  572. m.get(self.url, status=400)
  573. with self.assertRaises(ClientResponseError) as cm:
  574. await self.session.get(self.url)
  575. self.assertEqual(cm.exception.message, http.RESPONSES[400][0])
  576. @aioresponses()
  577. @skipIf(condition=AIOHTTP_VERSION < MINIMUM_AIOHTTP_VERSION,
  578. reason='aiohttp<3.4.0 does not support raise_for_status '
  579. 'arguments for requests')
  580. async def test_do_not_raise_for_status(self, m):
  581. m.get(self.url, status=400)
  582. response = await self.session.get(self.url,
  583. raise_for_status=False)
  584. self.assertEqual(response.status, 400)
  585. @aioresponses()
  586. @skipIf(condition=AIOHTTP_VERSION < Version('3.9.0'),
  587. reason='aiohttp<3.9.0 does not support callable raise_for_status '
  588. 'arguments for requests')
  589. async def test_callable_raise_for_status(self, m):
  590. async def raise_for_status(response: ClientResponse):
  591. if response.status >= 400:
  592. raise Exception("callable raise_for_status")
  593. m.get(self.url, status=400)
  594. with self.assertRaises(Exception) as cm:
  595. await self.session.get(self.url,
  596. raise_for_status=raise_for_status)
  597. self.assertEqual(str(cm.exception), "callable raise_for_status")
  598. class AIOResponseRedirectTest(AsyncTestCase):
  599. async def setup(self):
  600. self.url = "http://10.1.1.1:8080/redirect"
  601. self.session = ClientSession()
  602. async def teardown(self):
  603. close_result = self.session.close()
  604. if close_result is not None:
  605. await close_result
  606. @aioresponses()
  607. async def test_redirect_followed(self, rsps):
  608. rsps.get(
  609. self.url,
  610. status=307,
  611. headers={"Location": "https://httpbin.org"},
  612. )
  613. rsps.get("https://httpbin.org")
  614. response = await self.session.get(
  615. self.url, allow_redirects=True
  616. )
  617. self.assertEqual(response.status, 200)
  618. self.assertEqual(str(response.url), "https://httpbin.org")
  619. self.assertEqual(len(response.history), 1)
  620. self.assertEqual(str(response.history[0].url), self.url)
  621. @aioresponses()
  622. async def test_post_redirect_followed(self, rsps):
  623. rsps.post(
  624. self.url,
  625. status=307,
  626. headers={"Location": "https://httpbin.org"},
  627. )
  628. rsps.get("https://httpbin.org")
  629. response = await self.session.post(
  630. self.url, allow_redirects=True
  631. )
  632. self.assertEqual(response.status, 200)
  633. self.assertEqual(str(response.url), "https://httpbin.org")
  634. self.assertEqual(response.method, "get")
  635. self.assertEqual(len(response.history), 1)
  636. self.assertEqual(str(response.history[0].url), self.url)
  637. @aioresponses()
  638. async def test_redirect_missing_mocked_match(self, rsps):
  639. rsps.get(
  640. self.url,
  641. status=307,
  642. headers={"Location": "https://httpbin.org"},
  643. )
  644. with self.assertRaises(ClientConnectionError) as cm:
  645. await self.session.get(
  646. self.url, allow_redirects=True
  647. )
  648. self.assertEqual(
  649. str(cm.exception),
  650. 'Connection refused: GET http://10.1.1.1:8080/redirect'
  651. )
  652. @aioresponses()
  653. async def test_redirect_missing_location_header(self, rsps):
  654. rsps.get(self.url, status=307)
  655. response = await self.session.get(self.url, allow_redirects=True)
  656. self.assertEqual(str(response.url), self.url)
  657. @aioresponses()
  658. async def test_request_info(self, rsps):
  659. rsps.get(self.url, status=200)
  660. response = await self.session.get(self.url)
  661. request_info = response.request_info
  662. assert str(request_info.url) == self.url
  663. assert request_info.headers == {}
  664. @aioresponses()
  665. async def test_request_info_with_original_request_headers(self, rsps):
  666. headers = {"Authorization": "Bearer access-token"}
  667. rsps.get(self.url, status=200)
  668. response = await self.session.get(self.url, headers=headers)
  669. request_info = response.request_info
  670. assert str(request_info.url) == self.url
  671. assert request_info.headers == headers
  672. @aioresponses()
  673. async def test_relative_url_redirect_followed(self, rsps):
  674. base_url = "https://httpbin.org"
  675. url = f"{base_url}/foo/bar"
  676. rsps.get(
  677. url,
  678. status=307,
  679. headers={"Location": "../baz"},
  680. )
  681. rsps.get(f"{base_url}/baz")
  682. response = await self.session.get(url, allow_redirects=True)
  683. self.assertEqual(response.status, 200)
  684. self.assertEqual(str(response.url), f"{base_url}/baz")
  685. self.assertEqual(len(response.history), 1)
  686. self.assertEqual(str(response.history[0].url), url)
  687. async def _test_pass_through_unmatched_requests(self):
  688. matched_url = "https://matched_example.org"
  689. unmatched_url = "https://httpbin.org/get"
  690. params_unmatched = {'foo': 'bar'}
  691. with aioresponses(passthrough_unmatched=True) as m:
  692. m.post(URL(matched_url), status=200)
  693. mocked_response = await self.session.post(URL(matched_url))
  694. response = await self.session.get(
  695. URL(unmatched_url), params=params_unmatched
  696. )
  697. self.assertEqual(response.status, 200)
  698. self.assertEqual(str(response.url), 'https://httpbin.org/get?foo=bar')
  699. self.assertEqual(mocked_response.status, 200)