test_oauth1_session.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. from __future__ import unicode_literals, print_function
  2. import unittest
  3. import sys
  4. import requests
  5. from io import StringIO
  6. from oauthlib.oauth1 import SIGNATURE_TYPE_QUERY, SIGNATURE_TYPE_BODY
  7. from oauthlib.oauth1 import SIGNATURE_RSA, SIGNATURE_PLAINTEXT
  8. from requests_oauthlib import OAuth1Session
  9. try:
  10. import mock
  11. except ImportError:
  12. from unittest import mock
  13. try:
  14. import cryptography
  15. except ImportError:
  16. cryptography = None
  17. try:
  18. import jwt
  19. except ImportError:
  20. jwt = None
  21. if sys.version[0] == "3":
  22. unicode_type = str
  23. else:
  24. unicode_type = unicode
  25. TEST_RSA_KEY = (
  26. "-----BEGIN RSA PRIVATE KEY-----\n"
  27. "MIIEogIBAAKCAQEApF1JaMSN8TEsh4N4O/5SpEAVLivJyLH+Cgl3OQBPGgJkt8cg\n"
  28. "49oasl+5iJS+VdrILxWM9/JCJyURpUuslX4Eb4eUBtQ0x5BaPa8+S2NLdGTaL7nB\n"
  29. "OO8o8n0C5FEUU+qlEip79KE8aqOj+OC44VsIquSmOvWIQD26n3fCVlgwoRBD1gzz\n"
  30. "sDOeaSyzpKrZR851Kh6rEmF2qjJ8jt6EkxMsRNACmBomzgA4M1TTsisSUO87444p\n"
  31. "e35Z4/n5c735o2fZMrGgMwiJNh7rT8SYxtIkxngioiGnwkxGQxQ4NzPAHg+XSY0J\n"
  32. "04pNm7KqTkgtxyrqOANJLIjXlR+U9SQ90NjHVQIDAQABAoIBABuBPOKaWcJt3yzC\n"
  33. "NGGduoif7KtwSnEaUA+v69KPGa2Zju8uFHPssKD+4dZYRc2qMeunKJLpaGaSjnRh\n"
  34. "yHyvvOBJCN1nr3lhz6gY5kzJTfwpUFXCOPJlGy4Q+2Xnp4YvcvYqQ9n5DVovDiZ8\n"
  35. "vJOBn16xqpudMPLHIa7D5LJ8SY76HBjE+imTXw1EShdh5TOV9bmPFQqH6JFzowRH\n"
  36. "hyH2DPHuyHJj6cl8FyqJw5lVWzG3n6Prvk7bYHsjmGjurN35UsumNAp6VouNyUP1\n"
  37. "RAEcUJega49aIs6/FJ0ENJzQjlsAzVbTleHkpez2aIok+wsWJGJ4SVxAjADOWAaZ\n"
  38. "uEJPc3UCgYEA1g4ZGrXOuo75p9/MRIepXGpBWxip4V7B9XmO9WzPCv8nMorJntWB\n"
  39. "msYV1I01aITxadHatO4Gl2xLniNkDyrEQzJ7w38RQgsVK+CqbnC0K9N77QPbHeC1\n"
  40. "YQd9RCNyUohOimKvb7jyv798FBU1GO5QI2eNgfnnfteSVXhD2iOoTOsCgYEAxJJ+\n"
  41. "8toxJdnLa0uUsAbql6zeNXGbUBMzu3FomKlyuWuq841jS2kIalaO/TRj5hbnE45j\n"
  42. "mCjeLgTVO6Ach3Wfk4zrqajqfFJ0zUg/Wexp49lC3RWiV4icBb85Q6bzeJD9Dn9v\n"
  43. "hjpfWVkczf/NeA1fGH/pcgfkT6Dm706GFFttLL8CgYBl/HeXk1H47xAiHO4dJKnb\n"
  44. "v0B+X8To/RXamF01r+8BpUoOubOQetdyX7ic+d6deuHu8i6LD/GSCeYJZYFR/KVg\n"
  45. "AtiW757QYalnq3ZogkhFrVCZP8IRfTPOFBxp752TlyAcrSI7T9pQ47IBe4094KXM\n"
  46. "CJWSfPgAJkOxd0iU0XJpmwKBgGfQxuMTgSlwYRKFlD1zKap5TdID8fbUbVnth0Q5\n"
  47. "GbH7vwlp/qrxCdS/aj0n0irOpbOaW9ccnlrHiqY25VpVMLYIkt3DrDOEiNNx+KNR\n"
  48. "TItdTwbcSiTYrS4L0/56ydM/H6bsfsXxRjI18hSJqMZiqXqS84OZz2aOn+h7HCzc\n"
  49. "LEiZAoGASk20wFvilpRKHq79xxFWiDUPHi0x0pp82dYIEntGQkKUWkbSlhgf3MAi\n"
  50. "5NEQTDmXdnB+rVeWIvEi+BXfdnNgdn8eC4zSdtF4sIAhYr5VWZo0WVWDhT7u2ccv\n"
  51. "ZBFymiz8lo3gN57wGUCi9pbZqzV1+ZppX6YTNDdDCE0q+KO3Cec=\n"
  52. "-----END RSA PRIVATE KEY-----"
  53. )
  54. TEST_RSA_OAUTH_SIGNATURE = (
  55. "j8WF8PGjojT82aUDd2EL%2Bz7HCoHInFzWUpiEKMCy%2BJ2cYHWcBS7mXlmFDLgAKV0"
  56. "P%2FyX4TrpXODYnJ6dRWdfghqwDpi%2FlQmB2jxCiGMdJoYxh3c5zDf26gEbGdP6D7O"
  57. "Ssp5HUnzH6sNkmVjuE%2FxoJcHJdc23H6GhOs7VJ2LWNdbhKWP%2FMMlTrcoQDn8lz"
  58. "%2Fb24WsJ6ae1txkUzpFOOlLM8aTdNtGL4OtsubOlRhNqnAFq93FyhXg0KjzUyIZzmMX"
  59. "9Vx90jTks5QeBGYcLE0Op2iHb2u%2FO%2BEgdwFchgEwE5LgMUyHUI4F3Wglp28yHOAM"
  60. "jPkI%2FkWMvpxtMrU3Z3KN31WQ%3D%3D"
  61. )
  62. class OAuth1SessionTest(unittest.TestCase):
  63. def test_signature_types(self):
  64. def verify_signature(getter):
  65. def fake_send(r, **kwargs):
  66. signature = getter(r)
  67. if isinstance(signature, bytes):
  68. signature = signature.decode("utf-8")
  69. self.assertIn("oauth_signature", signature)
  70. resp = mock.MagicMock(spec=requests.Response)
  71. resp.cookies = []
  72. return resp
  73. return fake_send
  74. header = OAuth1Session("foo")
  75. header.send = verify_signature(lambda r: r.headers["Authorization"])
  76. header.post("https://i.b")
  77. query = OAuth1Session("foo", signature_type=SIGNATURE_TYPE_QUERY)
  78. query.send = verify_signature(lambda r: r.url)
  79. query.post("https://i.b")
  80. body = OAuth1Session("foo", signature_type=SIGNATURE_TYPE_BODY)
  81. headers = {"Content-Type": "application/x-www-form-urlencoded"}
  82. body.send = verify_signature(lambda r: r.body)
  83. body.post("https://i.b", headers=headers, data="")
  84. @mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp")
  85. @mock.patch("oauthlib.oauth1.rfc5849.generate_nonce")
  86. def test_signature_methods(self, generate_nonce, generate_timestamp):
  87. if not cryptography:
  88. raise unittest.SkipTest("cryptography module is required")
  89. if not jwt:
  90. raise unittest.SkipTest("pyjwt module is required")
  91. generate_nonce.return_value = "abc"
  92. generate_timestamp.return_value = "123"
  93. signature = 'OAuth oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", oauth_signature_method="HMAC-SHA1", oauth_consumer_key="foo", oauth_signature="h2sRqLArjhlc5p3FTkuNogVHlKE%3D"'
  94. auth = OAuth1Session("foo")
  95. auth.send = self.verify_signature(signature)
  96. auth.post("https://i.b")
  97. signature = 'OAuth oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", oauth_signature_method="PLAINTEXT", oauth_consumer_key="foo", oauth_signature="%26"'
  98. auth = OAuth1Session("foo", signature_method=SIGNATURE_PLAINTEXT)
  99. auth.send = self.verify_signature(signature)
  100. auth.post("https://i.b")
  101. signature = (
  102. "OAuth "
  103. 'oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", '
  104. 'oauth_signature_method="RSA-SHA1", oauth_consumer_key="foo", '
  105. 'oauth_signature="{sig}"'
  106. ).format(sig=TEST_RSA_OAUTH_SIGNATURE)
  107. auth = OAuth1Session(
  108. "foo", signature_method=SIGNATURE_RSA, rsa_key=TEST_RSA_KEY
  109. )
  110. auth.send = self.verify_signature(signature)
  111. auth.post("https://i.b")
  112. @mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp")
  113. @mock.patch("oauthlib.oauth1.rfc5849.generate_nonce")
  114. def test_binary_upload(self, generate_nonce, generate_timestamp):
  115. generate_nonce.return_value = "abc"
  116. generate_timestamp.return_value = "123"
  117. fake_xml = StringIO("hello world")
  118. headers = {"Content-Type": "application/xml"}
  119. signature = 'OAuth oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", oauth_signature_method="HMAC-SHA1", oauth_consumer_key="foo", oauth_signature="h2sRqLArjhlc5p3FTkuNogVHlKE%3D"'
  120. auth = OAuth1Session("foo")
  121. auth.send = self.verify_signature(signature)
  122. auth.post("https://i.b", headers=headers, files=[("fake", fake_xml)])
  123. @mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp")
  124. @mock.patch("oauthlib.oauth1.rfc5849.generate_nonce")
  125. def test_nonascii(self, generate_nonce, generate_timestamp):
  126. generate_nonce.return_value = "abc"
  127. generate_timestamp.return_value = "123"
  128. signature = 'OAuth oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", oauth_signature_method="HMAC-SHA1", oauth_consumer_key="foo", oauth_signature="W0haoue5IZAZoaJiYCtfqwMf8x8%3D"'
  129. auth = OAuth1Session("foo")
  130. auth.send = self.verify_signature(signature)
  131. auth.post("https://i.b?cjk=%E5%95%A6%E5%95%A6")
  132. def test_authorization_url(self):
  133. auth = OAuth1Session("foo")
  134. url = "https://example.comm/authorize"
  135. token = "asluif023sf"
  136. auth_url = auth.authorization_url(url, request_token=token)
  137. self.assertEqual(auth_url, url + "?oauth_token=" + token)
  138. def test_parse_response_url(self):
  139. url = "https://i.b/callback?oauth_token=foo&oauth_verifier=bar"
  140. auth = OAuth1Session("foo")
  141. resp = auth.parse_authorization_response(url)
  142. self.assertEqual(resp["oauth_token"], "foo")
  143. self.assertEqual(resp["oauth_verifier"], "bar")
  144. for k, v in resp.items():
  145. self.assertIsInstance(k, unicode_type)
  146. self.assertIsInstance(v, unicode_type)
  147. def test_fetch_request_token(self):
  148. auth = OAuth1Session("foo")
  149. auth.send = self.fake_body("oauth_token=foo")
  150. resp = auth.fetch_request_token("https://example.com/token")
  151. self.assertEqual(resp["oauth_token"], "foo")
  152. for k, v in resp.items():
  153. self.assertIsInstance(k, unicode_type)
  154. self.assertIsInstance(v, unicode_type)
  155. def test_fetch_request_token_with_optional_arguments(self):
  156. auth = OAuth1Session("foo")
  157. auth.send = self.fake_body("oauth_token=foo")
  158. resp = auth.fetch_request_token(
  159. "https://example.com/token", verify=False, stream=True
  160. )
  161. self.assertEqual(resp["oauth_token"], "foo")
  162. for k, v in resp.items():
  163. self.assertIsInstance(k, unicode_type)
  164. self.assertIsInstance(v, unicode_type)
  165. def test_fetch_access_token(self):
  166. auth = OAuth1Session("foo", verifier="bar")
  167. auth.send = self.fake_body("oauth_token=foo")
  168. resp = auth.fetch_access_token("https://example.com/token")
  169. self.assertEqual(resp["oauth_token"], "foo")
  170. for k, v in resp.items():
  171. self.assertIsInstance(k, unicode_type)
  172. self.assertIsInstance(v, unicode_type)
  173. def test_fetch_access_token_with_optional_arguments(self):
  174. auth = OAuth1Session("foo", verifier="bar")
  175. auth.send = self.fake_body("oauth_token=foo")
  176. resp = auth.fetch_access_token(
  177. "https://example.com/token", verify=False, stream=True
  178. )
  179. self.assertEqual(resp["oauth_token"], "foo")
  180. for k, v in resp.items():
  181. self.assertIsInstance(k, unicode_type)
  182. self.assertIsInstance(v, unicode_type)
  183. def _test_fetch_access_token_raises_error(self, auth):
  184. """Assert that an error is being raised whenever there's no verifier
  185. passed in to the client.
  186. """
  187. auth.send = self.fake_body("oauth_token=foo")
  188. with self.assertRaises(ValueError) as cm:
  189. auth.fetch_access_token("https://example.com/token")
  190. self.assertEqual("No client verifier has been set.", str(cm.exception))
  191. def test_fetch_token_invalid_response(self):
  192. auth = OAuth1Session("foo")
  193. auth.send = self.fake_body("not valid urlencoded response!")
  194. self.assertRaises(
  195. ValueError, auth.fetch_request_token, "https://example.com/token"
  196. )
  197. for code in (400, 401, 403):
  198. auth.send = self.fake_body("valid=response", code)
  199. with self.assertRaises(ValueError) as cm:
  200. auth.fetch_request_token("https://example.com/token")
  201. self.assertEqual(cm.exception.status_code, code)
  202. self.assertIsInstance(cm.exception.response, requests.Response)
  203. def test_fetch_access_token_missing_verifier(self):
  204. self._test_fetch_access_token_raises_error(OAuth1Session("foo"))
  205. def test_fetch_access_token_has_verifier_is_none(self):
  206. auth = OAuth1Session("foo")
  207. del auth._client.client.verifier
  208. self._test_fetch_access_token_raises_error(auth)
  209. def test_token_proxy_set(self):
  210. token = {
  211. "oauth_token": "fake-key",
  212. "oauth_token_secret": "fake-secret",
  213. "oauth_verifier": "fake-verifier",
  214. }
  215. sess = OAuth1Session("foo")
  216. self.assertIsNone(sess._client.client.resource_owner_key)
  217. self.assertIsNone(sess._client.client.resource_owner_secret)
  218. self.assertIsNone(sess._client.client.verifier)
  219. self.assertEqual(sess.token, {})
  220. sess.token = token
  221. self.assertEqual(sess._client.client.resource_owner_key, "fake-key")
  222. self.assertEqual(sess._client.client.resource_owner_secret, "fake-secret")
  223. self.assertEqual(sess._client.client.verifier, "fake-verifier")
  224. def test_token_proxy_get(self):
  225. token = {
  226. "oauth_token": "fake-key",
  227. "oauth_token_secret": "fake-secret",
  228. "oauth_verifier": "fake-verifier",
  229. }
  230. sess = OAuth1Session(
  231. "foo",
  232. resource_owner_key=token["oauth_token"],
  233. resource_owner_secret=token["oauth_token_secret"],
  234. verifier=token["oauth_verifier"],
  235. )
  236. self.assertEqual(sess.token, token)
  237. sess._client.client.resource_owner_key = "different-key"
  238. token["oauth_token"] = "different-key"
  239. self.assertEqual(sess.token, token)
  240. def test_authorized_false(self):
  241. sess = OAuth1Session("foo")
  242. self.assertIs(sess.authorized, False)
  243. def test_authorized_false_rsa(self):
  244. signature = (
  245. "OAuth "
  246. 'oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", '
  247. 'oauth_signature_method="RSA-SHA1", oauth_consumer_key="foo", '
  248. 'oauth_signature="{sig}"'
  249. ).format(sig=TEST_RSA_OAUTH_SIGNATURE)
  250. sess = OAuth1Session(
  251. "foo", signature_method=SIGNATURE_RSA, rsa_key=TEST_RSA_KEY
  252. )
  253. sess.send = self.verify_signature(signature)
  254. self.assertIs(sess.authorized, False)
  255. def test_authorized_true(self):
  256. sess = OAuth1Session("key", "secret", verifier="bar")
  257. sess.send = self.fake_body("oauth_token=foo&oauth_token_secret=bar")
  258. sess.fetch_access_token("https://example.com/token")
  259. self.assertIs(sess.authorized, True)
  260. @mock.patch("oauthlib.oauth1.rfc5849.generate_timestamp")
  261. @mock.patch("oauthlib.oauth1.rfc5849.generate_nonce")
  262. def test_authorized_true_rsa(self, generate_nonce, generate_timestamp):
  263. if not cryptography:
  264. raise unittest.SkipTest("cryptography module is required")
  265. if not jwt:
  266. raise unittest.SkipTest("pyjwt module is required")
  267. generate_nonce.return_value = "abc"
  268. generate_timestamp.return_value = "123"
  269. signature = (
  270. "OAuth "
  271. 'oauth_nonce="abc", oauth_timestamp="123", oauth_version="1.0", '
  272. 'oauth_signature_method="RSA-SHA1", oauth_consumer_key="foo", '
  273. 'oauth_verifier="bar", oauth_signature="{sig}"'
  274. ).format(sig=TEST_RSA_OAUTH_SIGNATURE)
  275. sess = OAuth1Session(
  276. "key",
  277. "secret",
  278. signature_method=SIGNATURE_RSA,
  279. rsa_key=TEST_RSA_KEY,
  280. verifier="bar",
  281. )
  282. sess.send = self.fake_body("oauth_token=foo&oauth_token_secret=bar")
  283. sess.fetch_access_token("https://example.com/token")
  284. self.assertIs(sess.authorized, True)
  285. def verify_signature(self, signature):
  286. def fake_send(r, **kwargs):
  287. auth_header = r.headers["Authorization"]
  288. if isinstance(auth_header, bytes):
  289. auth_header = auth_header.decode("utf-8")
  290. self.assertEqual(auth_header, signature)
  291. resp = mock.MagicMock(spec=requests.Response)
  292. resp.cookies = []
  293. return resp
  294. return fake_send
  295. def fake_body(self, body, status_code=200):
  296. def fake_send(r, **kwargs):
  297. resp = mock.MagicMock(spec=requests.Response)
  298. resp.cookies = []
  299. resp.text = body
  300. resp.status_code = status_code
  301. return resp
  302. return fake_send